并行编程:进程同步
我有一个程序,里面有很多音乐播放的部分(比如:deck 1、deck 2、music_clip_deck、speakers_deck、ip_call_1、ip_call_2、ip_call_3)。每个部分都是在不同的进程中运行的。我用来处理mp3文件、重新传输流、麦克风的声音和aiortc-pyav的声音的时间片是125毫秒。之后,我会填充一些队列(每个进程一个队列),然后把最终的队列发送到最后一个线程,进行最终的音频处理,最后再传给客户。
我该如何让所有的进程同步运行,使得每个进程的运行时间都正好是125毫秒呢?
这里有一张图可以帮助理解:
这种方法可能完全没有帮助:
class Deck_1_Proc(Process):
...
...
...
def run(self):
while(True):
t1 = time.time()
...
...
...
t2 = time.time()
if t2 - t1 < 0.125:
time.sleep(0.125 - (t2 - t1))
也许更好的方法是使用类似于javascript的setInterval,设置时间参数为125毫秒。
from threading import Event, Thread
def call_repeatedly(interval, func, *args):
stopped = Event()
def loop():
while not stopped.wait(interval): # the first call is in `interval` secs
func(*args)
Thread(target=loop).start()
return stopped.set
#call:
cancel_future_calls = call_repeatedly(0.125, run)
#stopping to app termination:
cancel_future_calls()
1 个回答
2
主要问题是大多数定时器都会出现漂移,也就是说它们的计时不太准确。而且,sleep
也不准确,甚至连 QTimer 也不例外。所以,如果你想要一个稳定的定时器(比如第100次滴答声接近12.5秒),你就得采取一些特别的措施。
import time
from multiprocessing import Condition
def infinite_heartbreat(cv: Condition):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep > 0:
time.sleep(time_to_sleep)
with cv:
cv.notify_all()
你可以很容易地让所有进程在同一时间醒来,使用一种叫做 条件变量。不过,如果其中一个进程慢了几毫秒,你可能需要用到 multiprocessing.Value,这样可以确保它们只有在没有落后的情况下才会等待,具体做法如下:
import threading
import time
from multiprocessing import Condition, Value, Process, Event
def infinite_heartbreat(cv: Condition, frame: Value, quit_event: Event):
next_beat = time.time()
while True:
next_beat += 0.125
time_to_sleep = next_beat - time.time()
if time_to_sleep > 0:
time.sleep(time_to_sleep)
with cv:
frame.value += 1
cv.notify_all()
if quit_event.is_set():
return
def worker(cv, frame_number, worker_id, quit_event: Event):
current_frame = 0
while True:
with cv:
cv.wait_for(lambda: current_frame <= frame_number.value)
if quit_event.is_set():
return
print(f"processed frame {current_frame} in worker {worker_id}")
current_frame += 1
if __name__ == "__main__":
condition = Condition()
frame = Value('q', lock=False)
quit_event = Event()
processes = []
for i in range(4):
process = Process(target=worker, args=(condition, frame, i, quit_event))
process.start()
processes.append(process)
tr = threading.Thread(target=infinite_heartbreat, args=(condition,frame, quit_event))
tr.start()
time.sleep(5)
quit_event.set()
补充说明:添加了一个 quit_event
,它是以 multiprocessing.Event 的形式存在,因为它是原子的。
补充说明2:根据 @Booboo 的建议,把值改成了无锁的有符号 q
,这样可以节省一个文件描述符,并且允许负帧数(因为 -1 也是有用的)。