处理两个传入数据流并在python中组合它们?

2024-06-07 16:51:18 发布

您现在位置:Python中文网/ 问答频道 /正文

我一直在研究python中的线程、多处理异步等各种选项,作为处理两个传入流并将其组合的方法。关于这方面的信息很多,但示例往往错综复杂,更常见的是将单个任务拆分为多个线程或进程,以加速任务的最终结果

我有一个数据流通过一个套接字(目前使用UDP作为另一个应用程序在本地运行在我的PC上,但是如果应用程序需要在一个单独的PC上运行,将来可能会考虑切换到TCP),并且一个串行流通过RS232适配器进入,并且我需要组合这些流。然后在另一个套接字上重新传输这个新流

问题是它们以不同的速率输入(串行数据以125hz的频率输入,套接字数据以60-120hz的频率输入),因此我想将最新的串行数据添加到套接字数据中

我的问题本质上是,根据其他人以前的经验,处理这个问题的最佳方式是什么。由于这本质上是一个I/O任务,它更倾向于线程化(我知道GIL限制了并发性),但由于高输入率,我想知道多处理是否是一种方式

如果使用线程,我想访问每个共享资源的最佳方法是使用锁将串行数据写入对象,并且每当有新的套接字数据时,在单独的线程中获取锁,访问对象中的最新串行数据,处理它,然后将其发送到另一个套接字。但是,主线程在每个新传入的套接字消息之间有很多工作要做

通过多处理,我可以使用管道从另一个进程请求和接收最新的串行数据,但这只会减轻串行数据处理的负担,而且仍然会为主进程留下很多时间


Tags: 数据对象方法信息应用程序示例进程选项
3条回答

我可以建议这里使用的方法-https://stackoverflow.com/a/641488/4895189。如果您有一个用于通过套接字和串行接口接收的数据的结构,那么您可以将这些带有时间戳的结构写入各个管道对象

根据我的经验,我更喜欢多处理而不是线程。我使用pyserial进行UART的读写,其中主线程用于写,另一个线程用于读。由于我找不到的原因,如果我在连续写入调用之间没有添加相当大的延迟(~1000ms),那么我在输入和输出中都会丢失帧。通常,我发现将pyserial与Python的线程一起使用会有奇怪的行为。目前,我不确定这是由于pyserial的实现还是Python的GIL

也就是说,我认为您可以根据我上面链接的答案使用以下结构进行设置:

子进程1-从套接字读取数据并使用时间戳写入管道
子进程2-使用pyserial读取数据并使用时间戳写入管道
主进程-按您选择的间隔对两个管道对象执行选择,合并流并传输到输出套接字

我认为使用select非常简单。它告诉您哪个套接字具有要读取的数据(或EOF

事实上,以前也有人问过类似的问题: Python - Server listening from two UDP sockets

请注意select返回的套接字中只有一次读取保证不会阻塞。继续阅读前再次检查。这意味着,如果您正在读取数据流,请将其读入缓冲区,直到您收到可以处理的整行或其他数据单元

您的问题与链接问题不同,因为您需要从网络和串行接口读取。Linux没有问题,任何文件描述符都可以与select一起使用。但是在Windows上,只有套接字可以与select一起使用。我不使用Windows,但看起来您需要一个专用线程来读取串行线

您确定这里需要多线程吗?如果不是严格需要,我肯定会避免它

  • 我最近没有针对串行端口和套接字进行太多编程,但据我所知,因为这两个数据都是由硬件/中间件缓冲的,因此从这个角度来看,每个传入流不需要线程
  • 关于有很多工作要做的主线程:您确定不能在执行I/O的线程中组合这些工作吗

如果可行,我将编写一个循环,交替读取两个流,处理/组合它并将其写入out套接字:

while True:
  serial_data_in = serial_in.read()
  socket_data_in = socket_in.read()
  socket_out.write(combine(serial_data_in, socket_data_in))

可能需要对read()的超时进行一些调整,以避免在一个上丢失数据,如果另一个上没有数据传入

如果那样不行,我仍然会保留尽可能少的线程。例如,您可以使用一个线程进行读取(如上所述),并使用Queue与处理和写入out套接字的线程通信:

q = queue.Queue()

def worker_1:
  while True:
    serial_data_in = serial_in.read()
    socket_data_in = socket_in.read()
    q.put((serial_data_in, socket_data_in))

def worker_2:
  while True:
    (serial_data_in, socket_data_in) = q.get()
    socket_out.write(combine(serial_data_in, socket_data_in))
    q.task_done()

Queues消除了锁定对象较低级别的同步复杂性

相关问题 更多 >

    热门问题