在Python中使用多进程时,增加消费者与生产者的比例并不能让它们跟上工作进度

1 投票
1 回答
70 浏览
提问于 2025-04-14 18:03

我想做的事情是:生产者生成一个值(基本上是反复进行哈希计算),然后把这个值映射到“箱子”里。生产者会把每个数字放入两个箱子中。消费者则负责处理一部分箱子的地址范围。生产者把随机生成的数字放到一个工作队列中,供负责相应地址范围的消费者使用(对这两个箱子都这样做)。消费者从自己的队列中取出任务,进行一些计算,然后继续下一个任务。

我的问题在于,我在测试时想确定生产者和消费者的比例,以防止队列不断增长。我发现生产者和消费者的比例为1:7时,队列不会增长。但当比例变成2:14时,队列就开始增长了。实际上,似乎没有一个合理的比例能让消费者完成足够的工作,从而阻止队列里的项目数量线性增长。我发现增加消费者可以减缓队列的增长,但从未能完全停止。现在,我并不指望能简单地把生产者数量翻倍,消费者也翻倍,但我希望能找到一个合理的比例,而不是2对几百。为了说明,我是在云端运行这段代码,因此可以确保每个进程都有一个独立的CPU来运行。

这是我创建进程和分配给它们的函数的方式。我使用了多进程库来创建和管理进程及队列。

import hashlib
import math
import sys
import time
from multiprocessing import Process, JoinableQueue, cpu_count
from gmpy2 import mpz, powmod, is_prime, bit_set
from random import randint, getrandbits
from secrets import token_urlsafe
from queue import Empty

class Blake2b(object):
    def __init__(self, output_bit_length=64, salt=None, optimized64=False):
        self.output_bit_length = output_bit_length

        digest_size = math.ceil(output_bit_length / 8) # convert to bytes
        if salt is None:
            self.hash = hashlib.blake2b(digest_size=digest_size)
        else:
            self.hash = hashlib.blake2b(digest_size=digest_size, salt=salt)
        self.output_modulus = mpz(2**(self.output_bit_length - 1)) # integer double the size of the bit length - 1 (so double -2)

    def generating_value(self, preimage): 
        h = self.hash.copy() 
        h.update(preimage)  
        current_digest = h.digest()

        while True:
            U = int.from_bytes(current_digest, sys.byteorder) % self.output_modulus 
            candidate = bit_set(self.output_modulus + U, 0)

            if is_prime(candidate):
                return candidate

            h.update(b'0') 
            current_digest = h.digest()

    def KeyGen(self, preimage, size):
        h = self.hash.copy()
        h.update(preimage)
        digest = h.hexdigest()
        return int(digest, 16) % size

def task(bins, task_queue, ident, f):
    while True:
        try:
            next_task = task_queue.get(timeout=0.5)
        except Empty:
            print('Consumer: gave up waiting...', file=f, flush=True)
            continue
        if next_task is None:
            # Poison pill tells it to shutdown
            print (f'Consumer {ident}: Exiting', flush=True, file=f)
            task_queue.task_done()
            break
        
        value, in_bin = next_task
        N, A = bins[in_bin]
        answer = powmod(A, value, N)
        bins[in_bin][1] = answer

        task_queue.task_done()
    return

def add_to_queue(tableSize, num_consumers, task_queues, f):

    bit_length = 86 #The security parameter
    b = Blake2b(output_bit_length=bit_length)
    leftKey = Blake2b(output_bit_length=10, salt=b'left')
    rightKey = Blake2b(output_bit_length=10, salt=b'right')
    bins_per_process = tableSize // num_consumers

    while True:
        rnd_val = b.generating_value(token_urlsafe(16).encode('utf-8'))
        KeyLeft = leftKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
        KeyRight = rightKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
        
        task_queues[KeyLeft//bins_per_process].put([rnd_val, KeyLeft % bins_per_process]) 
        task_queues[KeyRight//bins_per_process].put([rnd_val, KeyRight % bins_per_process]) 
    return

if __name__ == '__main__':
    num_cpus = cpu_count()
    num_consumers = 16
    num_producers = 2
    table_size = num_consumers*8 #how many bins total

    with open("qGrowthTests.txt", "a") as f:
        table = [[getrandbits(3072), randint(2, 2**3072 - 1)] for _ in range(table_size)]
        bins_lst = [table[(i * len(table)) // num_consumers:((i + 1) * len(table)) // num_consumers] for i in range(num_consumers)]

        task_queues = [JoinableQueue() for _ in range(num_consumers)]

        producers = []
        for i in range(num_producers):
            producer = Process(target=add_to_queue, args=(table_size, num_consumers, task_queues, f))
            producers.append(producer)
            producer.start()

        consumers = []
        for queue_num, queue in enumerate(task_queues, start=0):
            process = Process(target=task, args=(bins_lst[queue_num], queue, queue_num, f))
            consumers.append(process)
            process.start()

        # checking the size of the queues once every 5 seconds for a 5 minute test
        n = 1
        while n < 60: # 5 minutes = 300 seconds, 300/5=60
            time.sleep(5)
            for q_num, q in enumerate(task_queues, start=0):
                print("Queue ", q_num, " has: ", q.qsize(), file=f)
            n += 1

        # Force all the processes to finish
        for process in producers:
            if process.is_alive():
                print (f'Producer {process}: Exiting', flush=True, file=f)
                process.terminate()
                process.join()

        for process in consumers:
            if process.is_alive():
                print (f'Consumer {process}: Exiting', flush=True, file=f)
                process.terminate()
                process.join()
    
    print('Test Complete')

当我测试完成一定工作量所需的时间时(给2个生产者分配一些任务,计时,然后让消费者处理队列),我发现当我把消费者的数量翻倍时,他们完成相同工作的时间减半(这是预期的结果)。例如,4个消费者完成的时间是2个消费者的一半,8个消费者的时间又是4个消费者的一半,依此类推。根据这个推算,我认为消费者的比例应该是2:16,才能和生产者生成任务的速度相匹配。然而,当我运行代码五分钟,让生产者尽可能多地完成任务,同时有16个或更多的消费者时,我仍然看到队列线性增长。例如,当有16个消费者时,所有队列的最终大小在450000到470000之间,数量都差不多。

我查看了确保生产者给所有消费者分配均等的工作量。此外,我还确保队列中的项目确实被移除。我尝试使用并发未来的进程池和管理器来处理队列,虽然所有这些进程会同时完成,但我觉得这会增加额外的开销,因为这些进程完成相同数量的任务所需的时间更长。当我检查CPU负载时,大多数都在30%左右,而我上面提到的创建进程的方式则是100%。

我试图找出我的代码中可能存在的问题,但我看不出为什么2个生产者会产生比42个消费者还多的工作。我觉得可能是我对多进程工作原理的理解有些缺失,但我看不出具体是什么。有什么想法吗?

1 个回答

0

这个问题是因为 multiprocessing.Queue 的实现方式。它内部使用了一个单独的线程来交换元素。但是,你的生产者(add_to_queue)占用了全局解释器锁(GIL),这就阻碍了这个线程的工作,导致元素无法交换。

你可以在消费者那边确认这个问题,方法如下:

def task(bins, task_queue, ident):
    while True:
        try:
            next_task = task_queue.get_nowait()
        except Empty:
            print(f'Consumer: gave up waiting... {task_queue.qsize()=}, {task_queue.empty()=}', flush=True)
            continue
Consumer: gave up waiting... task_queue.qsize()=19185, task_queue.empty()=True

虽然 qsize 的值很大,但 get_nowait 却无法获取到任何元素。

最简单的解决办法是使用 multiprocessing.Manager().JoinableQueue()

from multiprocessing import Manager

...

    # task_queues = [JoinableQueue() for _ in range(num_consumers)]
    task_queues = [Manager().JoinableQueue() for _ in range(num_consumers)]

不过,使用这个管理器会比较慢。如果你更看重性能,就需要比较一下吞吐量,看看这个差异是否能接受。

另外一个解决方案是限制队列的大小:

    task_queues = [JoinableQueue(maxsize=1000) for _ in range(num_consumers)]

当队列满的时候,全局解释器锁会被释放,这样生产者就能保持一个稳定的生产速度。不过,使用这种方法时,队列总是满的,所以你现在通过队列大小来判断消费者的比例就不太实际了。

在这种情况下,比较处理一百万个元素所需的时间会更好。如果消费者的数量足够多,进一步增加消费者的数量也不会改变处理时间。

撰写回答