在服务器上使用tail -f日志,处理数据,然后通过Twisted服务给客户端

9 投票
1 回答
1195 浏览
提问于 2025-04-17 06:27

目标:在客户端的wxPython界面中显示来自服务器的数据

我是Twisted的新手。我在Windows 7的客户端上运行一个wxPython的图形界面,同时在Ubuntu服务器上有一个程序在生成日志。我现在的尝试是使用tail -f命令来实时查看日志,然后把输出通过Twisted服务器传送给客户端,只发送符合我正则表达式条件的数据。我已经打开了一个隧道,所以不需要再用SSH来复杂化事情。我已经让下面这段代码运行起来了,但它只发送了输入的第一行。我知道我需要不断检查输入是否有新行,然后把它写入传输中,但我不太确定怎么做才能不断开连接。

我找不到足够的信息来拼凑出一个完整的解决方案。我也尝试过使用套接字和文件输入输出的其他方法,但我觉得Twisted似乎是解决这个问题的好工具。我这样做是否正确?有什么建议都很感激。谢谢

#! /usr/bin/python

import optparse, os, sys

from twisted.internet.protocol import ServerFactory, Protocol

def parse_args():
    usage = """usage: %prog [options]
"""

    parser = optparse.OptionParser(usage)

    help = "The port to listen on. Default to a random available port."
    parser.add_option('--port', type='int', help=help)

    help = "The interface to listen on. Default is localhost."
    parser.add_option('--iface', help=help, default='localhost')

    options =parser.parse_args()

    return options#, log_file

class LogProtocol(Protocol):
    def connectionMade(self):
        for line in self.factory.log:
            self.transport.write(line)

class LogFactory(ServerFactory):
    protocol = LogProtocol

    def __init__(self,log):
        self.log = log

def main():
    log = sys.stdin.readline()
    options, log_file = parse_args()

    factory = LogFactory(log)

    from twisted.internet import reactor

    port = reactor.listenTCP(options.port or 0, factory,
                             interface=options.iface)

    print 'Serving %s on %s.' % (log_file, port.getHost())

    reactor.run()


if __name__ == '__main__':
    main()

针对第一个评论,我也尝试过直接在Python中读取日志,但程序挂起了。代码如下:

#! /usr/bin/python

import optparse, os, sys, time
from twisted.internet.protocol import ServerFactory, Protocol

def parse_args():
    usage = """ usage: %prog [options]"""

    parser = optparse.OptionParser(usage)

    help = "The port to listen on. Default to a random available port"
    parser.add_option('--port', type='int', help=help, dest="port")

    help = "The logfile to tail and write"
    parser.add_option('--file', help=help, default='log/testgen01.log',dest="logfile")

    options = parser.parse_args()
    return options

class LogProtocol(Protocol):
    def connectionMade(self):
        for line in self.follow():
            self.transport.write(line)
        self.transport.loseConnection()

    def follow(self):
        while True:
            line = self.factory.log.readline()
            if not line:
                time.sleep(0.1)
                continue
            yield line

class LogFactory(ServerFactory):
    protocol = LogProtocol

    def __init__(self,log):
        self.log = log

def main():
    options, log_file = parse_args()
    log = open(options.logfile)
    factory = LogFactory(log)

    from twisted.internet import reactor

    port = reactor.listenTCP(options.port or 0, factory)    #,interface=options.iface)

    print 'Serving %s on %s.' % (options.logfile, port.getHost())

    reactor.run()


if __name__ == '__main__':
    main()

1 个回答

7

你这里有几个容易分开的目标要实现。首先,我会讲讲如何查看日志文件。

你的生成器有几个问题。其中一个比较严重,就是它调用了 time.sleep(0.1)。这个 sleep 函数会让程序暂停指定的时间。在这段时间内,调用它的线程什么都做不了(这就是“阻塞”的大致意思)。你在同一个线程中遍历生成器,而 LogProtocol.connectionMade 也是在这个线程中被调用的(因为 connectionMade 调用了 follow)。LogProtocol.connectionMade 是在 Twisted 的反应器运行的同一个线程中被调用的,因为 Twisted 大致上是单线程的。

所以,你的 sleep 调用让反应器阻塞了。只要 sleep 在阻塞反应器,反应器就不能做任何事情,比如通过套接字发送数据。顺便说一下,阻塞是可以传递的。所以 LogProtocol.connectionMade 是个更大的问题:它会无限循环,睡眠和读取。这就让反应器无限期地阻塞。

你需要从文件中读取行,而不让程序阻塞。你可以通过轮询来做到这一点——这实际上就是你现在采用的方法——但要避免使用 sleep。可以使用 reactor.callLater 来安排未来的文件读取:

def follow(fObj):
    line = fObj.readline()
    reactor.callLater(0.1, follow, fObj)

follow(open(filename))

你也可以让 LoopingCall 处理这个让它永远循环的部分:

def follow(fObj):
    line = fObj.readline()

from twisted.internet.task import LoopingCall

loop = LoopingCall(follow, open(filename))
loop.start(0.1)

这两种方法都可以让你在不阻塞反应器的情况下,随着时间的推移从文件中读取新行。当然,它们在读取后都会把行丢掉。这就引出了第二个问题……

你需要对文件中新行的出现做出反应。你可能想把它写到你的连接中。这并不难:“反应”其实很简单,通常只需要调用一个函数或方法。在这种情况下,最简单的办法是让 LogProtocol 设置日志跟踪,并提供一个回调对象来处理出现的行。考虑对上面的 follow 函数做这个小调整:

def follow(fObj, gotLine):
    line = fObj.readline()
    if line:
        gotLine(line)

def printLine(line):
    print line

loop = LoopingCall(follow, open(filename), printLine)
loop.start(0.1)

现在你可以非阻塞地轮询日志文件以获取新行,并且知道什么时候有新行出现。这很容易与 LogProtocol 集成……

class LogProtocol(Protocol):
    def connectionMade(self):
        self.loop = LoopingCall(follow, open(filename), self._sendLogLine)
        self.loop.start()

    def _sendLogLine(self, line):
        self.transport.write(line)

最后一个细节是,当连接丢失时,你可能想停止监视文件:

    def connectionLost(self, reason):
        self.loop.stop()

所以,这个解决方案通过使用 LoopingCall 而不是 time.sleep 来避免阻塞,并在找到新行时通过简单的方法调用将行推送到协议中。

撰写回答