使用Python多进程生成和合并数据

2 投票
1 回答
926 浏览
提问于 2025-04-15 21:43

我有一份初始数据列表。我想对这些初始数据应用一个函数,这个函数会为每个初始数据生成一些新的数据。有些新数据是重复的,我想把这些重复的去掉。

简单来说,顺序执行的版本大致是这样的:

def create_new_data_for(datum):
    """make a list of new data from some old datum"""
    return [datum.modified_copy(k) for k in datum.k_list]

data = [some list of data] #some data to start with

#generate a list of new data from the old data, we'll reduce it next
newdata = []
for d in data:
    newdata.extend(create_new_data_for(d))

#now reduce the data under ".matches(other)"
reduced = []
for d in newdata:
    for seen in reduced:
        if d.matches(seen):
            break
    #so we haven't seen anything like d yet
    seen.append(d)

#now reduced is finished and is what we want!

我想通过多进程来加快这个过程。

我在想可以用一个 multiprocessing.Queue 来处理数据生成。每个进程只需要把它生成的数据放到这个队列里,当进程们在减少数据的时候,就可以从队列中获取数据。

但是我不太确定如何让不同的进程安全地循环处理这些减少后的数据,并进行修改,而不出现竞争条件或其他问题。

有什么好的方法可以安全地做到这一点吗?或者有没有更好的方法来实现这个目标?

1 个回答

1

我会使用一个多进程锁(和线程锁类似),这个锁是标准库里提供的。

这里有一个来自标准文档的例子

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    print 'hello world', i
    l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

为了避免竞争条件,确保在进行任何修改之前先调用“mylock.acquire()”,完成后再调用“mylock.release()”。

撰写回答