使用多个进程和队列时优雅地处理键盘中断

2024-06-16 11:09:27 发布

您现在位置:Python中文网/ 问答频道 /正文

我有一个生产者/消费者系统,使用来自 multiprocessing 的 Process 和 Queue。各进程会捕获 KeyboardInterrupt,并成功返回最终结果字典。但是,进程不断输出以下错误消息:

Traceback (most recent call last):
  File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/queues.py", line 242, in _feed
    send_bytes(obj)
  File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 200, in send_bytes
    self._send_bytes(m[offset:offset + size])
  File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 404, in _send_bytes
    self._send(header + buf)
  File "/home/ubuntu/anaconda3/envs/nlp/lib/python3.7/multiprocessing/connection.py", line 368, in _send
    n = write(self._handle, buf)
BrokenPipeError: [Errno 32] Broken pipe

这说明某处有代码试图向 Queue 中放入数据,但没有成功

import queue
import signal
import sys
from multiprocessing import Process, Queue
from typing import Iterable


def do_work(task):
    """Placeholder for the per-task work; does nothing and returns ``None``."""
    return None


def pull_worker(
    worker_num, work_queue: Queue, result_queue: Queue
):
    """
    Pull tasks off ``work_queue`` until it is empty and push each
    ``(task, result)`` tuple onto ``result_queue``.

    A ``None`` sentinel is put on ``result_queue`` when the worker stops,
    either because the work queue ran dry or because SIGINT, SIGTERM or
    SIGHUP was received, so the consumer can count finished workers.

    Args:
        worker_num: Index of this worker (not used inside the function).
        work_queue: Queue holding the pending tasks.
        result_queue: Queue receiving ``(task, result)`` tuples followed by
            a single ``None`` sentinel.
    """
    run = True

    def signal_handle(_signal, frame):
        nonlocal run
        run = False
        result_queue.put(None)  # Signal to the consumer that the worker is finished
        # Close both queues and wait for their feeder threads before exiting.
        result_queue.close()
        result_queue.join_thread()
        work_queue.close()
        work_queue.join_thread()
        # Raises SystemExit, so the sentinel after the loop is not sent twice.
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handle)
    signal.signal(signal.SIGTERM, signal_handle)
    signal.signal(signal.SIGHUP, signal_handle)

    while run:
        try:
            task = work_queue.get_nowait()
        except queue.Empty:
            # The exception lives in the stdlib ``queue`` module; Queue
            # objects have no ``Empty`` attribute, so ``work_queue.Empty``
            # would raise AttributeError while matching the exception.
            break
        else:
            result = do_work(task)
            if run:
                result_queue.put((task, result))
    result_queue.put(None)  # Signal to the consumer that the worker is finished


def save_consumer(result_queue, final_result_queue, n_workers):
    """
    Gather worker output into one dict and publish it once all workers are done.

    Every ``None`` read from ``result_queue`` counts as one finished worker;
    any other item is unpacked as ``(task, task_result)`` and stored under
    ``task``. After ``n_workers`` sentinels the dict is put on
    ``final_result_queue``, which is then closed and its feeder thread joined.
    """

    def _report_interrupt(_signum, _frame):
        # Only report the signal; the consumer keeps running until every
        # worker has sent its sentinel.
        print("Caught keyboard interrupt in consumer")

    for signum in (signal.SIGINT, signal.SIGTERM, signal.SIGHUP):
        signal.signal(signum, _report_interrupt)

    collected = {}
    workers_done = 0
    while workers_done < n_workers:
        item = result_queue.get()
        if item is not None:
            task, task_result = item
            collected[task] = task_result
        else:
            workers_done += 1

    final_result_queue.put(collected)
    final_result_queue.close()
    final_result_queue.join_thread()


def main(
    tasks: Iterable[str],
    n_workers: int,
):
    """
    Run ``tasks`` through a pool of worker processes and return the results.

    One consumer process collects the workers' output into a single dict;
    this function waits for the workers, reads that dict from the final
    result queue and returns it. If waiting for the workers is interrupted,
    the interruption is printed and the results gathered so far are still
    returned.

    Args:
        tasks: Tasks to process; any iterable is accepted.
        n_workers: Maximum number of worker processes; capped at the number
            of tasks.

    Returns:
        The dict produced by the consumer, mapping each task to its result.
    """
    # Materialise first: ``len()`` is needed below and a plain iterable
    # (e.g. a generator) does not support it.
    tasks = list(tasks)
    n_workers = min(len(tasks), n_workers)

    # For tasks to be done
    work_queue = Queue()
    # For results as they are fetched from the workers
    result_queue = Queue()
    # For the final, single, result dictionary when workers are shut down
    final_result_queue = Queue()

    for task in tasks:
        work_queue.put(task)

    consumer = Process(
        target=save_consumer, args=(result_queue, final_result_queue, n_workers)
    )
    consumer.start()

    producers = []
    for worker_num in range(n_workers):
        proc = Process(
            target=pull_worker,
            args=(worker_num, work_queue, result_queue)
        )
        proc.start()
        producers.append(proc)

    try:
        for proc in producers:
            proc.join()
    except BaseException as e:
        # BaseException already covers KeyboardInterrupt and SystemExit.
        print(f"Caught interrupt in Main: {type(e)} {e}")
    finally:
        for proc in producers:
            proc.join()
        # Read the final dict before joining the consumer: the consumer
        # joins its queue's feeder thread after putting the dict.
        results = final_result_queue.get()
        consumer.join()
    # Returning outside ``finally`` avoids silently discarding an exception
    # raised during cleanup.
    return results

Tags: theintasksignalqueueconsumerresultresults