如何加速多进程队列的同时读写?

2 投票
1 回答
3010 浏览
提问于 2025-04-18 08:04

简而言之 - 有没有办法提高同时读写多进程队列的速度?

我有一个处理审计数据的应用程序。可以把它想象成一个系统日志中继。它接收数据,解析这些数据,然后将事件继续发送出去。事件的处理速度可能很高——我希望能达到每秒处理15,000个事件(EPS)。

in_queue = multiprocessing.Queue()

out_queue = multiprocessing.Queue()

  • ReaderProc - 一个进程,负责从套接字读取数据,并使用 in_queue.put() 将数据放入 in_queue 中。
  • ParserProcs - 多个进程,使用 in_queue.get() 从队列中获取数据,处理这些数据,然后将处理结果放入 out_queue,使用 out_queue.put()
  • WriterProc - 一个进程,使用 out_queue.get()out_queue 中读取数据,并通过 TCP 套接字连接将数据发送出去。

我用队列进行了测试——我可以以每秒25,000个事件的速度放入或提取事件。但是,当多个解析进程(4个)从队列中提取数据时,速度就会下降,降到每秒不到10,000个事件。我猜测是底层的管道、锁等造成了这个延迟。

我查了一下管道,发现它似乎只支持两个端点。我需要将 CPU 密集型的解析任务分配给多个进程。像多进程内存共享这样的替代方法能否获得更好的结果?我该如何提高队列中 .put().get() 操作的同时进行效率?

1 个回答

3

根据你的性能需求,我觉得使用像 ZeroMQRabbitMQ 这样的第三方消息中间件会更好。我找到了一些对比测试,虽然这些测试不完全符合你的情况,但可以看看多处理的效果。性能差异非常明显:

multiprocessing.Queue 的结果

1
2
3

python2 ./multiproc_with_queue.py
Duration: 164.182257891
Messages Per Second: 60907.9210414

0mq 的结果

1
2
3

python2 ./multiproc_with_zeromq.py
Duration: 23.3490710258
Messages Per Second: 428282.563744

我进行了这两个测试,并提供了一个更复杂的工作负载,因为 multiprocessing.Queue 的一个好处是它可以为你处理序列化。以下是新的脚本:

mult_queue.py

import sys
import time
from  multiprocessing import Process, Queue

def worker(q):
    for task_nbr in range(1000000):
        message = q.get()
    sys.exit(1)

def main():
    send_q = Queue()
    Process(target=worker, args=(send_q,)).start()
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        send_q.put(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

multi_zmq.py

import sys
import zmq
from  multiprocessing import Process
import time
import json
import cPickle as pickle

def worker():
    context = zmq.Context()
    work_receiver = context.socket(zmq.PULL)
    work_receiver.connect("tcp://127.0.0.1:5557")

    for task_nbr in range(1000000):
        message = work_receiver.recv_pyobj()

    sys.exit(1)

def main():
    Process(target=worker, args=()).start()
    context = zmq.Context()
    ventilator_send = context.socket(zmq.PUSH)
    ventilator_send.bind("tcp://127.0.0.1:5557")
    msg = {
            'something': "More",
            "another": "thing",
            "what?": range(200),
            "ok": ['asdf', 'asdf', 'asdf']
            }
    for num in range(1000000):
        ventilator_send.send_pyobj(msg)

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = 1000000 / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

输出结果:

dan@dan:~$ ./mult_zmq.py 
Duration: 14.0204648972
Messages Per Second: 71324.3110935
dan@dan:~$ ./mult_queue.py 
Duration: 27.2135331631
Messages Per Second: 36746.4229657

撰写回答