如何在Python中实现自定义多进程连续(异步)控制?

1 投票
2 回答
1572 浏览
提问于 2025-04-29 07:52

我在我的selenium网格上有两种浏览器类型:火狐(40个实例)和谷歌浏览器(40个实例)。此外,我还有一堆测试,有些需要在火狐上运行,有些需要在谷歌浏览器上运行,还有一些对浏览器没有特别要求。

最初,@skrrgwasme建议的解决方案是把那些不需要特定浏览器的测试分成两组,最终形成两个队列(一个在火狐上执行,另一个在谷歌浏览器上执行):如何实现对python multiprocessing.Pool的自定义控制?

这个方案不错,但还有改进的空间:因为不同的浏览器处理请求的速度不同,谷歌浏览器的队列会完成得更快,而火狐的队列则会花更长时间。这可以通过自定义的持续控制来解决,也就是说,我决定使用哪个浏览器不是在创建池之前,而是在这些池已经启动之后。

所以,我们应该有一个池,里面每个进程的启动都由我们控制。

@skrrgwasme建议使用apply_async来实现这个功能。但我就是搞不懂在python中怎么做到这一点,因为它不像node.js那样是异步的。

你能分享一些例子吗?我对python的经验很少,似乎完全卡住了 :(

暂无标签

2 个回答

1

使用 pool.apply_async 可以理解为手动设置每一个调用,就像你之前提到的 map 一样。你只需要把所有的任务添加到池中,然后当新的工作进程可用时,它就会快速处理这些任务。你可以使用每个浏览器一个函数的方法,就像 skrrgwasme 提到的。下面的代码大部分是参考了他之前的回答:

from multiprocessing import Pool

params = [1,2,3,4,5 ... ]

def ff_func(param):
    # Do FireFox stuff

def ch_func(param):
    # Do Chrome stuff

pool = Pool(80)

# For each parameter, add two tasks to the pool--one FF, one Chrome.
for param in params:
    pool.apply_async(ff_func, param)
    pool.apply_async(ch_func, param)

pool.close()
pool.join()

这里发生的事情是,你为池子构建了一个大的异步任务队列。池子会按照它认为合适的顺序处理所有定义好的任务。

需要注意的是,与之前的回答不同,这里并没有保证每个浏览器的最大池大小为40,因为你要求我们更好地利用资源。利用最多80个进程的最佳方式是让它们尽可能一直在工作。

如果你在两种“类型”中都不能同时使用超过40个进程,那么你就无法在之前的两个池的方案上有所改进。在这种情况下,你的瓶颈就是这40个进程完成一个或另一个队列的速度。如果你不允许使用这些空闲的进程,那么来自更快队列的空闲进程就无法被利用了;-)

2

我觉得这里最简单的方法是让每个工作进程同时从两个 Queue 对象中获取任务。一个是针对特定浏览器的 Queue,另一个是共享的“通用” Queue。这样,你可以让40个进程从Chrome的 Queue 中获取任务,当这个队列的任务处理完后,再切换到通用的 Queue;同样的,40个进程也可以从Firefox的 Queue 中获取任务,处理完后再切换到通用的 Queue。下面是一个使用8个进程而不是80个的例子:

from multiprocessing import Pool, Manager
from Queue import Empty
import time

ff_tests = [1,2,3,4,5]
chrome_tests = [10, 11, 12, 13, 14, 15]
general_tests = [20, 21,22, 23,24,25]

def process_func(spec_queue, general_queue, browser):
    while True:
        try:
            test = spec_queue.get_nowait()
            print("Processing {} in {} process".format(test, browser))
            time.sleep(2)
        except Empty:
            break

    while True:
        try:
            test = general_queue.get_nowait()
            print("Processing {} in {} process".format(test, browser))
            time.sleep(2)
        except Empty:
            break


if __name__ == "__main__":
    m = Manager()
    ff_queue = m.Queue()
    chrome_queue = m.Queue()
    general_queue = m.Queue()

    for queue, tests in [(ff_queue, ff_tests), (chrome_queue, chrome_tests),
                         (general_queue, general_tests)]:
        for test in tests:
            queue.put(test)


    pool = Pool(8)
    for _ in range(4):
        pool.apply_async(process_func, args=(ff_queue, general_queue, "firefox"))
        pool.apply_async(process_func, args=(chrome_queue, general_queue, "chrome"))
    pool.close()
    pool.join()

输出结果:

Processing 1 in firefox process
Processing 10 in chrome process
Processing 2 in firefox process
Processing 11 in chrome process
Processing 3 in firefox process
Processing 12 in chrome process
Processing 4 in firefox process
Processing 13 in chrome process
Processing 5 in firefox process
Processing 14 in chrome process
Processing 20 in firefox process
Processing 15 in chrome process
Processing 21 in firefox process
Processing 22 in chrome process
Processing 23 in firefox process
Processing 24 in chrome process
Processing 25 in chrome process

如你所见,特定浏览器的队列在各自的浏览器进程中被处理完,然后这两种类型的进程一起合作处理通用队列的任务。

撰写回答