Postgres:插入语句后可以执行代码吗?
这听起来可能有点奇怪,但我很好奇在Postgres数据库中,是否可以在执行INSERT语句后运行一段代码块?
具体来说,我想知道在pg数据库执行INSERT语句之后,能否运行Python代码。
1 个回答
有几种方法可以解决这个问题。
监听/通知
最简单的方法是使用 PostgreSQL 的 通知功能。
你可以在插入或更新数据后添加一个触发器,这个触发器会发送通知:
CREATE OR REPLACE FUNCTION on_insert() RETURNS trigger AS
$$
BEGIN
execute E'NOTIFY ENTITY_CHANGE, \'' || NEW.id || E'\'';
RETURN NEW;
END
$$
LANGUAGE 'plpgsql' VOLATILE;
create trigger trig_on_insert
after insert on ENTITY
for each row
execute procedure on_insert_to_t();
ENTITY_CHANGE
是你可以自定义的频道标识符,随便取个名字都可以。
然后你的应用程序应该在一个单独的线程(或进程)中 监听 这个通知,并执行相应的操作:
from django.db import connection
curs = connection.cursor()
curs.execute("LISTEN ENTITY_CHANGED;")
while not_finish:
if select.select([connection],[],[],5) == ([],[],[]):
print "Timeout"
else:
connection.poll()
while connection.notifies:
notify = connection.notifies.pop()
entity_id = notify.payload
do_post_save(entity_id)
需要注意的是,通知不是事务性的,如果发生严重故障,通知可能会丢失。比如说,如果你的应用程序收到了通知,但在处理通知之前崩溃了(或者被强制关闭),那么这个通知就会永远丢失。
如果你需要确保保存后的处理总是能执行,你需要维护一个任务表。在插入或更新数据后,触发器应该将任务添加到这个表中,然后某个 Python 进程应该定期检查这个表并执行所需的处理。缺点是这种定期检查会在系统没有保存数据时进行不必要的查询。
你可以将这两种方法结合起来,既使用通知来启动处理,又让处理程序从触发器填充的任务表中获取任务。在你的应用程序启动时,也应该运行处理未完成工作的程序(如果有的话)。
Django pgpubsub 库正是实现了这种方法,并提供了一个相对简单的声明式 API,允许在 Django 模型发生变化时执行回调:
# this defines a postgres channel that is used to send notifications
# and a trigger that does NOTIFY on MyModel change
@dataclass
class MyModelTriggerChannel(TriggerChannel):
model = MyModel
# This defines a callback to be invoked on MyModel chagne
@pgpubsub.post_update_listener(MyModelTriggerChannel)
def on_my_model_update(old: MyModel, new: MyModel):
# use new variable to access updated model data
...
逻辑复制
更好且更可靠的方法是使用 逻辑复制。
这个选项直接使用 事务日志,并且消费者会确认收到的变更通知,这样就不会错过任何通知,交付也能更可靠。
为了演示这个,我这里使用了一个 预配置的镜像,它已经为逻辑复制做好了设置,并安装了 wal2json 插件用于 WAL 解码:
docker run -d --name "logical" -e POSTGRES_PASSWORD=123 -p 10000:5432 -d debezium/postgres:14
下面是消费者的一个示例:
import psycopg2
from psycopg2.errors import UndefinedObject
from psycopg2.extras import LogicalReplicationConnection
my_connection = psycopg2.connect(
"dbname='postgres' host='localhost' port='10000' user='postgres' password='123'",
connection_factory=LogicalReplicationConnection,
)
cur = my_connection.cursor()
try:
cur.drop_replication_slot("wal2json_test_slot")
except UndefinedObject:
pass
cur.create_replication_slot("wal2json_test_slot", output_plugin="wal2json")
cur.start_replication(
slot_name="wal2json_test_slot", options={"pretty-print": 1}, decode=True
)
def consume(msg):
print(msg.payload)
msg.cursor.send_feedback(flush_lsn=msg.data_start)
cur.consume_stream(consume)
现在执行像 insert into table1 values (1, 'hello')
这样的插入操作会产生以下结果:
{
"change": [
{
"kind": "insert",
"schema": "public",
"table": "table1",
"columnnames": ["i", "t"],
"columntypes": ["integer", "text"],
"columnvalues": [1, "hello"]
}
]
}
这个方法的缺点是你会收到数据库中的 所有 变更,并且需要自己过滤和解码数据(我不知道有没有库可以让这对你来说变得简单)。