尝试模拟恒定字节率。time.sleep 结果混淆
背景
我在自己的电脑上使用Windows 7,在学校的电脑上使用Linux(Debian),通过ssh远程控制这台电脑。
我想模拟一个麦克风的稳定字节率,从一个波形文件中读取数据,就像有人在说话一样。问题是,实际的字节率低于目标值。
我选择了32KB/s的速率和0.020秒的捕获时间。
我用time.sleep来实现模拟麦克风,每0.020秒发送一块数据。但得到的速率大约是27KB/s,而不是32KB/s。
问题
我决定测试一下Linux机器上time.sleep的精确度,参考了这个问题的思路。
我做了两种测试:1)忙等待 2)正常等待
从我得到的样本来看,平均下来,Linux机器的睡眠分辨率是4毫秒,而Windows的分辨率小于或等于1毫秒。
问题列表
- 是什么可能限制了Linux机器的睡眠分辨率?
- (在Linux上)为什么忙等待的分辨率和time.sleep一样?
- 我该如何成功地从波形文件中模拟一个麦克风?
代码
import time
def busy_sleep(t):
s=time.time()
while time.time() - s < t:
pass
e=time.time()
return e-s
def normal_sleep(t):
s=time.time()
time.sleep(t)
e=time.time()
return e-s
def test(fun):
f = lambda x: sum(fun(x) for d in range(10))/10
print("0.100:{}".format(f(0.100)))
print("0.050:{}".format(f(0.050)))
print("0.025:{}".format(f(0.025)))
print("0.010:{}".format(f(0.010)))
print("0.009:{}".format(f(0.010)))
print("0.008:{}".format(f(0.008)))
print("0.007:{}".format(f(0.007)))
print("0.006:{}".format(f(0.006)))
print("0.005:{}".format(f(0.005)))
print("0.004:{}".format(f(0.004)))
print("0.003:{}".format(f(0.003)))
print("0.002:{}".format(f(0.002)))
print("0.001:{}".format(f(0.001)))
if __name__=="__main__":
print("Testing busy_sleep:")
test(busy_sleep)
print("Testing normal_sleep:")
test(normal_sleep)
结果
"""
Debian
Testing busy_sleep:
0.100:0.10223722934722901
0.050:0.051996989250183104
0.025:0.027996940612792967
0.020:0.02207831859588623
0.010:0.011997451782226562
0.009:0.011997222900390625
0.008:0.009998440742492676
0.007:0.007997279167175292
0.006:0.0079974365234375
0.005:0.007997465133666993
0.004:0.005918483734130859
0.003:0.003997836112976074
0.002:0.0039977550506591795
0.001:0.003997611999511719
Testing normal_sleep:
0.100:0.1020797061920166
0.050:0.051999988555908205
0.025:0.028000001907348634
0.020:0.02192000865936279
0.010:0.011999979019165039
0.009:0.012000055313110351
0.008:0.010639991760253906
0.007:0.008000001907348633
0.006:0.00799997329711914
0.005:0.008000059127807617
0.004:0.006159958839416504
0.003:0.004000000953674317
0.002:0.00399998664855957
0.001:0.004000091552734375
$ uname -a
Linux 3.2.0-4-amd64 #1 SMP Debian 3.2.57-3+deb7u2 x86_64 GNU/Linux
"""
"""
Windows 7
Testing busy_sleep:
0.100:0.10000572204589844
0.050:0.05000288486480713
0.025:0.0250014066696167
0.010:0.010500597953796388
0.009:0.010500597953796388
0.008:0.008000493049621582
0.007:0.00740041732788086
0.006:0.006400299072265625
0.005:0.005400300025939942
0.004:0.004700303077697754
0.003:0.003200197219848633
0.002:0.002700185775756836
0.001:0.0016000032424926759
Testing normal_sleep:
0.100:0.10000579357147217
0.050:0.0500028133392334
0.025:0.02500150203704834
0.010:0.01000049114227295
0.009:0.0100006103515625
0.008:0.008000493049621582
0.007:0.007000398635864257
0.006:0.006000304222106934
0.005:0.00500030517578125
0.004:0.0040001869201660155
0.003:0.0030002117156982424
0.002:0.0020000934600830078
0.001:0.0010000944137573243
"""
真实代码
import os
import wave
import sys
import io
import time
FORMAT = 8 #get_format_from_width(2)
NCHANNELS = 1
FRAMERATE = 16000 # samples per second
SAMPWIDTH = 2 # bytes in a sample
BYTE_RATE = FRAMERATE*SAMPWIDTH
CHUNK_DURATION = 0.020
CHUNK_BYTES = int(CHUNK_DURATION*BYTE_RATE)
class StreamSimulator:
def __init__(self):
wf = wave.open("Kalimba.wav","rb")
buf = io.BytesIO()
buf.write(wf.readframes(wf.getnframes()))
wf.close()
buf.seek(0)
self.buf = buf
self.step = time.time()
def delay(self):
#delay
delta = time.time() - self.step
self.step=time.time()
delay = CHUNK_DURATION - delta
if delay > 0.001:
time.sleep(delay)
def read(self):
buf = self.buf
data = buf.read(CHUNK_BYTES)
if len(data) == 0:
buf.seek(0)
data = buf.read(CHUNK_BYTES)
self.delay()
return data
def close(self):
self.buf.close()
class DynamicPainter:
def __init__(self):
self.l=0
def paint(self,obj):
str1=str(obj)
l1=len(str1)
bs="\b"*self.l
clean=" "*self.l
total = bs+clean+bs+str1
sys.stdout.write(total)
sys.stdout.flush()
self.l=l1
if __name__=="__main__":
painter = DynamicPainter()
stream = StreamSimulator()
produced = 0
how_many = 0
painted = time.time()
while True:
while time.time()-painted < 1:
d = stream.read()
produced += len(d)
how_many += 1
producing_speed = int(produced/(time.time()-painted))
painter.paint("Producing speed: {} how many: {}".format(producing_speed,how_many))
produced=0
how_many=0
painted = time.time()
编辑
修改了“真实代码”,增加了包括睡眠时间的时间测量。
但现在我的字节率是原来的两倍:产生速度:63996,数量:100
这让我非常困惑。我尝试了不同的字节率,结果每次都是两倍。
结论
感谢@J.F.Sebastian 和他的代码,我了解到:
- 使用截止时间作为时间参考比每次循环创建新的参考更好。
- 使用截止时间可以“平摊”time.sleep的不精确性,虽然在目标比特率附近波动,但最终得到的平均值是正确且更加稳定的。
- 你只需要使用一次time.time(),这意味着计算的不精确性更少。
结果,我得到了一个稳定的32000 B/s,有时波动到31999,很少波动到31745。
现在我可以毫无延迟或抖动地听音乐了!
最终代码
def read(self):
buf = self.buf
data = buf.read(CHUNK_BYTES)
if len(data) == 0:
buf.seek(0)
data = buf.read(CHUNK_BYTES)
self.deadline += CHUNK_DURATION
delay = self.deadline - time.time()
if delay > 0:
time.sleep(delay)
return data
2 个回答
在你提到的问题中,大家都知道,程序的休眠时间并没有固定的保证,具体时间会根据操作系统的不同而有很大差异。不过,如果你每20毫秒发送一次数据,4毫秒的精度应该是足够的。其实,你不需要去提高sleep()的准确性。
在Debian系统上,当输入时间大约是0.02秒时,你的电脑大约会在你要求的时间里休眠12/10的时间。计算一下,10/12乘以32就是26.6,所以如果你只收到27 KB/s的数据,这个结果是合理的。
与其让程序休眠0.02秒,不如使用一种自适应的休眠时间。你可以测量一下上一次操作花了多长时间(包括休眠和发送数据的时间),然后缩短你的休眠时间,这样整个操作就能控制在0.02秒内。
总结
感谢 @J.F.Sebastian 和他的代码,我了解到:
- 用一个截止时间作为时间参考比每次循环都创建一个新的参考要好。
- 使用截止时间可以“平摊” time.sleep 的不准确性,虽然在想要的比特率附近会有些波动,但最终得到的平均值是正确的(而且更稳定)。
- 你只需要用一次 time.time(),这意味着计算的不准确性会更少。
结果是,我得到了一个稳定的32000 B/s,有时会波动到31999,很少会降到31745。
现在我可以听音乐而没有任何延迟或卡顿!
我尝试使用 @J.F.Sebastian 的实现,只用 %
运算符来处理剩余时间,但KB/s波动得很奇怪,所以我决定继续使用截止时间的实现,虽然这种方法在不断加浮点数时会有不准确性。不过,整体结果对我来说已经足够了。
谢谢大家。
最终代码
def read(self):
self.deadline += 0.020
delay = self.deadline - time.perf_counter()
if delay > 0:
time.sleep(delay)
return self._read()