Python:如何在ProcessPoolExecutor中使用外部队列?

7 投票
2 回答
5006 浏览
提问于 2025-04-20 20:34

我最近刚开始使用Python的多线程和多进程功能。

我尝试写一段代码,使用生产者/消费者的方式,从一个JSON日志文件中读取数据块,把这些数据块作为事件放入一个队列中,然后启动一组进程,从这个队列中获取事件(文件块)并处理每一个,最后打印出结果。

我的想法是先启动这些进程,让它们等待事件进入队列。

我现在使用的代码看起来可以工作,是从我找到的一些例子中拼凑而来的:

import re, sys
from multiprocessing import Process, Queue

def process(file, chunk):
    f = open(file, "rb")
    f.seek(chunk[0])
    for entry in pat.findall(f.read(chunk[1])):
        print(entry)

def getchunks(file, size=1024*1024):
    f = open(file, "rb")
    while True:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline() # skip forward to next line ending
        yield start, f.tell() - start
        if not s:
            break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if (queueEvent == None):
            queue.put(None)
            break
        process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(r".*?\n")
    queue = Queue()

    for w in xrange(6):
        p = Process(target=processingChunks, args=(queue,))
        p.start()

    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    queue.put(None)

不过,我想学习如何使用concurrent.futures中的ProcessPoolExecutor,以异步的方式实现相同的结果,使用Future结果对象。

我第一次尝试是使用一个外部队列,这个队列是用multiprocessing的Manager创建的,我打算把它传递给进程进行轮询。

但是这似乎不太奏效,我觉得这可能不是ProcessPoolExecutor设计的用法,因为它似乎使用了自己的内部队列。

我使用了这段代码:

import concurrent
from concurrent.futures import as_completed
import re, sys
from multiprocessing import Lock, Process, Queue, current_process, Pool, Manager

def process(file, chunk):
    entries = []
    f = open(file, "rb")
    f.seek(chunk[0])
    for entry in pat.findall(f.read(chunk[1])):
        entries.append(entry)
        return entries

def getchunks(file, size=1024*1024):
    f = open(file, "rb")
    while True:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline() # skip forward to next line ending
        yield start, f.tell() - start
        if not s:
            break

def processingChunks(queue):
    while True:
        queueEvent = queue.get()
        if (queueEvent == None):
            queue.put(None)
            break
        return process(queueEvent[0], queueEvent[1])

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(r".*?\n")
    procManager = Manager()
    queue = procManager.Queue()

    with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
        futureResults = []
        for i in range(6):
            future_result = executor.submit(processingChunks, queue)
            futureResults.append(future_result)

        for complete in as_completed(futureResults):
            res = complete.result()
            for i in res:
                print(i)


    for chunk in getchunks(testFile):
        queue.put((testFile, chunk))
        print(queue.qsize())
    queue.put(None)

我无法得到任何结果,所以显然我做错了什么,可能是对这个概念理解得不够透彻。

你们能帮我理解一下我该如何实现这个吗?

2 个回答

2

感谢Blckknght的回复,让我找到了正确的方向。下面是我最初问题的一个可能解决方案:

#!/usr/bin/python
import concurrent
from concurrent.futures import as_completed
import re, sys

def process(event):
    entries = []
    fl = event[0]
    chunk = event[1]
    pat = event[2]
    f = open(fl, "rb")
    f.seek(chunk[0])
    for entry in pat.findall(f.read(chunk[1])):
       entries.append(entry)
    return entries

def getchunks(file, pat, size=1024*1024):
    f = open(file, "rb")
    while True:
        start = f.tell()
        f.seek(size, 1)
        s = f.readline() # skip forward to next line ending
        yield (file, (start, f.tell() - start), pat)
        if not s:
            break

if __name__ == "__main__":
    testFile = "testFile.json"
    pat = re.compile(r".*?\n")
    results = []

    with concurrent.futures.ProcessPoolExecutor() as executor:
        for res in (executor.submit(process, event) for event in getchunks(testFile, pat)):
            results.append(res)

    for complete in as_completed(results):
        for entry in complete.result():
            print('Event result: %s' % entry)    
1

如果你在使用 ProcessPoolExecutor,那么你根本不需要你的 processingChunks 函数,也不需要从 multiprocessing 导入的那些东西。这个池子基本上自动完成了你之前函数的工作。相反,你可以用下面这样的方式一次性安排和分配所有的工作:

with concurrent.futures.ProcessPoolExecutor(max_workers = 6) as executor:
    executor.map(process, itertools.repeat(testFile), getchunks(testFile))

我不太明白你原来的代码是怎么工作的,因为 pat 并没有作为 process 的参数(我本以为每个工作进程都会因为 NameError 异常而失败)。如果这真的是个问题(而不是你示例代码的一个小错误),你可能需要稍微修改一下,把它和 file 以及 chunk 一起传递给工作进程(itertools.repeat 可能会派上用场)。

撰写回答