Python multiprocessing and handling exceptions in workers

Posted 2024-05-15 11:25:11


I'm using the Python multiprocessing library for an algorithm in which a number of workers process some data and return results to the parent process. I use one multiprocessing.Queue to pass jobs to the workers, and a second queue to collect the results.

This all works fine until a worker fails to process some chunk of data. In the simplified example below, each worker has two stages:

  • Initialization - may fail, in which case the worker should be destroyed
  • Data processing - processing a chunk of data may fail, in which case the worker should skip the chunk and continue with the next one.

When either of these stages fails, I get a deadlock after the script completes. This code simulates my problem:

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.2
fail_job_p = 0.3


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
        finally:
            jobs_queue.task_done()
    # Telling that we are done with processing stop token
    jobs_queue.task_done()



#========= Parent =========
jobs = mp.JoinableQueue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

# Collecting the results
# What if some workers failed to process the job and we have
# less results than expected
for r in range(results_to_expect):
    result = results.get()
    print(result)

#Signal all workers to finish
for i in range(workers_count):
    jobs.put(None)

#Wait for them to finish
jobs.join()

I have two questions about this code:

  1. When init() fails, how can I detect that the worker is invalid, instead of waiting for it to finish?
  2. When do_work() fails, how can I notify the parent process that it should expect fewer results in the results queue?

Thanks for your help!


1 Answer

I modified your code slightly to make it work (see explanation below).

import multiprocessing as mp
import random

workers_count = 5
# Probability of failure, change to simulate failures
fail_init_p = 0.5
fail_job_p = 0.4


#========= Worker =========
def do_work(job_state, arg):
    if random.random() < fail_job_p:
        raise Exception("Job failed")
    return "job %d processed %d" % (job_state, arg)

def init(args):
    if random.random() < fail_init_p:
        raise Exception("Worker init failed")
    return args

def worker_function(args, jobs_queue, result_queue):
    # INIT
    # What to do when init() fails?
    try:
        state = init(args)
    except Exception:
        print("!Worker %d init fail" % args)
        result_queue.put('init failed')
        return
    # DO WORK
    # Process data in the jobs queue
    for job in iter(jobs_queue.get, None):
        try:
            # Can throw an exception!
            result = do_work(state, job)
            result_queue.put(result)
        except Exception:
            print("!Job %d failed, skip..." % job)
            result_queue.put('job failed')


#========= Parent =========
jobs = mp.Queue()
results = mp.Queue()
for i in range(workers_count):
    mp.Process(target=worker_function, args=(i, jobs, results)).start()

# Populate jobs queue
results_to_expect = 0
for j in range(30):
    jobs.put(j)
    results_to_expect += 1

init_failures = 0
job_failures = 0
successes = 0
while job_failures + successes < 30 and init_failures < workers_count:
    result = results.get()
    init_failures += int(result == 'init failed')
    job_failures += int(result == 'job failed')
    successes += int(result != 'init failed' and result != 'job failed')
    #print init_failures, job_failures, successes

for ii in range(workers_count):
    jobs.put(None)

My changes:

  1. Changed jobs to a plain Queue (instead of a JoinableQueue).
  2. Workers now send the special result strings "init failed" and "job failed" back to the parent.
  3. The main process monitors for those special results for as long as the loop condition holds.
  4. At the end, put "stop" requests (i.e. None jobs) for however many workers you have, regardless of how many failed. Note that not all of them may be pulled from the queue (in case a worker failed to initialize).
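As a variation on the special marker strings above (a sketch with illustrative names, not code from the answer), failures can be reported as ('ok', ...) / ('error', ...) tuples instead, so a legitimate string result can never collide with a failure marker:

```python
import multiprocessing as mp
import random

def worker(wid, jobs, results, fail_p=0.3):
    # Every job produces exactly one message - ('ok', result) or
    # ('error', job) - so the parent always knows how many to collect.
    for job in iter(jobs.get, None):
        try:
            if random.random() < fail_p:
                raise ValueError("job failed")
            results.put(('ok', "worker %d processed %d" % (wid, job)))
        except ValueError:
            results.put(('error', job))

if __name__ == '__main__':
    jobs, results = mp.Queue(), mp.Queue()
    procs = [mp.Process(target=worker, args=(i, jobs, results))
             for i in range(3)]
    for p in procs:
        p.start()
    n_jobs = 12
    for j in range(n_jobs):
        jobs.put(j)
    # Collect one message per submitted job - no deadlock, because a
    # failed job still sends a message.
    ok = err = 0
    for _ in range(n_jobs):
        status, payload = results.get()
        ok += status == 'ok'
        err += status == 'error'
    for _ in procs:
        jobs.put(None)  # stop tokens, one per worker
    for p in procs:
        p.join()
    print("%d succeeded, %d failed" % (ok, err))
```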

By the way, your original code was nice and easy to work with, and the random failure-probability bit is a cool touch.
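For completeness, a sketch of how both problems are often handled on Python 3 with the standard-library concurrent.futures module (illustrative names, not part of the answer above): an exception raised inside a worker is captured per job and re-raised in the parent when that future's result is requested, so no marker strings are needed.

```python
import random
from concurrent.futures import ProcessPoolExecutor

def do_work(arg, fail_p=0.3):
    # Simulated job: fails with probability fail_p.
    if random.random() < fail_p:
        raise ValueError("job %d failed" % arg)
    return "processed %d" % arg

if __name__ == '__main__':
    with ProcessPoolExecutor(max_workers=5) as pool:
        futures = [pool.submit(do_work, j) for j in range(30)]
        ok = failed = 0
        for f in futures:
            try:
                print(f.result())  # re-raises the worker's exception here
                ok += 1
            except ValueError:
                failed += 1
        print("%d ok, %d failed" % (ok, failed))
```

Worker initialization failures can be covered too: ProcessPoolExecutor accepts an initializer callable (Python 3.7+), and if it raises, the pool is marked broken and pending futures fail with BrokenProcessPool instead of deadlocking.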
