串行通信和队列的问题
我在创建一个多进程的串口日志记录器时遇到了一些问题。我的计划是:有一个单独的进程从串口读取数据,然后把这些数据放到一个队列里。主进程在一段时间后读取整个队列,并处理这些数据。
但是我不确定这样做是否正确,因为有时候数据的顺序不对。对于慢速通信来说,这个方法还算有效。
我需要锁定什么东西吗?有没有更聪明的办法呢?
import time
import serial
from multiprocessing import Process, Queue
def myProcess(q):
with serial.Serial("COM2", 115200, 8, "E", 1, timeout=None) as ser:
while True:
q.put("%02X" % ser.read(1)[0])
if __name__=='__main__':
try:
q = Queue()
p = Process(target=myProcess, args=(q,))
p.daemon = True
p.start()
data = []
while True:
print(q.qsize()) #!debug
while not q.empty(): #get all data from queue
data.append(q.get())
#proc_data(data) #data processing
time.sleep(1) #emulate data processing
del data[:] #clear buffer
except KeyboardInterrupt:
print("clean-up") #!debug
p.join()
更新:我尝试了另一种基于线程的版本(见下面的代码),但效果和问题还是一样。数据的转移部分工作得很好,但在转移和新数据之间总是会丢失一个字节——当主进程读取队列时,脚本会错过这个字节?
import time, serial, threading, queue
def read_port(q):
with serial.Serial("COM2", 19200, 8, "E", 1, timeout=None) as ser:
while t.is_alive():
q.put("%02X" % ser.read(1)[0])
def proc_data(data, crc):
#processing data here
carry = data[len(data)/2:] #DEBUG: emulate result (return last half of data)
return carry
if __name__=='__main__':
try:
q = queue.Queue()
t = threading.Thread(target=read_port, args=(q,))
t.daemon = True
t.start()
data = []
while True:
try:
while True:
data.append(q.get_nowait()) #get all data from queue
except queue.Empty:
pass
print(data) #DEBUG: show carry-over + new data
data = proc_data(data) #process data and store carry-over
print(data) #DEBUG: show new carry-over
time.sleep(1) #DEBUG: emulate processing time
except KeyboardInterrupt:
print("clean-up")
t.join(0)
1 个回答
0
考虑以下代码。
1) 这两个进程是兄弟进程;父进程只是把它们设置好,然后等待用户按下控制-C来中断一切。
2) 一个进程把原始字节放到共享队列里。
3) 另一个进程会等待第一个字节的数据。当它收到第一个字节后,就会获取剩下的数据,把它以十六进制的形式输出,然后继续处理。
4) 父进程只是设置好其他进程,然后通过 signal.pause()
等待中断。
注意,使用 multiprocessing
时,qsize()
(还有可能的 empty()
)函数是不可靠的——因此上面的代码能可靠地获取你的数据。
来源
import signal, time
import serial
from multiprocessing import Process, Queue
def read_port(q):
with serial.Serial("COM2", 115200, 8, "E", 1, timeout=None) as ser:
while True:
q.put( ser.read(1)[0] )
def show_data(q):
while True:
# block for first byte of data
data = [ q.get() ]
# consume more data if available
try:
while True:
data.append( q.get_nowait() )
except Queue.Empty:
pass
print 'got:', ":".join("{:02x}".format(ord(c)) for c in data)
if __name__=='__main__':
try:
q = Queue()
Process(target=read_port, args=(q,)).start()
Process(target=show_data, args=(q,)).start()
signal.pause() # wait for interrupt
except KeyboardInterrupt:
print("clean-up") #!debug