I/O 密集型串口应用:从线程、队列设计迁移到异步(如 Twisted)

4 投票
1 回答
6483 浏览
提问于 2025-04-16 10:11

我最近在为一个客户开发一个应用程序,这个程序需要通过串口(RS-232)“主控”与无线设备进行通信。我现在已经用线程写好了这个应用的核心部分(见下文)。不过,我在#python上看到大家普遍认为不应该使用线程,而是应该用Twisted的异步通信功能。

我找不到关于如何用Twisted进行串口异步输入输出通信的好例子。不过,我发现了Dave Peticolas的《Twisted入门》(感谢nosklo),我正在学习这个,但它是用套接字而不是串口通信(不过异步的概念解释得非常清楚)。

我该如何将这个应用从使用线程和队列转换到使用Twisted呢?这样做有什么优缺点吗?我注意到,有时候如果一个线程挂掉了,系统会出现蓝屏死机(BSOD)。

代码(msg_poller.py)

from livedatafeed import LiveDataFeed
from msg_build import build_message_to_send
from utils import get_item_from_queue
from protocol_wrapper import ProtocolWrapper, ProtocolStatus
from crc16 import *
import time
import Queue
import threading
import serial
import gc

gc.enable()
PROTOCOL_HEADER = '\x01'
PROTOCOL_FOOTER = '\x0D\x0A'
PROTOCOL_DLE = '\x90'

INITIAL_MODBUS = 0xFFFF


class Poller:
    """
    Connects to the serial port and polls nodes for data.
    Reads response from node(s) and loads that data into queue.
    Parses qdata and writes that data to database.
    """

    def __init__(self,
            port,
            baudrate,
            parity,
            rtscts,
            xonxoff,
            echo=False):
        try:
            self.serial = serial.serial_for_url(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
        except AttributeError:
            self.serial = serial.Serial(port,
                    baudrate,
                    parity=parity,
                    rtscts=rtscts,
                    xonxoff=xonxoff,
                    timeout=.01)
            self.com_data_q = None
        self.com_error_q = None
        self.livefeed = LiveDataFeed()
        self.timer = time.time()
        self.dtr_state = True
        self.rts_state = True
        self.break_state = False

    def start(self):
        self.data_q = Queue.Queue()
        self.error_q = Queue.Queue()
        com_error = get_item_from_queue(self.error_q)
        if com_error is not None:
            print 'Error %s' % (com_error)
        self.timer = time.time()
        self.alive = True

        # start monitor thread
        #
        self.mon_thread = threading.Thread(target=self.reader)
        self.mon_thread.setDaemon(1)
        self.mon_thread.start()

        # start sending thread
        #
        self.trans_thread = threading.Thread(target=self.writer)
        self.trans_thread.setDaemon(1)
        self.trans_thread.start()

    def stop(self):
        try:
            self.alive = False
            self.serial.close()
        except (KeyboardInterrupt, SystemExit):
            self.alive = False

    def reader(self):
        """
        Reads data from the serial port using self.mon_thread.
        Displays that data on the screen.
        """
        from rmsg_format import message_crc, message_format
        while self.alive:
            try:
                while self.serial.inWaiting() != 0:

                # Read node data from the serial port. Data should be 96B.

                    data = self.serial.read(96)
                    data += self.serial.read(self.serial.inWaiting())

                    if len(data) > 0:

                        # Put data in to the data_q object
                        self.data_q.put(data)
                        if len(data) == 96:
                            msg = self.data_q.get()

                            pw = ProtocolWrapper(
                                        header=PROTOCOL_HEADER,
                                        footer=PROTOCOL_FOOTER,
                                        dle=PROTOCOL_DLE)
                            status = map(pw.input, msg)

                            if status[-1] == ProtocolStatus.IN_MSG:
                                # Feed all the bytes of 'msg' sequentially into pw.input

                                # Parse the received CRC into a 16-bit integer
                                rec_crc = message_crc.parse(msg[-4:]).crc

                                # Compute the CRC on the message
                                calc_crc = calcString(msg[:-4], INITIAL_MODBUS)
                                from datetime import datetime
                                ts = datetime.now().strftime('%Y/%m/%d %H:%M:%S')
                                if rec_crc != calc_crc:
                                    print ts
                                    print 'ERROR: CRC Mismatch'
                                    print msg.encode('hex')
                                else:
                                    #msg = message_format.parse(msg[1:])
                                    #print msg.encode('hex') + "\r\n"
                                    msg = message_format.parse(msg[1:])
                                    print msg
                                    #return msg
                                    gc.collect()
                    time.sleep(.2)
            except (KeyboardInterrupt, SystemExit, Exception, TypeError):
                self.alive = False
                self.serial.close()
                raise

    def writer(self):
        """
        Builds the packet to poll each node for data.
        Writes that data to the serial port using self.trans_thread
        """
        import time
        try:
            while self.alive:
                try:
                    dest_module_code = ['DRILLRIG',
                            'POWERPLANT',
                            'GENSET',
                            'MUDPUMP']
                    dest_ser_no = lambda x: x + 1
                    for code in dest_module_code:
                        if code != 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='%s' % (code),
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                        elif code == 'POWERPLANT':
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(0),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                            msg = build_message_to_send(
                                    data_len=0x10,
                                    dest_module_code='POWERPLANT',
                                    dest_ser_no=dest_ser_no(1),
                                    dest_customer_code='*****',
                                    ret_ser_no=0x01,
                                    ret_module_code='DOGHOUSE',
                                    ret_customer_code='*****',
                                    command='POLL_NODE',
                                    data=[])
                            self.serial.write(msg)
                            time.sleep(.2)
                            gc.collect()
                except (KeyboardInterrupt, SystemExit):
                    self.alive = False
                    self.serial.close()
                    raise
        except (KeyboardInterrupt, SystemExit):
            self.alive = False
            self.serial.close()
            raise


def main():
    poller = Poller(
            port='COM4',
            baudrate=115200,
            parity=serial.PARITY_NONE,
            rtscts=0,
            xonxoff=0,
            )
    poller.start()
    poller.reader()
    poller.writer()
    poller.stop()
if __name__ == '__main__':
    main()                                                                      

1 个回答

7

直接把线程/队列的方法和使用twisted的方法一一对应起来写程序是非常困难的,甚至可以说是不可能的。

我建议你先熟悉一下twisted和它的反应器(reactor)模式,了解一下协议(Protocol)和一些特定的方法。可以把它想象成,以前你用线程和队列写的那些异步操作,现在在使用twisted的deferred时,都是自动处理好的,你不需要再手动写了。

twisted确实支持通过它的反应器来使用串口(SerialPort),使用的是SerialPort传输类,基本结构大概是这样的。

from twisted.internet import reactor
from twisted.internet.serialport import SerialPort

SerialPort(YourProtocolClass(), Port, reactor, baudrate=baudrate))
reactor.run() 

在YourProtocolClass()里,你会处理与串口通信相关的各种事件。你可以在doc/core/examples目录下找到一些示例,比如gpsfix.py和mouse.py。

撰写回答