Python多进程队列:调用get()是瓶颈

1 投票
1 回答
40 浏览
提问于 2025-04-14 15:27

我在Python中创建了一个事件生成器。

我有三个模块(每个模块都是一个独立的Process):

  • 输入模块:这个模块会在不同的时间点被唤醒,然后把时间戳放入输入队列
  • 事件模块:这个模块从输入队列获取时间戳,生成带有该时间戳的事件,并将其发送到事件队列
  • 输出模块:这个模块从事件队列获取已生成的事件,并将其写入某个终端;

在高事件每秒(EPS)值的情况下,性能问题出现在事件模块上。

问题在于Queue.get()这个调用消耗了执行时间的大部分。输入队列是提前填充好的,所以在下面的例子中,输入队列始终可以用来执行获取操作。

这里有一些简化的代码和性能分析:


spent_on_render = 0.0
spent_on_putting = 0.0
spent_on_getting = 0.0
spent_summary = 0.0

events_batch = []
last_flush = perf_counter()

while True:
    start = perf_counter()

    if (
        len(events_batch) >= FLUSH_AFTER_SIZE
        or (perf_counter() - last_flush) > FLUSH_AFTER_SECONDS
    ):
        putting_time = perf_counter()

        for event in events_batch:
            event_queue.put(event)

        spent_on_putting += perf_counter() - putting_time

        events_batch.clear()
        last_flush = perf_counter()

    try:
        get_time = perf_counter()
        timestamp = input_queue.get(
            block=False,
            timeout=0
        )
        spent_on_getting += perf_counter() - get_time
    except Empty:
        spent_summary += perf_counter() - start
        continue

    render_time = perf_counter()
    events_batch.append(event_plugin.produce(timestamp=timestamp))
    spent_on_render += perf_counter() - render_time
    

    spent_summary += perf_counter() - start

一些执行时间的结果如下:

=================================
Spent on render: 12.241767558232823
Spent on putting: 0.326323675999447
Spent on getting: 22.14875863072848
Spent summary: 35.32571034637658

让我困惑的是,如果我只是循环从队列中获取,而不进行任何额外的操作,那么从队列中读取相同数量的时间戳所需的时间会减少很多。

那么在这种情况下,我该怎么做才能提高性能,或者我是否应该使用不同的进程间通信(IPC)方式呢?


更新:

这是一些额外的研究:

当我第一次读取所有输入队列的内容,然后将其放入下一个队列时:

timestamps = []
start = perf_counter()
qsize = input_queue.qsize()
for _ in range(qsize):
    timestamps.append(input_queue.get())

print('Size:', qsize)
print('Time spent after get:', perf_counter() - start)

for ts in timestamps:
    event_queue.put(ts)

print('Time spent after put:', perf_counter() - start)

输出是:

Size: 1000000
Time spent after get: 1.935170126002049
Time spent after put: 2.2695002569998906

当我在读取元素的同时,将其放入下一个队列时:

start = perf_counter()
qsize = input_queue.qsize()
for _ in range(qsize):
    ts = input_queue.get()
    event_queue.put(ts)

print('Size:', qsize)
print('Time spent after get and put:', perf_counter() - start)

输出是:

Size: 1000000
Time spent after get and put: 16.109829995999462

为什么这个差异这么大?

1 个回答

0

这个问题跟上下文切换有关。我把批处理的大小设置得更大,结果得到了想要的性能。

撰写回答