I am using the Python multiprocessing library for an algorithm in which many workers process some data and return results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers, and a second one to collect the results.
It all works fine until a worker fails to process some chunk of data. In the simplified example below, each worker has two phases:
- initialization, which can fail (see init())
- data processing, which can fail (see do_work())
When either of these phases fails, I get a deadlock after the script completes. This code simulates my problem:
import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except:
        print("!Worker %d init fail" % args)
        return

    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()


#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print(result)

# Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

# Wait for them to finish
jobs.join()
I have two questions about this code:
1. When init() fails, how do I detect that the worker process is permanently invalid, so that the parent does not wait for it to finish?
2. When do_work() fails, how do I notify the parent process that it should expect fewer results in the results queue?
Thanks for your help!
I modified your code slightly to make it work (see the explanation below).
My changes:
- jobs changed to a plain Queue (instead of a JoinableQueue).
- Workers now put a None marker into the results queue when they are done, so the parent counts finished workers instead of calling jobs.join().
- The parent still puts one None stop token per worker into the jobs queue; note that not all of them may be pulled from the queue (in case a worker process failed to initialize).
By the way, your original code was nice and easy to play with. The random-probability bits are cool.