Python Queue.Queue在多线程TCP流处理器中无法工作
我正在尝试制作一个可以让线程和主线程沟通的TCP流处理类,但是Queue.Queue并没有按照我想要的那样工作。服务器从另一个程序接收数据,我只是想把这些数据传递到主线程进行处理。以下是我目前的代码:
编辑:根据建议更新了代码,但队列仍然没有任何作用。在我执行self.queue.put(info)之后,连打印命令都无法正常工作,感觉整个程序就像死掉了一样。不过它没有给我任何错误提示,但数据流处理线程在我把信息放入队列后就停止了。
class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):
def __init__(self, request, client_address, server):
SocketServer.StreamRequestHandler.__init__(self, request, client_address, server)
self.queue = self.server.queue
def handle(self):
while True:
try:
self.data = self.rfile.readline().strip()
cur_thread = threading.current_thread()
command = self.data[0:2]
if command == "nr":
info = self.data[2:]
t1 = info.split("|")
title = t1[0]
self.queue.put(info)
class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True, queue=None):
self.queue = queue
SocketServer.TCPServer.__init__(self, server_address, RequestHandlerClass, bind_and_activate=bind_and_activate)
在主线程类中,我有:
q = Queue.Queue()
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPStreamHandler, queue=q)
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
while True:
try:
item = q.get()
print item
q.task_done()
q.join()
except KeyboardInterrupt:
server.shutdown()
sys.exit(0)
但是没有任何输出,也无法传递数据。我知道我做错了什么,但就是不知道是什么。如果有人能提供一些帮助,我将非常感激,谢谢。
2 个回答
0
发现我需要对接受的答案做一些调整。这个例子在python2.7中是有效的(至少我测试是这样)。
启动服务器后,你可以通过不断发送数据来进行测试,使用netcat或者类似的工具:
echo "nrtest" | nc -4 127.0.0.1 50514
在Linux上,你也可以使用logger
命令发送数据,但这样的话,处理程序需要查找其他字符串(这个例子检查行是否以“nr”开头,正如提问者的要求)。
logger -T -n 127.0.0.1 -P 50514 "Test message"
结果如下 - 为了更易读,我把它分成了几个部分。
导入模块:
"""
Based on https://stackoverflow.com/a/25246157/2045924
"""
import threading
import Queue
import sys
import SocketServer
from SocketServer import StreamRequestHandler
from SocketServer import TCPServer
from SocketServer import ThreadingMixIn
将队列添加到TCPServer:
class ThreadedTCPStreamServer(ThreadingMixIn, TCPServer):
"""ThreadedTCPStreamServer."""
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,
queue=None):
self.queue = queue
TCPServer.__init__(self, server_address, RequestHandlerClass,
bind_and_activate=bind_and_activate)
将队列添加到StreamRequestHandler,告诉“handle”该做什么,让“finish”直接通过。
class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):
"""ThreadedTCPStreamHandler."""
def __init__(self, request, client_address, server):
self.queue = server.queue
StreamRequestHandler.__init__(self, request, client_address, server)
def handle(self):
while True:
self.data = self.rfile.readline().strip()
if not self.data:
break
__cur_thread = threading.current_thread()
self.finish()
command = self.data[0:2]
if command == "nr":
info = self.data[2:]
__t1 = info.split("|")
__title = __t1[0]
self.queue.put(info)
self.finish()
def finish(self):
pass
定义全局变量并启动。
HOST = '127.0.0.1'
PORT = 50514
# QUEUE must be defined _outside_.
QUEUE = Queue.Queue()
QTIMEOUT = 1
# Setup instance of my own ThreadedTCPStreamServer with my own ThreadedTCPStreamHandler.
SERVER = ThreadedTCPStreamServer((HOST, PORT), ThreadedTCPStreamHandler, queue=QUEUE)
__IP, __PORT = SERVER.server_address
# Start the server
SERVER_THREAD = threading.Thread(target=SERVER.serve_forever)
SERVER_THREAD.daemon = True
SERVER_THREAD.start()
while True:
try:
#ITEM = QUEUE.get(block=True, timeout=QTIMEOUT)
ITEM = QUEUE.get()
print 'qsize({s}): {i}'.format(s=QUEUE.qsize(), i=ITEM)
QUEUE.task_done()
QUEUE.join()
except KeyboardInterrupt:
SERVER.shutdown()
sys.exit(0)
#except Queue.Empty:
# print 'Got Queue.Empty after waiting for {t}. Continuing...'.format(t=QTIMEOUT)
# continue
现在开始用nc
或类似工具发送一些数据。
要结束程序,按<ctrl>+C
。
KeyboardInterrupt
会在你发送数据包时“生效”。
- 按 +C
- 发送一个额外的数据包
- 好了,程序结束
3
为了让这个功能正常工作,流处理器和主线程必须使用同一个 Queue
对象。你现在在不同的地方创建了不同的队列对象。你可以把 queue
作为全局变量使用,如果客户端和服务器的代码在同一个文件里,或者可以这样做:
class ThreadedTCPStreamServer(ThreadingMixin, TCPServer):
def __init__(self, server_address, RequestHandlerClass, bind_and_activate=True,
queue=None):
self.queue = queue
TCPServer.__init__(self, server_address, RequestHandlerClass,
bind_and_activate=bind_and_activate)
class ThreadedTCPStreamHandler(SocketServer.StreamRequestHandler):
def __init__(self, request, client_address, server):
self.queue = server.queue
StreamRequestHandler.__init__(self, request, client_address, server)
def handle(self):
while True:
self.data = self.rfile.readline().strip()
if not self.data:
break
cur_thread = threading.current_thread()
command = self.data[0:2]
if command == "nr":
info = self.data[2:]
t1 = info.split("|")
title = t1[0]
self.queue.put(info)
self.finish()
q = Queue.Queue()
server = ThreadedTCPServer((HOST, PORT), ThreadedTCPStreamHandler, queue=q)
ip, port = server.server_address
server_thread = threading.Thread(target=server.serve_forever)
server_thread.daemon = True
server_thread.start()
while True:
try:
item = q.get()
print item
q.task_done()
q.join()
except KeyboardInterrupt:
server.shutdown()
sys.exit(0)