取消多处理池带SIGINT

2024-04-26 13:18:15 发布

您现在位置:Python中文网/ 问答频道 /正文

我的目标是并行计算多个数据点,并将它们异步写入IO设备。数据点不能单独编写,而是需要包装;更准确地说,我正在编写一个JSON数组,结构周围的[]符号是必需的。你知道吗

我目前的方法是使用multiprocessing.Pool并应用异步来计算数据点。然后使用回调函数将数据点发送到multiprocessing.Queue,同时一个单独的线程从队列中同步提取元素并将它们写入IO设备。你知道吗

可能我需要用SIGINT来取消这个过程。在这种情况下,我希望计算安全地完成,即停止所有Pool进程的计算,但完成对队列中所有剩余元素和]符号的写入。你知道吗

到目前为止,我还没有找到解决问题的有效方法。目前,我的两个问题是:

  • 有时,这个过程不会退出。调用了SIGINT_handler,但进程没有终止。我无法验证这是从何而来,但我假设这可能是队列中的死锁?但我不知道怎么防止。你知道吗
  • 据我所知,pool.terminate()向其所有子进程发送SIGTERM。很明显,这会导致每一个键盘都出现一个键盘中断异常,使终端混乱不堪,有十几个堆栈跟踪。你知道吗

我的代码可以在下面找到。你知道吗

# Initialize the worker pool and necessary variables.
pool = multiprocessing.Pool(os.cpu_count() - 1)
data_queue = multiprocessing.Queue()
counter_lock = threading.Lock()
threads_todo = args.autnum

# This function is executed after each successful experiment.
def apply_finished(data):
    data_queue.put(data)
    with counter_lock:
        nonlocal threads_todo
        threads_todo -= 1

# Start the pool.
for i in range(args.autnum):
    pool.apply_async(collect_data, (args,), callback=apply_finished)
pool.close()

# This function is called if SIGINT is send to this process.
def SIGINT_handler(sig, frame):
    sys.stderr.write("SIGINT received. Cancelling...")
    sys.stderr.flush()
    pool.terminate()
    with counter_lock:
        nonlocal threads_todo
        threads_todo = 0
signal.signal(signal.SIGINT, SIGINT_handler)

# Write the data to stdout until all workers terminate or a SIGINT is received.
sys.stdout.write("[\n")
while threads_todo > 0 or not data_queue.empty():
    try:
        data = data_queue.get(True, 1)
        s = data.decode('utf-8')
        sys.stdout.write(s)
        sys.stdout.flush()
    except queue.Empty:
        data = None
sys.stdout.write("]")

Tags: 数据data队列queue进程isstdoutsys