Python带工作进程的池

34 投票
3 回答
44054 浏览
提问于 2025-04-17 11:14

我正在尝试在Python中使用进程对象来创建一个工作池。每个工作者(也就是一个进程)需要进行一些初始化,这个过程需要花费不少时间。然后,它会接收一系列的任务(最好是用map()来处理),最后返回一些结果。在这个过程中,不需要进行其他的沟通。不过,我一直搞不清楚怎么用map()来调用我工作者的compute()函数。

from multiprocessing import Pool, Process

class Worker(Process):
    def __init__(self):
        print 'Worker started'
        # do some initialization here
        super(Worker, self).__init__()

    def compute(self, data):
        print 'Computing things!'
        return data * data

if __name__ == '__main__':
    # This works fine
    worker = Worker()
    print worker.compute(3)

    # workers get initialized fine
    pool = Pool(processes = 4,
                initializer = Worker)
    data = range(10)
    # How to use my worker pool?
    result = pool.map(compute, data)

那么,使用任务队列是不是更好的选择,还是我可以继续用map()呢?

3 个回答

2

从Python 3.3开始,你可以使用starmap这个功能,它可以让你同时处理多个参数,并且以非常简单的语法返回结果:

import multiprocessing

nb_cores = multiprocessing.cpu_count()

def caps(nb, letter):
    print('Exec nb:', nb)
    return letter.upper()

if __name__ == '__main__':

    multiprocessing.freeze_support() # for Windows, also requires to be in the statement: if __name__ == '__main__'

    input_data = ['a','b','c','d','e','f','g','h']
    input_order = [1,2,3,4,5,6,7,8,9]

    with multiprocessing.Pool(processes=nb_cores) as pool: # auto closing workers
        results = pool.starmap(caps, zip(input_order, input_data))

    print(results)
7

initializer 需要一个可以调用的函数,这个函数主要用来初始化一些东西,比如设置一些全局变量,而不是一个 Process 的子类;而 map 则可以接受任何可迭代的对象:

#!/usr/bin/env python
import multiprocessing as mp

def init(val):
    print('do some initialization here')

def compute(data):
    print('Computing things!')
    return data * data

def produce_data():
    yield -100
    for i in range(10):
        yield i
    yield 100

if __name__=="__main__":
  p = mp.Pool(initializer=init, initargs=('arg',))
  print(p.map(compute, produce_data()))
66

我建议你使用一个队列来处理这个问题。

class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            # Use data

这样你就可以启动很多这样的工作者,它们都从同一个队列里获取任务。

request_queue = Queue()
for i in range(4):
    Worker(request_queue).start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(None) 

这样做可以让你把启动时的高成本分摊到多个工作者身上。

撰写回答