Python多进程. 同时结束所有进程

0 投票
1 回答
884 浏览
提问于 2025-04-18 13:40

我的目标是把一些数据上传到数据库。我正在使用psycopg2这个库,它有一个规则:所有的进程必须有自己的数据库连接。在我的情况下,这意味着我必须在工作进程中提交数据。问题是,我只能在所有进程完成SQL插入命令后才能提交。我的需求是:

from multiprocessing import Process

def worker1(s):    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    cursor = conn.cursor()

    pg_cursor.execute(
            """ insert into "MyTable1"("Column1")
                values(%s)""", [1])

    #wait all processes
    conn.commit()       

def worker2(s):    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    cursor = conn.cursor()

    pg_cursor.execute(
            """ insert into "MyTable2"("Column1")
                values(%s)""", [1])

    #wait all processes
    conn.commit()

if __name__ == '__main__':
    p1 = Process(target=worker1)
    p2 = Process(target=worker2)
    p1.start()
    p2.start()

我该如何让所有进程等到SQL命令完成后再继续?这样做的正确方法是什么?这个SQL插入只是个例子,实际上我需要插入数百万条记录。

1 个回答

0

你可以使用一对 multiprocessing.Event 对象,让两个工作者可以互相通知对方自己完成了工作,同时也可以强制让他们等待对方的信号。

from multiprocessing import Process, Event

def worker1(s, my_e, other_e):    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    cursor = conn.cursor()

    pg_cursor.execute(
            """ insert into "MyTable1"("Column1")
                values(%s)""", [1])

    #wait all processes
    my_e.set()
    other_e.wait()
    conn.commit()       

def worker2(s, my_e, other_e):    
    conn = psycopg2.connect("dbname=mydb user=postgres")
    cursor = conn.cursor()

    pg_cursor.execute(
            """ insert into "MyTable2"("Column1")
                values(%s)""", [1])

    #wait all processes
    my_e.set()
    other_e.wait()
    conn.commit()

if __name__ == '__main__':
    e1 = Event()
    e2 = Event()
    p1 = Process(target=worker1, args=(e1, e2))
    p2 = Process(target=worker2, args=(e2, e1))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

不过要小心,如果其中一个工作者因为某种原因出现问题,可能会导致死锁。你可以考虑给 Event.wait 的可选参数 timeout 设置一个比较大的值,如果超时了就回退。或者在每个工作者中使用 try/except,确保 my_e 能够调用 set(),同时在两个工作者之间共享一个变量,用来告诉对方是否发生了故障。

撰写回答