在Python中使用多进程时,增加消费者与生产者的比例并不能让它们跟上工作进度
我想做的事情是:生产者生成一个值(基本上是反复进行哈希计算),然后把这个值映射到“箱子”里。生产者会把每个数字放入两个箱子中。消费者则负责处理一部分箱子的地址范围。生产者把随机生成的数字放到一个工作队列中,供负责相应地址范围的消费者使用(对这两个箱子都这样做)。消费者从自己的队列中取出任务,进行一些计算,然后继续下一个任务。
我的问题在于,我在测试时想确定生产者和消费者的比例,以防止队列不断增长。我发现生产者和消费者的比例为1:7时,队列不会增长。但当比例变成2:14时,队列就开始增长了。实际上,似乎没有一个合理的比例能让消费者完成足够的工作,从而阻止队列里的项目数量线性增长。我发现增加消费者可以减缓队列的增长,但从未能完全停止。现在,我并不指望能简单地把生产者数量翻倍,消费者也翻倍,但我希望能找到一个合理的比例,而不是2对几百。为了说明,我是在云端运行这段代码,因此可以确保每个进程都有一个独立的CPU来运行。
这是我创建进程和分配给它们的函数的方式。我使用了多进程库来创建和管理进程及队列。
import hashlib
import math
import sys
import time
from multiprocessing import Process, JoinableQueue, cpu_count
from gmpy2 import mpz, powmod, is_prime, bit_set
from random import randint, getrandbits
from secrets import token_urlsafe
from queue import Empty
class Blake2b(object):
def __init__(self, output_bit_length=64, salt=None, optimized64=False):
self.output_bit_length = output_bit_length
digest_size = math.ceil(output_bit_length / 8) # convert to bytes
if salt is None:
self.hash = hashlib.blake2b(digest_size=digest_size)
else:
self.hash = hashlib.blake2b(digest_size=digest_size, salt=salt)
self.output_modulus = mpz(2**(self.output_bit_length - 1)) # integer double the size of the bit length - 1 (so double -2)
def generating_value(self, preimage):
h = self.hash.copy()
h.update(preimage)
current_digest = h.digest()
while True:
U = int.from_bytes(current_digest, sys.byteorder) % self.output_modulus
candidate = bit_set(self.output_modulus + U, 0)
if is_prime(candidate):
return candidate
h.update(b'0')
current_digest = h.digest()
def KeyGen(self, preimage, size):
h = self.hash.copy()
h.update(preimage)
digest = h.hexdigest()
return int(digest, 16) % size
def task(bins, task_queue, ident, f):
while True:
try:
next_task = task_queue.get(timeout=0.5)
except Empty:
print('Consumer: gave up waiting...', file=f, flush=True)
continue
if next_task is None:
# Poison pill tells it to shutdown
print (f'Consumer {ident}: Exiting', flush=True, file=f)
task_queue.task_done()
break
value, in_bin = next_task
N, A = bins[in_bin]
answer = powmod(A, value, N)
bins[in_bin][1] = answer
task_queue.task_done()
return
def add_to_queue(tableSize, num_consumers, task_queues, f):
bit_length = 86 #The security parameter
b = Blake2b(output_bit_length=bit_length)
leftKey = Blake2b(output_bit_length=10, salt=b'left')
rightKey = Blake2b(output_bit_length=10, salt=b'right')
bins_per_process = tableSize // num_consumers
while True:
rnd_val = b.generating_value(token_urlsafe(16).encode('utf-8'))
KeyLeft = leftKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
KeyRight = rightKey.KeyGen(str(rnd_val).encode('utf-8'), tableSize)
task_queues[KeyLeft//bins_per_process].put([rnd_val, KeyLeft % bins_per_process])
task_queues[KeyRight//bins_per_process].put([rnd_val, KeyRight % bins_per_process])
return
if __name__ == '__main__':
num_cpus = cpu_count()
num_consumers = 16
num_producers = 2
table_size = num_consumers*8 #how many bins total
with open("qGrowthTests.txt", "a") as f:
table = [[getrandbits(3072), randint(2, 2**3072 - 1)] for _ in range(table_size)]
bins_lst = [table[(i * len(table)) // num_consumers:((i + 1) * len(table)) // num_consumers] for i in range(num_consumers)]
task_queues = [JoinableQueue() for _ in range(num_consumers)]
producers = []
for i in range(num_producers):
producer = Process(target=add_to_queue, args=(table_size, num_consumers, task_queues, f))
producers.append(producer)
producer.start()
consumers = []
for queue_num, queue in enumerate(task_queues, start=0):
process = Process(target=task, args=(bins_lst[queue_num], queue, queue_num, f))
consumers.append(process)
process.start()
# checking the size of the queues once every 5 seconds for a 5 minute test
n = 1
while n < 60: # 5 minutes = 300 seconds, 300/5=60
time.sleep(5)
for q_num, q in enumerate(task_queues, start=0):
print("Queue ", q_num, " has: ", q.qsize(), file=f)
n += 1
# Force all the processes to finish
for process in producers:
if process.is_alive():
print (f'Producer {process}: Exiting', flush=True, file=f)
process.terminate()
process.join()
for process in consumers:
if process.is_alive():
print (f'Consumer {process}: Exiting', flush=True, file=f)
process.terminate()
process.join()
print('Test Complete')
当我测试完成一定工作量所需的时间时(给2个生产者分配一些任务,计时,然后让消费者处理队列),我发现当我把消费者的数量翻倍时,他们完成相同工作的时间减半(这是预期的结果)。例如,4个消费者完成的时间是2个消费者的一半,8个消费者的时间又是4个消费者的一半,依此类推。根据这个推算,我认为消费者的比例应该是2:16,才能和生产者生成任务的速度相匹配。然而,当我运行代码五分钟,让生产者尽可能多地完成任务,同时有16个或更多的消费者时,我仍然看到队列线性增长。例如,当有16个消费者时,所有队列的最终大小在450000到470000之间,数量都差不多。
我查看了确保生产者给所有消费者分配均等的工作量。此外,我还确保队列中的项目确实被移除。我尝试使用并发未来的进程池和管理器来处理队列,虽然所有这些进程会同时完成,但我觉得这会增加额外的开销,因为这些进程完成相同数量的任务所需的时间更长。当我检查CPU负载时,大多数都在30%左右,而我上面提到的创建进程的方式则是100%。
我试图找出我的代码中可能存在的问题,但我看不出为什么2个生产者会产生比42个消费者还多的工作。我觉得可能是我对多进程工作原理的理解有些缺失,但我看不出具体是什么。有什么想法吗?
1 个回答
这个问题是因为 multiprocessing.Queue
的实现方式。它内部使用了一个单独的线程来交换元素。但是,你的生产者(add_to_queue
)占用了全局解释器锁(GIL),这就阻碍了这个线程的工作,导致元素无法交换。
你可以在消费者那边确认这个问题,方法如下:
def task(bins, task_queue, ident):
while True:
try:
next_task = task_queue.get_nowait()
except Empty:
print(f'Consumer: gave up waiting... {task_queue.qsize()=}, {task_queue.empty()=}', flush=True)
continue
Consumer: gave up waiting... task_queue.qsize()=19185, task_queue.empty()=True
虽然 qsize
的值很大,但 get_nowait
却无法获取到任何元素。
最简单的解决办法是使用 multiprocessing.Manager().JoinableQueue()
:
from multiprocessing import Manager
...
# task_queues = [JoinableQueue() for _ in range(num_consumers)]
task_queues = [Manager().JoinableQueue() for _ in range(num_consumers)]
不过,使用这个管理器会比较慢。如果你更看重性能,就需要比较一下吞吐量,看看这个差异是否能接受。
另外一个解决方案是限制队列的大小:
task_queues = [JoinableQueue(maxsize=1000) for _ in range(num_consumers)]
当队列满的时候,全局解释器锁会被释放,这样生产者就能保持一个稳定的生产速度。不过,使用这种方法时,队列总是满的,所以你现在通过队列大小来判断消费者的比例就不太实际了。
在这种情况下,比较处理一百万个元素所需的时间会更好。如果消费者的数量足够多,进一步增加消费者的数量也不会改变处理时间。