并行编程:进程同步

1 投票
1 回答
65 浏览
提问于 2025-04-13 00:39

我有一个程序,里面有很多音乐播放的部分(比如:deck 1、deck 2、music_clip_deck、speakers_deck、ip_call_1、ip_call_2、ip_call_3)。每个部分都是在不同的进程中运行的。我用来处理mp3文件、重新传输流、麦克风的声音和aiortc-pyav的声音的时间片是125毫秒。之后,我会填充一些队列(每个进程一个队列),然后把最终的队列发送到最后一个线程,进行最终的音频处理,最后再传给客户。

我该如何让所有的进程同步运行,使得每个进程的运行时间都正好是125毫秒呢?

这里有一张图可以帮助理解:

enter image description here

这种方法可能完全没有帮助:

class Deck_1_Proc(Process):
...
...
...
    def run(self):
        while(True):
            t1 = time.time()
            ...
            ...
            ...
            t2 = time.time()
            if t2 - t1 < 0.125:
                time.sleep(0.125 - (t2 - t1))

也许更好的方法是使用类似于javascript的setInterval,设置时间参数为125毫秒。

from threading import Event, Thread

def call_repeatedly(interval, func, *args):
    stopped = Event()
    def loop():
        while not stopped.wait(interval): # the first call is in `interval` secs
            func(*args)
    Thread(target=loop).start()    
    return stopped.set

#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()

1 个回答

2

主要问题是大多数定时器都会出现漂移,也就是说它们的计时不太准确。而且,sleep 也不准确,甚至连 QTimer 也不例外。所以,如果你想要一个稳定的定时器(比如第100次滴答声接近12.5秒),你就得采取一些特别的措施。

import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time_to_sleep = next_beat - time.time()
        if time_to_sleep > 0:
            time.sleep(time_to_sleep)
        with cv:
            cv.notify_all()

你可以很容易地让所有进程在同一时间醒来,使用一种叫做 条件变量。不过,如果其中一个进程慢了几毫秒,你可能需要用到 multiprocessing.Value,这样可以确保它们只有在没有落后的情况下才会等待,具体做法如下:

import threading
import time
from multiprocessing import Condition, Value, Process, Event
def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
    next_beat = time.time()
    while True:
        next_beat += 0.125
        time_to_sleep = next_beat - time.time()
        if time_to_sleep > 0:
            time.sleep(time_to_sleep)
        with cv:
            frame.value += 1
            cv.notify_all()
            if quit_event.is_set():
                return

def worker(cv, frame_number, worker_id, quit_event: Event):
    current_frame = 0
    while True:
        with cv:
            cv.wait_for(lambda: current_frame <= frame_number.value)
            if quit_event.is_set():
                return
        print(f"processed frame {current_frame} in worker {worker_id}")
        current_frame += 1

if __name__ == "__main__":
    condition = Condition()
    frame = Value('q', lock=False)
    quit_event = Event()
    processes = []
    for i in range(4):
        process = Process(target=worker, args=(condition, frame, i, quit_event))
        process.start()
        processes.append(process)
    tr = threading.Thread(target=infinite_heartbreat, args=(condition,frame, quit_event))
    tr.start()
    time.sleep(5)
    quit_event.set()

补充说明:添加了一个 quit_event,它是以 multiprocessing.Event 的形式存在,因为它是原子的。

补充说明2:根据 @Booboo 的建议,把值改成了无锁的有符号 q,这样可以节省一个文件描述符,并且允许负帧数(因为 -1 也是有用的)。

撰写回答