Python Queue.Queue在多线程TCP流处理器中无法工作

2 投票
2 回答
2015 浏览
提问于 2025-04-18 16:50

我正在尝试制作一个可以让线程和主线程沟通的TCP流处理类,但是Queue.Queue并没有按照我想要的那样工作。服务器从另一个程序接收数据,我只是想把这些数据传递到主线程进行处理。以下是我目前的代码:

编辑:根据建议更新了代码,但队列仍然没有任何作用。在我执行self.queue.put(info)之后,连打印命令都无法正常工作,感觉整个程序就像死掉了一样。不过它没有给我任何错误提示,但数据流处理线程在我把信息放入队列后就停止了。

class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):

    def __init__(self, request, client_address, server):
         SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)
         self.queue = self.server.queue

    def handle(self):
                while True:
                    try:
                        self.data = self.rfile.readline().strip()
                        cur_thread = threading.current_thread()
                        command = self.data[0:2]
                        if command == "nr":
                            info = self.data[2:]
                            t1 = info.split("|")
                            title = t1[0]
                            self.queue.put(info)

class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):

        def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, queue=None):
            self.queue = queue
            SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)

在主线程类中,我有:

q = Queue.Queue()
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPStreamHandler, queue=q)
ip, port = server.server_address

server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
while True:
        try:
            item = q.get()
            print item
            q.task_done()
            q.join()
        except KeyboardInterrupt:
            server.shutdown()
            sys.exit(0)

但是没有任何输出,也无法传递数据。我知道我做错了什么,但就是不知道是什么。如果有人能提供一些帮助,我将非常感激,谢谢。

2 个回答

0

发现我需要对接受的答案做一些调整。这个例子在python2.7中是有效的(至少我测试是这样)。

启动服务器后,你可以通过不断发送数据来进行测试,使用netcat或者类似的工具:

echo "nrtest" | nc -4 127.0.0.1 50514

在Linux上,你也可以使用logger命令发送数据,但这样的话,处理程序需要查找其他字符串(这个例子检查行是否以“nr”开头,正如提问者的要求)。

logger -T -n 127.0.0.1 -P 50514 "Test message"

结果如下 - 为了更易读,我把它分成了几个部分。

导入模块:

"""
Based on https://stackoverflow.com/a/25246157/2045924
"""
import threading
import Queue
import sys
import SocketServer
from SocketServer import StreamRequestHandler
from SocketServer import TCPServer
from SocketServer import ThreadingMixIn

将队列添加到TCPServer:

class ThreadedTCPStreamServer(ThreadingMixIn, TCPServer):
    """ThreadedTCPStreamServer."""
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,
                 queue=None):
        self.queue = queue
        TCPServer.__init__(self, server_address, RequestHandlerClass,
                           bind_and_activate=bind_and_activate)

将队列添加到StreamRequestHandler,告诉“handle”该做什么,让“finish”直接通过。

class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):
    """ThreadedTCPStreamHandler."""
    def __init__(self, request, client_address, server):
        self.queue = server.queue
        StreamRequestHandler.__init__(self, request, client_address, server)

    def handle(self):
        while True:
            self.data = self.rfile.readline().strip()
            if not self.data:
                break
            __cur_thread = threading.current_thread()
            self.finish()
            command = self.data[0:2]
            if command == "nr":
                info = self.data[2:]
                __t1 = info.split("|")
                __title = __t1[0]
                self.queue.put(info)
                self.finish()
    def finish(self):
        pass

定义全局变量并启动。

HOST = '127.0.0.1'
PORT = 50514

# QUEUE must be defined _outside_.
QUEUE = Queue.Queue()
QTIMEOUT = 1

# Setup instance of my own ThreadedTCPStreamServer with my own ThreadedTCPStreamHandler.
SERVER = ThreadedTCPStreamServer((HOST, PORT), ThreadedTCPStreamHandler, queue=QUEUE)
__IP, __PORT = SERVER.server_address

# Start the server
SERVER_THREAD = threading.Thread(target=SERVER.serve_forever)
SERVER_THREAD.daemon = True
SERVER_THREAD.start()

while True:
    try:
        #ITEM = QUEUE.get(block=True, timeout=QTIMEOUT)
        ITEM = QUEUE.get()
        print 'qsize({s}): {i}'.format(s=QUEUE.qsize(), i=ITEM)
        QUEUE.task_done()
        QUEUE.join()
    except KeyboardInterrupt:
        SERVER.shutdown()
        sys.exit(0)
    #except Queue.Empty:
    #    print 'Got Queue.Empty after waiting for {t}. Continuing...'.format(t=QTIMEOUT)
    #    continue

现在开始用nc或类似工具发送一些数据。

要结束程序,按<ctrl>+C

KeyboardInterrupt会在你发送数据包时“生效”。

  1. 按 +C
  2. 发送一个额外的数据包
  3. 好了,程序结束
3

为了让这个功能正常工作,流处理器和主线程必须使用同一个 Queue 对象。你现在在不同的地方创建了不同的队列对象。你可以把 queue 作为全局变量使用,如果客户端和服务器的代码在同一个文件里,或者可以这样做:

class ThreadedTCPStreamServer(ThreadingMixin, TCPServer):
    def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,
                 queue=None):
        self.queue = queue
        TCPServer.__init__(self, server_address, RequestHandlerClass,
                           bind_and_activate=bind_and_activate)

class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):
    def __init__(self, request, client_address, server):
        self.queue = server.queue
        StreamRequestHandler.__init__(self, request, client_address, server)

    def handle(self):
        while True:
            self.data = self.rfile.readline().strip()
            if not self.data:
                break
            cur_thread = threading.current_thread()
            command = self.data[0:2]
            if command == "nr":
                info = self.data[2:]
                t1 = info.split("|")
                title = t1[0]
                self.queue.put(info)
                self.finish()


q = Queue.Queue()
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPStreamHandler, queue=q)
ip, port = server.server_address

server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
while True:
        try:
            item = q.get()
            print item
            q.task_done()
            q.join()
        except KeyboardInterrupt:
            server.shutdown()
            sys.exit(0)

撰写回答