How to log when using multiprocessing.Pool.apply_async

Published 2024-04-27 00:32:31


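I can't get logging to work when using multiprocessing.Pool.apply_async. I tried to adapt this example from the Logging Cookbook, but it only works with multiprocessing.Process. Passing the logging queue to apply_async seems to have no effect. I would like to use a Pool so that I can easily manage the number of concurrent workers.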

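The following adapted example using multiprocessing.Process works OK for me, except that I do not receive log messages from the main process, and I don't think it will work well when I have 100 large jobs.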

import logging
import logging.handlers
import numpy as np
import time
import multiprocessing
import pandas as pd
log_file = 'PATH_TO_FILE/log_file.log'

def listener_configurer():
    root = logging.getLogger()
    h = logging.FileHandler(log_file)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords) on the queue and handle them; quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)


def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)


# This is the worker function: it configures logging for the current process,
# logs a start message, sleeps for sleep_time seconds, then logs a completion
# message.
def worker_function(sleep_time, name, queue, configurer):
    configurer(queue)
    start_message = 'Worker {} started and will now sleep for {}s'.format(name, sleep_time)
    logging.info(start_message)
    time.sleep(sleep_time)
    success_message = 'Worker {} has finished sleeping for {}s'.format(name, sleep_time)
    logging.info(success_message)

def main_with_process():
    start_time = time.time()
    single_thread_time = 0.
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        name = str(i)
        sleep_time = np.random.randint(10) / 2
        single_thread_time += sleep_time
        worker = multiprocessing.Process(target=worker_function,
                                         args=(sleep_time, name, queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()
    end_time = time.time()
    final_message = "Script execution time was {}s, but single-thread time was {}s".format(
        (end_time - start_time),
        single_thread_time
    )
    print(final_message)

if __name__ == "__main__":
    main_with_process()

But I can't get the following adaptation to work:

[Code block missing from the source: the asker's Pool.apply_async version of the script above.]

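I have tried many slight variations using multiprocessing.Manager, multiprocessing.Queue, multiprocessing.get_logger and apply_async.get(), but haven't got any of them to work.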

I would have thought there would be an off-the-shelf solution for this. Should I try Celery instead?

Thanks


2 answers

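Consider using two queues. The first queue is where the data for the workers is placed. After finishing its job, each worker pushes its result onto the second queue. Then use this second queue to write the log to a file.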
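A minimal sketch of this two-queue idea, under some assumptions: the names worker, run_jobs and n_workers, the message text, and the log file name are all made up for illustration, and the jobs are the same "sleep for a while" jobs used in the question. The main process is the only one that touches the log file.

```python
import multiprocessing
import time


def worker(task_queue, result_queue):
    """Handle tasks until a None sentinel arrives; report each one on result_queue."""
    for name, sleep_time in iter(task_queue.get, None):
        time.sleep(sleep_time)
        result_queue.put("Job {} finished sleeping for {}s".format(name, sleep_time))


def run_jobs(job_list, log_file, n_workers=2):
    """Run job_list on n_workers processes and write one log line per job."""
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    workers = [multiprocessing.Process(target=worker, args=(task_queue, result_queue))
               for _ in range(n_workers)]
    for w in workers:
        w.start()
    for i, sleep_time in enumerate(job_list):
        task_queue.put((str(i), sleep_time))
    for _ in workers:
        task_queue.put(None)  # one sentinel per worker
    with open(log_file, "a") as fh:
        for _ in job_list:  # drain every result before joining the workers
            fh.write(result_queue.get() + "\n")
    for w in workers:
        w.join()


if __name__ == "__main__":
    run_jobs([0.2, 0.1, 0.3], "two_queue_demo.log")  # hypothetical file name
```

Here n_workers plays the role that the pool size would play with a Pool, and the results are read before join() so that no worker is left blocked on a full pipe.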

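There are actually two intertwined problems here:

  • You cannot pass a multiprocessing.Queue() object as an argument to a Pool-based function (you can pass it directly to a worker process that you start yourself, but not any "further" than that, as it were).
  • You must wait for all the asynchronous workers to complete before sending None to the listener process.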
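The first point can be checked without starting a pool at all. A Pool pickles task arguments to send them to its workers, and a plain multiprocessing.Queue refuses to be pickled outside of process start-up. The sketch below shows that refusal; the helper name can_be_sent_to_pool is made up for illustration. With apply_async the resulting error appears to be stored on the result object rather than raised immediately, which would explain why nothing seemed to happen until .get() is called.

```python
import multiprocessing
import pickle


def can_be_sent_to_pool(obj):
    """Return True if obj survives the pickling applied to Pool task arguments."""
    try:
        pickle.dumps(obj)
        return True
    except RuntimeError:
        # A plain multiprocessing.Queue raises RuntimeError here: it may only
        # be handed to a child while that child process is being started.
        return False


if __name__ == "__main__":
    plain_queue = multiprocessing.Queue(-1)
    print(can_be_sent_to_pool(plain_queue))   # the logging queue from the question
    print(can_be_sent_to_pool((0.5, "0")))    # ordinary arguments such as (sleep_time, name)
```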

To fix the first, replace:

queue = multiprocessing.Queue(-1)

with:

queue = multiprocessing.Manager().Queue(-1)

as a manager-managed Queue() instance can be passed through.

To fix the second, either collect each result from each asynchronous call, or close the pool and wait for it, e.g.:

pool.close()
pool.join()
queue.put_nowait(None)

or the more complicated:

getters = []
for i, sleep_time in enumerate(job_list):
    name = str(i)
    getters.append(
        pool.apply_async(worker_function,
                         args=(sleep_time, name, queue, worker_configurer))
    )
while len(getters):
    getters.pop().get()
# optionally, close and join pool here (generally a good idea anyway)
queue.put_nowait(None)

(You should also consider replacing put_nowait with the waiting version, put, and not using unlimited-length queues.)
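Putting the two fixes together, a Pool-based version of the script might look roughly like the sketch below. It is an illustration under assumptions, not the asker's original code: main_with_pool, the processes parameter and the log file location are made up here. One deliberate change from the question is that the QueueHandler is attached through the Pool initializer, once per worker process, because pool workers are reused and calling the configurer inside every task would add a duplicate handler each time.

```python
import logging
import logging.handlers
import multiprocessing
import os
import tempfile
import time

# Hypothetical log location, chosen only for this sketch.
LOG_FILE = os.path.join(tempfile.gettempdir(), "pool_logging_demo.log")


def listener_process(queue, log_file):
    """Write every LogRecord taken from the queue to log_file until None arrives."""
    root = logging.getLogger()
    handler = logging.FileHandler(log_file)
    handler.setFormatter(logging.Formatter(
        "%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s"))
    root.addHandler(handler)
    while True:
        record = queue.get()
        if record is None:  # sentinel: all jobs are done
            break
        logging.getLogger(record.name).handle(record)


def worker_configurer(queue):
    """Pool initializer: runs once in each worker process."""
    root = logging.getLogger()
    root.addHandler(logging.handlers.QueueHandler(queue))
    root.setLevel(logging.DEBUG)


def worker_function(sleep_time, name):
    logging.info("Worker %s started and will now sleep for %ss", name, sleep_time)
    time.sleep(sleep_time)
    logging.info("Worker %s has finished sleeping for %ss", name, sleep_time)
    return name


def main_with_pool(job_list, processes=4):
    with multiprocessing.Manager() as manager:
        queue = manager.Queue(-1)  # manager-managed queue, safe to hand to the pool
        listener = multiprocessing.Process(target=listener_process,
                                           args=(queue, LOG_FILE))
        listener.start()
        pool = multiprocessing.Pool(processes, initializer=worker_configurer,
                                    initargs=(queue,))
        results = [pool.apply_async(worker_function, args=(sleep_time, str(i)))
                   for i, sleep_time in enumerate(job_list)]
        pool.close()
        pool.join()        # every job has finished before the sentinel is sent
        finished = [r.get() for r in results]  # re-raises any worker exception
        queue.put(None)    # blocking put rather than put_nowait
        listener.join()
    return finished


if __name__ == "__main__":
    print(main_with_pool([0.3, 0.1, 0.2]))
```

The processes argument is what limits how many jobs run at once, which is the reason for wanting a Pool in the first place.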
