How can I insert multiple rows into Postgres with SQLAlchemy in a multithreaded Python application?

Asked 2025-04-14 16:08

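I am using SQLAlchemy to interact with a Postgres database. Sometimes, when I try to insert data into a table, I get a UniqueViolation exception, especially when the request rate is at its peak.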

Here is part of my code:

class Prediction(Base):
    __tablename__ = "predictions"

    id = Column(Integer, primary_key=True, index=True, unique=True, autoincrement=True)
    # and many more columns

# ...

db = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# ...

def predict(image_id: int, image_key: str):
    result = detector.detect(image_key, classes)

    for i in result:
        db_prediction = models.Prediction(
            # pass all params except id
        )
        db.add(db_prediction)
    db.commit()  # the exception is raised on this line

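I understand that the problem is that the id value of the row I am trying to insert is not unique. What I am trying to figure out is how best to catch this exception and still insert the rows without losing the function's results.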

I know there is the rollback() method, but how do I combine it with try ... except and insert a correct id?
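
For reference, a minimal sketch of the rollback pattern asked about here. It is an illustration under stated assumptions, not the code from this project: the model is cut down to three columns, an in-memory SQLite database stands in for Postgres so the snippet runs without a server, and the names SessionLocal, save_predictions and count_predictions are hypothetical. Each call opens its own Session, since a Session object is not meant to be shared between threads.

```python
from sqlalchemy import Column, Float, Integer, create_engine, func, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Prediction(Base):
    # Cut-down version of the model in the question (assumption).
    __tablename__ = "predictions"

    id = Column(Integer, primary_key=True)
    image_id = Column(Integer)
    confidence = Column(Float)


# Assumption: in-memory SQLite replaces Postgres for this sketch only.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine, autoflush=False)


def save_predictions(rows):
    """Insert all rows in one transaction; return True if it committed."""
    # A fresh Session per call, closed automatically by the with block.
    with SessionLocal() as session:
        try:
            session.add_all(rows)
            session.commit()
            return True
        except IntegrityError:
            # Undo the failed transaction; nothing from this batch is kept.
            # The caller still holds the detection results and can retry.
            session.rollback()
            return False


def count_predictions():
    """Return the number of rows currently stored."""
    with SessionLocal() as session:
        return session.scalar(select(func.count()).select_from(Prediction))
```

Calling save_predictions with rows whose id is left unset lets the database assign the key. If a batch fails, the detection results are still in memory on the caller's side, so they can be logged or resubmitted rather than lost. Picking a "correct" id by hand in the except branch should not be needed as long as ids come from the database.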

Update: the exception traceback:

[2024-03-14 14:13:13,222: ERROR/MainProcess] Task project.worker.gpu_worker.predict_local[7cb690c2-bfc0-483f-943f-ffc673b68875] raised unexpected: IntegrityError('(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "predictions_pkey"\nDETAIL:  Key (id)=(13) already exists.\n')
Traceback (most recent call last):
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1970, in _exec_single_context
    self.dialect.do_execute(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "predictions_pkey"
DETAIL:  Key (id)=(13) already exists.


The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/user/project/venv/lib/python3.10/site-packages/celery/app/trace.py", line 477, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/home/user/project/venv/lib/python3.10/site-packages/celery/app/trace.py", line 760, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/user/project/venv/lib/python3.10/site-packages/neurospector/worker/gpu_worker.py", line 54, in predict_local
    db.commit()
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1972, in commit
    trans.commit(_to_root=True)
  File "<string>", line 2, in commit
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1257, in commit
    self._prepare_impl()
  File "<string>", line 2, in _prepare_impl
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/state_changes.py", line 139, in _go
    ret_value = fn(self, *arg, **kw)
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 1232, in _prepare_impl
    self.session.flush()
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 4296, in flush
    self._flush(objects)
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 4431, in _flush
    with util.safe_reraise():
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/util/langhelpers.py", line 146, in __exit__
    raise exc_value.with_traceback(exc_tb)
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/session.py", line 4392, in _flush
    flush_context.execute()
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 466, in execute
    rec.execute(self)
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/unitofwork.py", line 642, in execute
    util.preloaded.orm_persistence.save_obj(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 93, in save_obj
    _emit_insert_statements(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/orm/persistence.py", line 1048, in _emit_insert_statements
    result = connection.execute(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1421, in execute
    return meth(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/sql/elements.py", line 514, in _execute_on_connection
    return connection._execute_clauseelement(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1643, in _execute_clauseelement
    ret = self._execute_context(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1849, in _execute_context
    return self._exec_single_context(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1989, in _exec_single_context
    self._handle_dbapi_exception(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 2356, in _handle_dbapi_exception
    raise sqlalchemy_exception.with_traceback(exc_info[2]) from e
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/base.py", line 1970, in _exec_single_context
    self.dialect.do_execute(
  File "/home/user/project/venv/lib/python3.10/site-packages/sqlalchemy/engine/default.py", line 924, in do_execute
    cursor.execute(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "predictions_pkey"
DETAIL:  Key (id)=(13) already exists.

[SQL: INSERT INTO predictions (id, image_id, x1, y1, x2, y2, class_id, confidence) VALUES (%(id)s, %(image_id)s, %(x1)s, %(y1)s, %(x2)s, %(y2)s, %(class_id)s, %(confidence)s)]
[parameters: {'id': 13, 'image_id': 5727, 'x1': 852.0, 'y1': 136.0, 'x2': 964.0, 'y2': 534.0, 'class_id': 1, 'confidence': 0.264892578125}]
(Background on this error at: https://sqlalche.me/e/20/gkpj)

The SQLAlchemy version is 2.0.28.

The table's DDL (data definition language):

--
-- PostgreSQL database dump
--

-- Dumped from database version 14.11 (Ubuntu 14.11-0ubuntu0.22.04.1)
-- Dumped by pg_dump version 14.11 (Ubuntu 14.11-0ubuntu0.22.04.1)

SET statement_timeout = 0;
SET lock_timeout = 0;
SET idle_in_transaction_session_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET xmloption = content;
SET client_min_messages = warning;
SET row_security = off;

SET default_tablespace = '';

SET default_table_access_method = heap;

--
-- Name: predictions; Type: TABLE; Schema: public; Owner: user
--

CREATE TABLE public.predictions (
    id integer NOT NULL,
    image_id integer,
    x1 integer,
    y1 integer,
    x2 integer,
    y2 integer,
    class_id integer,
    confidence double precision
);


ALTER TABLE public.predictions OWNER TO user;

--
-- Name: predictions_id_seq; Type: SEQUENCE; Schema: public; Owner: user
--

CREATE SEQUENCE public.predictions_id_seq
    AS integer
    START WITH 1
    INCREMENT BY 1
    NO MINVALUE
    NO MAXVALUE
    CACHE 1;


ALTER TABLE public.predictions_id_seq OWNER TO user;

--
-- Name: predictions_id_seq; Type: SEQUENCE OWNED BY; Schema: public; Owner: user
--

ALTER SEQUENCE public.predictions_id_seq OWNED BY public.predictions.id;


--
-- Name: predictions id; Type: DEFAULT; Schema: public; Owner: user
--

ALTER TABLE ONLY public.predictions ALTER COLUMN id SET DEFAULT nextval('public.predictions_id_seq'::regclass);


--
-- Name: predictions predictions_pkey; Type: CONSTRAINT; Schema: public; Owner: user
--

ALTER TABLE ONLY public.predictions
    ADD CONSTRAINT predictions_pkey PRIMARY KEY (id);


--
-- Name: ix_predictions_class_id; Type: INDEX; Schema: public; Owner: user
--

CREATE INDEX ix_predictions_class_id ON public.predictions USING btree (class_id);


--
-- Name: ix_predictions_confidence; Type: INDEX; Schema: public; Owner: user
--

CREATE INDEX ix_predictions_confidence ON public.predictions USING btree (confidence);


--
-- Name: ix_predictions_id; Type: INDEX; Schema: public; Owner: user
--

CREATE UNIQUE INDEX ix_predictions_id ON public.predictions USING btree (id);


--
-- Name: ix_predictions_image_id; Type: INDEX; Schema: public; Owner: user
--

CREATE INDEX ix_predictions_image_id ON public.predictions USING btree (image_id);


--
-- Name: predictions predictions_class_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: user
--

ALTER TABLE ONLY public.predictions
    ADD CONSTRAINT predictions_class_id_fkey FOREIGN KEY (class_id) REFERENCES public.classes(id);


--
-- Name: predictions predictions_image_id_fkey; Type: FK CONSTRAINT; Schema: public; Owner: user
--

ALTER TABLE ONLY public.predictions
    ADD CONSTRAINT predictions_image_id_fkey FOREIGN KEY (image_id) REFERENCES public.images(id);


--
-- PostgreSQL database dump complete
--

1 Answer


Thanks to the comments from Marmite Bomber and snakecharmerb, here is the answer:

from sqlalchemy import Column, Identity, Integer

class Prediction(Base):
    __tablename__ = "predictions"

    id = Column("id", Integer, Identity(start=1, cycle=False), primary_key=True, index=True, unique=True)

The problem was that I assumed autoincrement would work the same way as Identity.
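
To make the difference between the two declarations visible, here is a small sketch that compiles both column definitions to Postgres DDL without connecting to a database. The table names predictions_serial and predictions_identity and the helper ddl are hypothetical and exist only for this comparison.

```python
from sqlalchemy import Column, Identity, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical table: primary key declared as in the question.
with_autoincrement = Table(
    "predictions_serial",
    metadata,
    Column("id", Integer, primary_key=True, autoincrement=True),
)

# Hypothetical table: primary key declared as in the answer.
with_identity = Table(
    "predictions_identity",
    metadata,
    Column("id", Integer, Identity(start=1, cycle=False), primary_key=True),
)


def ddl(table):
    """Render CREATE TABLE for the Postgres dialect; no connection is needed."""
    return str(CreateTable(table).compile(dialect=postgresql.dialect()))


serial_ddl = ddl(with_autoincrement)
identity_ddl = ddl(with_identity)

print(serial_ddl)
print(identity_ddl)
```

The first statement declares the column as SERIAL, which pg_dump shows as an integer column with a nextval() default on an owned sequence, as in the dump in the question. The second declares an identity column with GENERATED BY DEFAULT AS IDENTITY. Note that changing the model does not alter a table that already exists; that would take a migration or recreating the table.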
