我有一个生产者/消费者系统,使用 `multiprocessing` 中的 `Process` 和 `Queue`。进程捕获 `KeyboardInterrupt`,并成功返回最终结果字典。但是,进程不断发送此错误消息:
Traceback (most recent call last):
File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
send_bytes(obj)
File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
self._send_bytes(m[offset:offset + size])
File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
self._send(header + buf)
File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 368, in _send
n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe
这告诉我某处有代码试图向 `Queue` 中放入数据,但没有成功。
import queue
import signal
import sys
from multiprocessing import Process, Queue
from typing import Iterable
def do_work(task):
    """Placeholder for the real per-task computation; currently returns None."""
    return None
def pull_worker(
    worker_num, work_queue: Queue, result_queue: Queue
):
    """Worker process: drain tasks from ``work_queue``, run :func:`do_work`
    on each, and push ``(task, result)`` pairs onto ``result_queue``.

    When the work queue is empty, or a termination signal arrives, the
    worker puts a single ``None`` sentinel on ``result_queue`` so the
    consumer can count finished workers, then closes its end of the queue.
    """
    run = True

    def signal_handle(_signal, frame):
        # Only flip the flag here. Doing queue operations or sys.exit()
        # inside a signal handler can interrupt an in-progress put() and
        # terminate the process while the queue's background feeder thread
        # is still sending bytes -- which is what produces the
        # "BrokenPipeError: [Errno 32] Broken pipe" from queues._feed.
        # The normal exit path below performs the sentinel put and the
        # orderly close()/join_thread() shutdown exactly once.
        nonlocal run
        run = False

    signal.signal(signal.SIGINT, signal_handle)
    signal.signal(signal.SIGTERM, signal_handle)
    signal.signal(signal.SIGHUP, signal_handle)

    while run:
        try:
            task = work_queue.get_nowait()
        # multiprocessing.Queue has no .Empty attribute; get_nowait()
        # raises queue.Empty from the stdlib queue module. The original
        # `except work_queue.Empty:` raised AttributeError instead.
        except queue.Empty:
            break
        else:
            result = do_work(task)
            if run:
                result_queue.put((task, result))

    result_queue.put(None)  # Signal to the consumer that the worker is finished
    # Flush this process's feeder thread before exiting so no bytes are
    # left half-sent on the pipe.
    result_queue.close()
    result_queue.join_thread()
def save_consumer(result_queue, final_result_queue, n_workers):
    """Collect ``(task, result)`` pairs from ``result_queue`` until every
    worker has delivered its ``None`` sentinel, then publish the single
    accumulated dict on ``final_result_queue``.

    Termination signals are deliberately ignored (only logged) so the
    consumer keeps draining results while the workers shut down.
    """
    collected = {}
    remaining = n_workers

    def signal_handle(_signal, frame):
        print("Caught keyboard interrupt in consumer")

    for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(sig, signal_handle)

    # Each worker sends exactly one None when it finishes; stop once all
    # sentinels have been seen.
    while remaining > 0:
        item = result_queue.get()
        if item is None:
            remaining -= 1
        else:
            task, task_result = item
            collected[task] = task_result

    final_result_queue.put(collected)
    final_result_queue.close()
    final_result_queue.join_thread()
def main(
    tasks: Iterable[str],
    n_workers: int,
):
    """Run ``tasks`` across up to ``n_workers`` producer processes and one
    consumer process; return the aggregated {task: result} dict.

    Parameters
    ----------
    tasks : Iterable[str]
        Tasks to distribute. Materialized into a list because a generic
        iterable may not support ``len()``.
    n_workers : int
        Upper bound on worker processes (capped at the task count).
    """
    tasks = list(tasks)  # a bare Iterable has no len(); consume it once
    n_workers = min(len(tasks), n_workers)
    # For tasks to be done
    work_queue = Queue()
    # For results as they are fetched from the workers
    result_queue = Queue()
    # For the final, single, result dictionary when workers are shut down
    final_result_queue = Queue()
    for task in tasks:
        work_queue.put(task)

    consumer = Process(
        target=save_consumer, args=(result_queue, final_result_queue, n_workers)
    )
    consumer.start()

    producers = []
    for worker_num in range(n_workers):
        proc = Process(
            target=pull_worker,
            args=(worker_num, work_queue, result_queue)
        )
        proc.start()
        producers.append(proc)

    try:
        for proc in producers:
            proc.join()
    # BaseException already subsumes KeyboardInterrupt and SystemExit;
    # the original three-way tuple was redundant.
    except BaseException as e:
        print(f"Caught interrupt in Main: {type(e)} {e}")
    finally:
        # Re-join in case the first join loop was interrupted mid-way.
        for proc in producers:
            proc.join()

    # Drain the final result BEFORE joining the consumer: joining a process
    # that still has unflushed queue data deadlocks on its feeder thread.
    results = final_result_queue.get()
    consumer.join()
    return results
目前没有回答
相关问题 更多 >
编程相关推荐