在通过进程发送绘图数据时出现多进程破损管道错误
我做了一个用pyqt5制作的应用程序,用来测试麦克风。我使用了pydub和pyaudio这两个模块来实现这个功能。同时,我还用matplotlib来绘制麦克风的数据。我有一个QDialog窗口,它通过一个发射器与QDialog进行通信,并使用多进程来读取pyaudio的输入流。当我在界面上选择对麦克风声音进行归一化处理时,如果我不说话,就会有噪音。而且在这种情况下,过了一段时间,应用程序会崩溃,并出现以下错误:
Traceback (most recent call last):
File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 866, in run
self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\Lib\multiprocessing\connection.py", line 301, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Η διοχέτευση έχει τερματιστεί
Process Manage_Input_Device_Child_Proc-1:
Traceback (most recent call last):
File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 866, in run
self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\Lib\multiprocessing\connection.py", line 301, in _send_bytes
nwritten, err = ov.GetOverlappedResult(True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 109] Η διοχέτευση έχει τερματιστεί
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Python\Lib\multiprocessing\process.py", line 314, in _bootstrap
self.run()
File "C:\Users\chris\Documents\My Projects\papinhio-player\src\python+\main-window\../..\python+\menu-1\manage-input-and-output-sound-devices\microphone-input-device-settings\microphone-input-device-setting.py", line 875, in run
self.to_emitter.send({"type":"error","error_message":error_message})
File "C:\Python\Lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Python\Lib\multiprocessing\connection.py", line 289, in _send_bytes
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
BrokenPipeError: [WinError 232] Η διοχέτευση κλείνει
Process finished with exit code -1073741571 (0xC00000FD)
这个错误和绘图数据有关。如果我把这一行注释掉:self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
,那么应用程序就不会崩溃。
相关代码:
class Manage_Input_Device_Emitter(QThread):
try:
plot_data_signal = pyqtSignal(list,float)
save_finished = pyqtSignal()
devices_settings = pyqtSignal(list,int,float,float,float,float,float)
error_signal = pyqtSignal(str)
except:
pass
def __init__(self, from_process: Pipe):
try:
super().__init__()
self.data_from_process = from_process
except:
pass
def run(self):
try:
while True:
data = self.data_from_process.recv()
if data["type"]=="plot_data":
self.plot_data_signal.emit(data["plot_data"],data["normalized_value"])
elif data["type"]=="save_finished":
self.save_finished.emit()
elif data["type"]=="available_devices":
self.devices_settings.emit(data["devices"],data["device_index"],data["volume"],data["is_normalized"],data["pan"],data["low frequency"],data["high frequency"])
elif data["type"] == "error":
self.error_signal.emit(data["error_message"])
except:
error_message = traceback.format_exc()
self.error_signal.emit(error_message)
class Manage_Input_Device_Child_Proc(Process):
def __init__(self, to_emitter, from_mother):
try:
super().__init__()
self.daemon = False
self.to_emitter = to_emitter
self.data_from_mother = from_mother
#local argument(s) save
except:
try:
error_message = str(traceback.format_exc())
to_emitter.send({"type":"error","error_message":error_message})
except:
pass
def run(self):
try:
self.fetch_input_settings()
self.bit_rate = 128*1024 #128 kb/sec
self.packet_time = 125 #125 msec
#self.packet_time = 125*44100/32768
self.packet_size = int(16384/4)
#self.new_sample_rate = 32768
self.new_sample_rate = 44100
self.TIME_WINDOW = 3000
self.format = pyaudio.paInt16
self.channels = 2
self.input_stream = None
self.output_stream = None
self.play_status = "stopped"
self.process_terminated = False
while(self.process_terminated == False):
if self.play_status == "stopped":
data = self.data_from_mother.get()
else:
q_size = self.data_from_mother.qsize()
if q_size>0:
data = self.data_from_mother.get()
else:
data = None
if data is not None:
if data["type"] == "stop-process":
self.process_terminated = True
return 1
if data["type"] == "save":
device_name = data["device_name"]
volume = data["volume"]
is_normalized = data["is_normalized"]
pan = data["pan"]
low_frequency = data["low_frequency"]
high_frequency = data["high_frequency"]
self.save(device_name,volume,is_normalized,pan,low_frequency,high_frequency)
break
elif data["type"] == "test":
self.output_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,output=True,output_device_index=self.output_device_index,frames_per_buffer=self.packet_size)
#self.output_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,output=True,frames_per_buffer=self.packet_size)
self.output_stream.start_stream()
self.input_device_name = data["content"]
for input_device in self.input_devices:
if(data["content"]==input_device[2]):
self.input_device_index = input_device[1]
self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
self.input_stream.start_stream()
self.input_channels = 1
'''
try:
self.input_stream = self.p.open(format=pyaudio.paInt16,channels=self.channels,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
self.input_stream.start_stream()
self.input_channels = self.channels
except Exception as e:
#self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
self.input_stream = self.p.open(format=pyaudio.paInt16,channels=1,rate=self.new_sample_rate,input=True,input_device_index=self.input_device_index,frames_per_buffer=self.packet_size)
self.input_stream.start_stream()
self.input_channels = 1
'''
self.play_status = "playing"
self.chunk_number = 0
self.current_duration_milliseconds = 0
self.now = datetime.now()
self.x_vals = np.array([])
self.y_vals = np.array([])
elif data["type"] == "stop":
self.play_status = "stopped"
self.chunk_number = 0
self.current_duration_milliseconds = 0
try:
self.output_stream.stop_stream()
self.output_stream.close()
self.input_stream.stop_stream()
self.input_stream.close()
except:
pass
self.now = datetime.now()
self.x_vals = np.array([])
self.y_vals = np.array([])
elif data["type"] == "volume":
self.volume = data["value_base_100"]
elif data["type"] == "is_normalized":
self.is_normalized = data["boolean_value"]
elif data["type"] == "pan":
self.pan = data["pan_value"]
elif data["type"] == "low_frequency":
self.low_frequency = data["low_frequency_value"]
elif data["type"] == "high frequency":
self.high_frequency = data["high_frequency_value"]
if self.play_status=="playing":
in_data = self.input_stream.read(self.packet_size,exception_on_overflow = False)
if self.input_channels == 2:
slice = AudioSegment(in_data, sample_width=2, frame_rate=self.new_sample_rate, channels=2)
else:
slice = AudioSegment(in_data, sample_width=2, frame_rate=self.new_sample_rate, channels=1)
slice = AudioSegment.from_mono_audiosegments(slice, slice)
if self.pan!=0:
slice = slice.pan(self.pan/100)
if self.low_frequency>20:
slice = effects.high_pass_filter(slice, self.low_frequency)
if self.high_frequency>20000:
slice = effects.low_pass_filter(slice, self.high_frequency)
if(self.volume==0):
db_volume = -200
else:
db_volume = 20*math.log10(self.volume/100)
slice = slice+db_volume
if self.is_normalized:
slice = self.normalize_method(slice,0.1)
self.output_stream.write(slice.raw_data)
free = self.output_stream.get_write_available()
if free > self.packet_size: # Is there a lot of space in the buffer?
tofill = free - self.packet_size
silence = chr(0)*tofill*self.channels*2
self.output_stream.write(silence) # Fill it with silence
#free = self.output_stream.get_write_available()
#print(free)
chunk_time = len(slice)
samples = slice.get_array_of_samples()
left_samples = samples[::2]
right_samples = samples[1::2]
left_audio_data = np.frombuffer(left_samples, np.int16)[::16] #down sampling
right_audio_data = np.frombuffer(right_samples, np.int16)[::16] #down sampling
audio_data = np.vstack((left_audio_data,right_audio_data)).ravel('F')
time_data = np.array([])
for i in range(0,len(audio_data)):
time_data = np.append(time_data, self.now)
self.now = self.now+timedelta(milliseconds=chunk_time/len(audio_data))
self.x_vals = np.concatenate((self.x_vals, time_data))
self.y_vals = np.concatenate((self.y_vals, audio_data))
if(self.x_vals.size>audio_data.size*(self.TIME_WINDOW/chunk_time)):
self.x_vals = self.x_vals[audio_data.size:]
self.y_vals = self.y_vals[audio_data.size:]
average_data_value = slice.max
normalized_value = abs(average_data_value)/slice.max_possible_amplitude
if normalized_value>1:
normalized_value = 1
if self.play_status == "stopped":
normalized_value = 0
self.to_emitter.send({"type":"plot_data","plot_data":[self.x_vals,self.y_vals],"normalized_value":normalized_value})
self.now = datetime.now()
self.chunk_number += 1
self.current_duration_milliseconds += chunk_time
except:
error_message = str(traceback.format_exc())
print(error_message)
self.to_emitter.send({"type":"error","error_message":error_message})
def normalize_method(self,seg, headroom):
try:
peak_sample_val = seg.max
# if the max is 0, this audio segment is silent, and can't be normalized
if peak_sample_val == 0:
return seg
target_peak = seg.max_possible_amplitude * utils.db_to_float(-headroom)
#target_peak = seg.max_possible_amplitude * (percent_headroom)
needed_boost = utils.ratio_to_db(target_peak / peak_sample_val)
return seg.apply_gain(needed_boost)
except:
error_message = traceback.format_exc()
self.to_emitter.send({"type":"error","error_message":error_message})
return seg
1 个回答
0
我对QThread进行了修改,变成了:
if self.data_from_process.poll():
data = self.data_from_process.recv()
else:
time.sleep(0.1)
continue