我在翻阅我不久前写的一些代码,它有如下结构。我试着在需要的地方添加解释性意见。你知道吗
# Create a reader and a writer process
reader_proc = Process(target=self.reader)
reader_proc.start()
writer_proc = Process(target=self.writer, args=(pfin,))
writer_proc.start()
# start a pool of workers
with Pool(n_workers, maxtasksperchild=max_tasks_per_child) as pool:
# a list to keep track of workers
worker_jobs = []
# a list to keep track of return values
return_vals = []
# get input chunks from the reader
# reader writes input chunks to a work_q (queue)
while True:
work = work_q.get()
if work == 'done':
break
# process_chunk is a function that ... processes the given chunk
# this function will do some computations and write those to a results_q (queue)
# which the writer will then write to a file
# the function also returns another type of value that is processed below
job = pool.apply_async(process_chunk, (work,))
worker_jobs.append(job)
print('Done reading chunks!')
# reader is done reading
reader_proc.join()
reader_proc.terminate()
# When a worker has finished its job, get its information back
for job_idx, job in enumerate(worker_jobs, 1):
print(f"Processing job {job_idx}")
res1, res2 = job.get()
return_vals.append((res1, res2))
# process results in main process
process_results(return_vals)
# Notify the writer that we're done
results_q.put('done')
dr池使用apply_async
处理队列中的块。队列耗尽后,我们.get()
返回结果并处理它们。你知道吗
我不确定应用到池时是否立即执行作业,或者是否等待调用.get()
?这一点很重要,因为如果它们等待执行直到队列耗尽,那么对于长队列来说,这可能需要很长时间。你知道吗
另一方面,如果它们不等待并立即执行,那么这些函数的结果存储在哪里?由于我们要等到.get()
才能获取结果,这是否意味着在调用.get()
之前子进程会被阻塞?你知道吗
我问这个问题的原因是因为在第一个打印语句(reading done)和后面的语句(processing job x)之间有很长的延迟,我不知道为什么。你知道吗
工作人员一有空就执行任务。得到结果或根本得不到结果都不会对此产生影响。你知道吗
您的辅助程序结果存储在
AsyncResult
对象中,在您的情况下,job
就是其中之一,worker_jobs
拥有所有这些对象。然后你做正确的事情,循环你的结果对象,得到结果。你知道吗池在内部存储结果,直到您得到它们为止—即使您根本没有得到结果,它也不会阻止工作进程—在许多并行处理的情况下,如果工作进程只是基于输入执行特定任务,您甚至可能对工作进程的“结果”不感兴趣。一旦工作进程完成并存储结果(或异常!)在这个对象中,可以自由地接受池中的另一个作业。你知道吗
这也意味着您必须在关闭池之前获得结果-就像您现在所做的那样。如果将“processing job”循环移到
with Pool...
结构之外,那么在尝试获取结果时,结果就会丢失。你知道吗有关
AsyncResult
对象的可用方法,请参见https://docs.python.org/3.4/library/multiprocessing.html?highlight=process。你知道吗如果工作进程引发异常,
AsyncResult
对象还存储异常。它不会在工作进程遇到异常时立即触发,而只是存储在那里并在您获得()结果时引发。如果您的worker可以引发异常,那么您应该为get循环而不是worker构建异常处理。你知道吗相关问题 更多 >
编程相关推荐