了解Python多处理应用程序

2024-04-26 17:59:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我在翻阅我不久前写的一些代码,它有如下结构。我试着在需要的地方添加解释性意见。你知道吗

# Create a reader and a writer process
reader_proc = Process(target=self.reader)
reader_proc.start()

writer_proc = Process(target=self.writer, args=(pfin,))
writer_proc.start()

# start a pool of workers
with Pool(n_workers, maxtasksperchild=max_tasks_per_child) as pool:
    # a list to keep track of workers
    worker_jobs = []
    # a list to keep track of return values
    return_vals = []

    # get input chunks from the reader
    # reader writes input chunks to a work_q (queue)
    while True:
        work = work_q.get()
        if work == 'done':
            break

        # process_chunk is a function that ... processes the given chunk
        # this function will do some computations and write those to a results_q (queue)
        # which the writer will then write to a file
        # the function also returns another type of value that is processed below
        job = pool.apply_async(process_chunk, (work,))
        worker_jobs.append(job)

    print('Done reading chunks!')
    # reader is done reading
    reader_proc.join()
    reader_proc.terminate()

    # When a worker has finished its job, get its information back
    for job_idx, job in enumerate(worker_jobs, 1):
        print(f"Processing job {job_idx}")
        res1, res2 = job.get()
        return_vals.append((res1, res2))

    # process results in main process
    process_results(return_vals)

    # Notify the writer that we're done
    results_q.put('done')

dr池使用apply_async处理队列中的块。队列耗尽后,我们.get()返回结果并处理它们。你知道吗

我不确定应用到池时是否立即执行作业,或者是否等待调用.get()?这一点很重要,因为如果它们等待执行直到队列耗尽,那么对于长队列来说,这可能需要很长时间。你知道吗

另一方面,如果它们不等待并立即执行,那么这些函数的结果存储在哪里?由于我们要等到.get()才能获取结果,这是否意味着在调用.get()之前子进程会被阻塞?你知道吗

我问这个问题的原因是因为在第一个打印语句(reading done)和后面的语句(processing job x)之间有很长的延迟,我不知道为什么。你知道吗


Tags: ofthetogetreturn队列jobproc
1条回答
网友
1楼 · 发布于 2024-04-26 17:59:58

工作人员一有空就执行任务。得到结果或根本得不到结果都不会对此产生影响。你知道吗

您的辅助程序结果存储在AsyncResult对象中,在您的情况下,job就是其中之一,worker_jobs拥有所有这些对象。然后你做正确的事情,循环你的结果对象,得到结果。你知道吗

池在内部存储结果,直到您得到它们为止—即使您根本没有得到结果,它也不会阻止工作进程—在许多并行处理的情况下,如果工作进程只是基于输入执行特定任务,您甚至可能对工作进程的“结果”不感兴趣。一旦工作进程完成并存储结果(或异常!)在这个对象中,可以自由地接受池中的另一个作业。你知道吗

这也意味着您必须在关闭池之前获得结果-就像您现在所做的那样。如果将“processing job”循环移到with Pool...结构之外,那么在尝试获取结果时,结果就会丢失。你知道吗

有关AsyncResult对象的可用方法,请参见https://docs.python.org/3.4/library/multiprocessing.html?highlight=process。你知道吗

如果工作进程引发异常,AsyncResult对象还存储异常。它不会在工作进程遇到异常时立即触发,而只是存储在那里并在您获得()结果时引发。如果您的worker可以引发异常,那么您应该为get循环而不是worker构建异常处理。你知道吗

相关问题 更多 >