到目前为止,每当我需要使用^{
例如:
from multiprocessing import Process, Queue
class MyClass:
def __init__(self, num_processes):
self._log = logging.getLogger()
self.process_list = []
self.work_queue = Queue()
for i in range(num_processes):
p_name = 'CPU_%02d' % (i+1)
self._log.info('Initializing process %s', p_name)
p = Process(target = do_stuff,
args = (self.work_queue, 'arg1'),
name = p_name)
这样我就可以向队列中添加子进程将要使用的内容。然后,我可以通过检查Queue.qsize()
来监视处理的距离:
while True:
qsize = self.work_queue.qsize()
if qsize == 0:
self._log.info('Processing finished')
break
else:
self._log.info('%d simulations still need to be calculated', qsize)
现在我认为^{
我不知道的是,我怎样才能监控还有多少“工作”要做。
举个例子:
from multiprocessing import Pool
class MyClass:
def __init__(self, num_processes):
self.process_pool = Pool(num_processes)
# ...
result_list = []
for i in range(1000):
result = self.process_pool.apply_async(do_stuff, ('arg1',))
result_list.append(result)
# ---> here: how do I monitor the Pool's processing progress?
# ...?
有什么想法吗?
我想出了下面的异步调用解决方案。
小玩具脚本的例子,但应该广泛应用我认为。
基本上,在无限循环中,在列表生成器中轮询结果对象的ready值,然后求和以计算剩余的已调度池任务数。
一旦没有剩余的break和join()&close()。
根据需要添加睡眠循环。
与上述解决方案相同的原理,但没有队列。如果还跟踪最初发送池的任务数,则可以计算完成百分比等。。。
使用
Manager
队列。这是在工作进程之间共享的队列。如果使用普通队列,则每个工作进程都会对其进行pickle和unpickle操作,并因此进行复制,这样每个工作进程就无法更新队列。然后让您的工作人员向队列中添加内容,并在工作人员工作时监视队列的状态。您需要使用
map_async
来执行此操作,因为这样可以看到整个结果何时就绪,从而可以中断监视循环。示例:
我也遇到过同样的问题,并为MapResult对象提出了一个简单的解决方案(尽管使用了内部的MapResult数据)
注意,剩余的值并不总是精确的,因为块大小通常是根据要处理的项的数量向上舍入的。
您可以使用
pool.map_async(get_stuff, todo, chunksize=1)
来循环此操作相关问题 更多 >
编程相关推荐