适用于背景处理的方法

2024-05-16 22:13:43 发布

您现在位置:Python中文网/ 问答频道 /正文

我对Python3中的多进程编程一无所知,我正在尝试找到解决问题的最佳方法。 我有一个具有以下结构的主要功能:

  • 初始阶段

  • while循环,收集T时间的数据

  • 以最快的方式处理收集的数据,以缩短未收集数据的时间

  • 回到聚会

我的想法是将数据收集放在一个单独的进程中,该进程也可以在处理之前收集的数据时运行。这样,没有收集数据时的时间窗口可以短到将数据从收集过程移动到分析器

我在谷歌上搜索了如何做到这一点,以及如何在收集过程之间共享数据,如何向过程发出我需要数据的信号,以及如何获取收集到的数据,但我找到了很多方法(队列、管理器、管道、事件),我不知道该算法的最佳方法是什么。 我应该从哪里开始寻找multiprocess.Event()对于主进程和后台进程之间的信令似乎很有希望,但我也希望获得数据(结构化为dict

你推荐什么


Tags: 数据方法功能分析器进程过程编程方式
1条回答
网友
1楼 · 发布于 2024-05-16 22:13:43

这是一种使用multiprocessing.Queue的方法,您的“数据采集器”可以从子进程向其写入数据,而主进程将从中读取数据,以将任务提交给multiprocessing.Pool实例。在下面的演示中,我使用了^{}方法,它方便地允许我使用生成器函数指定表示要提交的任务的iterable,它允许我在这些任务可用时“延迟”提交,并在任务完成时处理返回值

出于演示目的,收集的“数据”只是整数,每次收集需要0.1秒,处理过程包括对这些整数进行平方运算,每次计算大约需要0.25秒

import multiprocessing
import time

QUARTER_SECOND_ITERATIONS = 5_000_000

def quarter_second():
    """ Burn up .25 seconds of CPU approximately on my desktop """
    sum = 0
    for _ in range(QUARTER_SECOND_ITERATIONS):
        sum += 1
    return sum


EOD = None # "end of data" indicator

def data_gatherer_worker(output_queue):
    """
    This runs in another process and "gathers" data.
    For demo purposes, we just generate some integers.
    """
    for i in range(20):
        time.sleep(.1) # simulate taking .1 seconds to do this
        output_queue.put(i)
    output_queue.put(EOD) # end of data indicator


def data_gatherer():
    """ Generator function to get next input data. """
    output_queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=data_gatherer_worker, args=(output_queue,))
    p.start()
    while True:
        data = output_queue.get()
        if data is EOD:
            # No more data
            break
        yield data
    p.join() # wait for process to complete


def init():
    """ Perform initialization phase. """
    ...

def process(data):
    """
    This runs in another process and processes the data.
    For demo purposes, we just square the passed integer and return the data and result.
    """
    # simulate .25 seconds of processing:
    quarter_second()
    return data, data ** 2


def main():
    """ Main logic """
    init()
    n_processors = multiprocessing.cpu_count()
    # leave one processor free for data gathering
    with multiprocessing.Pool(n_processors - 1) as pool:
        # Submit new tasks as data becomes available
        # and get results as tasks complete:
        for result in pool.imap(process, data_gatherer()):
            # unpack:
            data, data_squared = result
            print(f'{data} ** 2 = {data_squared}')

# Required for Windows:
if __name__ == '__main__':
    t = time.time()
    main()
    print('Total time:', time.time() - t)

印刷品:

0 ** 2 = 0
1 ** 2 = 1
2 ** 2 = 4
3 ** 2 = 9
4 ** 2 = 16
5 ** 2 = 25
6 ** 2 = 36
7 ** 2 = 49
8 ** 2 = 64
9 ** 2 = 81
10 ** 2 = 100
11 ** 2 = 121
12 ** 2 = 144
13 ** 2 = 169
14 ** 2 = 196
15 ** 2 = 225
16 ** 2 = 256
17 ** 2 = 289
18 ** 2 = 324
19 ** 2 = 361
Total time: 2.636831045150757

相关问题 更多 >