Postgres:插入语句后可以执行代码吗?

2 投票
1 回答
1668 浏览
提问于 2025-04-18 16:38

这听起来可能有点奇怪,但我很好奇在Postgres数据库中,是否可以在执行INSERT语句后运行一段代码块?

具体来说,我想知道在pg数据库执行INSERT语句之后,能否运行Python代码。

1 个回答

4

有几种方法可以解决这个问题。

监听/通知

最简单的方法是使用 PostgreSQL 的 通知功能

你可以在插入或更新数据后添加一个触发器,这个触发器会发送通知:

CREATE OR REPLACE FUNCTION on_insert() RETURNS trigger AS
$$
BEGIN

    execute E'NOTIFY ENTITY_CHANGE, \'' || NEW.id || E'\'';

    RETURN NEW;
END
$$
LANGUAGE 'plpgsql' VOLATILE;

create trigger trig_on_insert
after insert on ENTITY
for each row
execute procedure on_insert_to_t();

ENTITY_CHANGE 是你可以自定义的频道标识符,随便取个名字都可以。

然后你的应用程序应该在一个单独的线程(或进程)中 监听 这个通知,并执行相应的操作:

from django.db import connection

curs = connection.cursor()
curs.execute("LISTEN ENTITY_CHANGED;")

while not_finish:
    if select.select([connection],[],[],5) == ([],[],[]):
        print "Timeout"
    else:
        connection.poll()
        while connection.notifies:
            notify = connection.notifies.pop()
            entity_id = notify.payload
            do_post_save(entity_id)

需要注意的是,通知不是事务性的,如果发生严重故障,通知可能会丢失。比如说,如果你的应用程序收到了通知,但在处理通知之前崩溃了(或者被强制关闭),那么这个通知就会永远丢失。

如果你需要确保保存后的处理总是能执行,你需要维护一个任务表。在插入或更新数据后,触发器应该将任务添加到这个表中,然后某个 Python 进程应该定期检查这个表并执行所需的处理。缺点是这种定期检查会在系统没有保存数据时进行不必要的查询。

你可以将这两种方法结合起来,既使用通知来启动处理,又让处理程序从触发器填充的任务表中获取任务。在你的应用程序启动时,也应该运行处理未完成工作的程序(如果有的话)。

Django pgpubsub 库正是实现了这种方法,并提供了一个相对简单的声明式 API,允许在 Django 模型发生变化时执行回调:

# this defines a postgres channel that is used to send notifications
# and a trigger that does NOTIFY on MyModel change
@dataclass
class MyModelTriggerChannel(TriggerChannel):
    model = MyModel

# This defines a callback to be invoked on MyModel chagne
@pgpubsub.post_update_listener(MyModelTriggerChannel)
def on_my_model_update(old: MyModel, new: MyModel):
   # use new variable to access updated model data
   ...

逻辑复制

更好且更可靠的方法是使用 逻辑复制

这个选项直接使用 事务日志,并且消费者会确认收到的变更通知,这样就不会错过任何通知,交付也能更可靠。

为了演示这个,我这里使用了一个 预配置的镜像,它已经为逻辑复制做好了设置,并安装了 wal2json 插件用于 WAL 解码:

docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 -d debezium/postgres:14

下面是消费者的一个示例:

import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection

my_connection = psycopg2.connect(
    "dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
    connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
    cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
    pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
    slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)

def consume(msg):
    print(msg.payload)
    msg.cursor.send_feedback(flush_lsn=msg.data_start)

cur.consume_stream(consume)

现在执行像 insert into table1 values (1, 'hello') 这样的插入操作会产生以下结果:

{
    "change": [
        {
            "kind": "insert",
            "schema": "public",
            "table": "table1",
            "columnnames": ["i", "t"],
            "columntypes": ["integer", "text"],
            "columnvalues": [1, "hello"]
        }
    ]
}

这个方法的缺点是你会收到数据库中的 所有 变更,并且需要自己过滤和解码数据(我不知道有没有库可以让这对你来说变得简单)。

撰写回答