如何在Python中通过HTTP提供UDP流数据?

6 投票
3 回答
5520 浏览
提问于 2025-04-16 04:27

我现在正在把一个旧系统的数据通过网络展示出来。我有一个(旧的)服务器应用程序,它通过UDP发送和接收数据。这个软件使用UDP以接近实时的方式发送一组变量的顺序更新(每5-10毫秒更新一次)。所以,我不需要捕捉所有的UDP数据,获取最新的更新就足够了。

为了把这些数据放到网上,我在考虑建立一个轻量级的网页服务器,它可以读取和写入UDP数据,并通过HTTP展示这些数据。

因为我对Python比较熟悉,所以我打算用它来实现。

我的问题是:我该如何用Python持续读取UDP数据,并根据需要通过TCP/HTTP发送这些数据的快照呢? 基本上,我想做一个“UDP转HTTP”的适配器,这样就不需要去改动旧的代码了。

如果能有符合WSGI标准的解决方案,那就更好了。当然,任何建议都非常欢迎,我会非常感激!

3 个回答

6

这里有一个简单的“概念验证”应用,使用的是twisted框架。这个应用假设一个旧版的UDP服务正在本地的8000端口监听,并会在收到包含“Send me data”的数据包后开始发送UDP数据。发送的数据是三个32位的整数。此外,它还会在2080端口响应“HTTP GET /”的请求。

你可以通过运行 twistd -noy example.py 来启动这个应用:

example.py

from twisted.internet import protocol, defer
from twisted.application import service
from twisted.python import log
from twisted.web import resource, server as webserver

import struct

class legacyProtocol(protocol.DatagramProtocol):
    def startProtocol(self):
        self.transport.connect(self.service.legacyHost,self.service.legacyPort)
        self.sendMessage("Send me data")
    def stopProtocol(self):
        # Assume the transport is closed, do any tidying that you need to.
        return
    def datagramReceived(self,datagram,addr):
        # Inspect the datagram payload, do sanity checking.
        try:
            val1, val2, val3 = struct.unpack("!iii",datagram)
        except struct.error, err:
            # Problem unpacking data log and ignore
            log.err()
            return
        self.service.update_data(val1,val2,val3)
    def sendMessage(self,message):
        self.transport.write(message)

class legacyValues(resource.Resource):
    def __init__(self,service):
        resource.Resource.__init__(self)
        self.service=service
        self.putChild("",self)
    def render_GET(self,request):
        data = "\n".join(["<li>%s</li>" % x for x in self.service.get_data()])
        return """<html><head><title>Legacy Data</title>
            <body><h1>Data</h1><ul>
            %s
            </ul></body></html>""" % (data,)

class protocolGatewayService(service.Service):
    def __init__(self,legacyHost,legacyPort):
        self.legacyHost = legacyHost # 
        self.legacyPort = legacyPort
        self.udpListeningPort = None
        self.httpListeningPort = None
        self.lproto = None
        self.reactor = None
        self.data = [1,2,3]
    def startService(self):
        # called by application handling
        if not self.reactor:
            from twisted.internet import reactor
            self.reactor = reactor
        self.reactor.callWhenRunning(self.startStuff)
    def stopService(self):
        # called by application handling
        defers = []
        if self.udpListeningPort:
            defers.append(defer.maybeDeferred(self.udpListeningPort.loseConnection))
        if self.httpListeningPort:
            defers.append(defer.maybeDeferred(self.httpListeningPort.stopListening))
        return defer.DeferredList(defers)
    def startStuff(self):
        # UDP legacy stuff
        proto = legacyProtocol()
        proto.service = self
        self.udpListeningPort = self.reactor.listenUDP(0,proto)
        # Website
        factory = webserver.Site(legacyValues(self))
        self.httpListeningPort = self.reactor.listenTCP(2080,factory)
    def update_data(self,*args):
        self.data[:] = args
    def get_data(self):
        return self.data

application = service.Application('LegacyGateway')
services = service.IServiceCollection(application)
s = protocolGatewayService('127.0.0.1',8000)
s.setServiceParent(services)

补充说明

这个设计不是WSGI的。这个程序的想法是让它作为一个后台服务运行,并将它的HTTP端口设置在一个本地IP上,然后用apache或类似的工具来代理请求。它可以重新调整为WSGI格式。这样做更快,也更容易调试。

6

Twisted 在这里非常合适。它支持很多协议,比如 UDP 和 HTTP。而且它的异步特性让你可以直接把 UDP 数据流传输到 HTTP,这样就不用担心会因为使用(阻塞)线程代码而出错。它还支持 wsgi。

4

这个软件使用UDP协议来快速发送一组变量的更新,几乎是实时的(每5到10毫秒更新一次)。所以,我不需要捕捉所有的UDP数据,只要获取最新的更新就可以了。

你需要做的事情如下。

第一步。

创建一个Python应用程序,收集UDP数据并把它存储到一个文件里。可以使用XML、CSV或JSON格式来创建这个文件。

这个程序可以独立运行,就像一个后台服务。这就是你的监听器或收集器。

把文件写入一个目录,这样Apache或其他网页服务器就能轻松下载。选择合适的文件名和目录路径,这样就完成了。

完成。

如果你想要更复杂的结果,可以做更多的事情。但其实你已经完成了。

第二步。

创建一个网页应用,让别人可以请求这些由UDP监听器收集的数据。

可以使用像Django这样的网页框架来实现。尽量少写代码。Django可以直接提供你监听器生成的静态文件。

再次完成。

有些人认为关系型数据库很重要。如果你也这么认为,可以继续做这一步。尽管你已经完成了。

第三步。

修改你的数据收集方式,创建一个数据库,让Django的ORM(对象关系映射)可以查询。这需要一些学习和调整,以便得到一个整洁简单的ORM模型。

然后写你的最终Django应用,提供由你的监听器收集并加载到Django数据库中的UDP数据。

撰写回答