Python多进程. 同时结束所有进程
我的目标是把一些数据上传到数据库。我正在使用psycopg2这个库,它有一个规则:所有的进程必须有自己的数据库连接。在我的情况下,这意味着我必须在工作进程中提交数据。问题是,我只能在所有进程完成SQL插入命令后才能提交。我的需求是:
from multiprocessing import Process
def worker1(s):
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
pg_cursor.execute(
""" insert into "MyTable1"("Column1")
values(%s)""", [1])
#wait all processes
conn.commit()
def worker2(s):
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
pg_cursor.execute(
""" insert into "MyTable2"("Column1")
values(%s)""", [1])
#wait all processes
conn.commit()
if __name__ == '__main__':
p1 = Process(target=worker1)
p2 = Process(target=worker2)
p1.start()
p2.start()
我该如何让所有进程等到SQL命令完成后再继续?这样做的正确方法是什么?这个SQL插入只是个例子,实际上我需要插入数百万条记录。
1 个回答
0
你可以使用一对 multiprocessing.Event
对象,让两个工作者可以互相通知对方自己完成了工作,同时也可以强制让他们等待对方的信号。
from multiprocessing import Process, Event
def worker1(s, my_e, other_e):
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
pg_cursor.execute(
""" insert into "MyTable1"("Column1")
values(%s)""", [1])
#wait all processes
my_e.set()
other_e.wait()
conn.commit()
def worker2(s, my_e, other_e):
conn = psycopg2.connect("dbname=mydb user=postgres")
cursor = conn.cursor()
pg_cursor.execute(
""" insert into "MyTable2"("Column1")
values(%s)""", [1])
#wait all processes
my_e.set()
other_e.wait()
conn.commit()
if __name__ == '__main__':
e1 = Event()
e2 = Event()
p1 = Process(target=worker1, args=(e1, e2))
p2 = Process(target=worker2, args=(e2, e1))
p1.start()
p2.start()
p1.join()
p2.join()
不过要小心,如果其中一个工作者因为某种原因出现问题,可能会导致死锁。你可以考虑给 Event.wait
的可选参数 timeout
设置一个比较大的值,如果超时了就回退。或者在每个工作者中使用 try
/except
,确保 my_e
能够调用 set()
,同时在两个工作者之间共享一个变量,用来告诉对方是否发生了故障。