我的目标是并行计算多个数据点,并将它们异步写入IO设备。数据点不能单独编写,而是需要包装;更准确地说,我正在编写一个JSON数组,结构周围的[]
符号是必需的。你知道吗
我目前的方法是使用multiprocessing.Pool
并应用异步来计算数据点。然后使用回调函数将数据点发送到multiprocessing.Queue
,同时一个单独的线程从队列中同步提取元素并将它们写入IO设备。你知道吗
可能我需要用SIGINT来取消这个过程。在这种情况下,我希望计算安全地完成,即停止所有Pool
进程的计算,但完成对队列中所有剩余元素和]
符号的写入。你知道吗
到目前为止,我还没有找到解决问题的有效方法。目前,我的两个问题是:
SIGINT_handler
,但进程没有终止。我无法验证这是从何而来,但我假设这可能是队列中的死锁?但我不知道怎么防止。你知道吗pool.terminate()
向其所有子进程发送SIGTERM。很明显,这会导致每一个键盘都出现一个键盘中断异常,使终端混乱不堪,有十几个堆栈跟踪。你知道吗我的代码可以在下面找到。你知道吗
# Initialize the worker pool and necessary variables.
pool = multiprocessing.Pool(os.cpu_count() - 1)
data_queue = multiprocessing.Queue()
counter_lock = threading.Lock()
threads_todo = args.autnum
# This function is executed after each successful experiment.
def apply_finished(data):
data_queue.put(data)
with counter_lock:
nonlocal threads_todo
threads_todo -= 1
# Start the pool.
for i in range(args.autnum):
pool.apply_async(collect_data, (args,), callback=apply_finished)
pool.close()
# This function is called if SIGINT is send to this process.
def SIGINT_handler(sig, frame):
sys.stderr.write("SIGINT received. Cancelling...")
sys.stderr.flush()
pool.terminate()
with counter_lock:
nonlocal threads_todo
threads_todo = 0
signal.signal(signal.SIGINT, SIGINT_handler)
# Write the data to stdout until all workers terminate or a SIGINT is received.
sys.stdout.write("[\n")
while threads_todo > 0 or not data_queue.empty():
try:
data = data_queue.get(True, 1)
s = data.decode('utf-8')
sys.stdout.write(s)
sys.stdout.flush()
except queue.Empty:
data = None
sys.stdout.write("]")
目前没有回答
相关问题 更多 >
编程相关推荐