如何在与主程序不同的线程中编写socket服务器(使用gevent)?
我正在开发一个Flask/gevent的WSGI服务器,这个服务器需要在后台通过两个套接字与一个硬件设备进行通信,使用的是XML格式。
其中一个套接字是由客户端(也就是我的应用程序)发起的,我可以向设备发送XML指令。设备会在另一个端口上回复,并发送一些信息,我的应用程序需要确认这些信息。所以我的应用程序必须监听这个第二个端口。
到目前为止,我的做法是发出一个指令,打开第二个端口作为服务器,等待设备的回复,然后关闭这个端口。
问题是,设备可能会发送多个需要我确认的回复。所以我想的解决办法是保持端口一直打开,继续响应传入的请求。然而,最终设备发送完请求后,我的应用程序仍在监听(我不知道设备什么时候发送完),这就导致了其他操作都被阻塞。
这让我觉得使用线程是个不错的主意,这样我的应用程序就可以在一个单独的线程中启动一个监听服务器。因为我已经在使用gevent作为Flask的WSGI服务器,所以可以利用greenlets。
但问题是,我找不到一个好的例子来说明如何实现这个功能,网上的例子大多是处理单个套接字服务器的多线程处理。我并不需要在套接字服务器上处理很多连接,但我需要在一个单独的线程中启动它,这样它可以监听和处理传入的消息,而我的主程序可以继续发送消息。
我遇到的第二个问题是,在服务器中,我需要使用我“主”类中的一些方法。作为一个相对新手的Python用户,我不太确定该如何组织代码才能实现这一点。
class Device(object):
def __init__(self, ...):
self.clientsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def _connect_to_device(self):
print "OPEN CONNECTION TO DEVICE"
try:
self.clientsocket.connect((self.ip, 5100))
except socket.error as e:
pass
def _disconnect_from_device(self):
print "CLOSE CONNECTION TO DEVICE"
self.clientsocket.close()
def deviceaction1(self, ...):
# the data that is sent is an XML document that depends on the parameters of this method.
self._connect_to_device()
self._send_data(XMLdoc)
self._wait_for_response()
return True
def _send_data(self, data):
print "SEND:"
print(data)
self.clientsocket.send(data)
def _wait_for_response(self):
print "WAITING FOR REQUESTS FROM DEVICE (CHANNEL 1)"
self.serversocket.bind(('10.0.0.16', 5102))
self.serversocket.listen(5) # listen for answer, maximum 5 connections
connection, address = self.serversocket.accept()
# the data is of a specific length I can calculate
if len(data) > 0:
self._process_response(data)
self.serversocket.close()
def _process_response(self, data):
print "RECEIVED:"
print(data)
# here is some code that processes the incoming data and
# responds to the device
# this may or may not result in more incoming data
if __name__ == '__main__':
machine = Device(ip="10.0.0.240")
Device.deviceaction1(...)
这是我现在的做法(大致上,我省略了敏感信息)。可以看到,所有操作都是顺序进行的。如果有人能提供一个在单独线程中监听的服务器的例子(最好是使用greenlets),以及一种从监听服务器与主线程进行通信的方法,那将非常有帮助。
谢谢。
编辑: 在尝试了几种方法后,我决定使用Python的默认select()方法来解决这个问题。这种方法有效,所以我关于使用线程的问题就不再相关了。感谢那些提供建议的人,感谢你们的时间和努力。
1 个回答
希望这能帮到你。在这个例子中,如果我们调用 tenMessageSender
函数,它会启动一个异步线程,这样主程序就不会被阻塞。同时,_zmqBasedListener
会在一个单独的端口上开始监听,直到这个线程还在运行。无论我们的 tenMessageSender
函数发送什么消息,客户端都会接收到这些消息,并且会回复给 zmqBasedListener
。
服务器端
import threading
import zmq
import sys
class Example:
def __init__(self):
self.context = zmq.Context()
self.publisher = self.context.socket(zmq.PUB)
self.publisher.bind('tcp://127.0.0.1:9997')
self.subscriber = self.context.socket(zmq.SUB)
self.thread = threading.Thread(target=self._zmqBasedListener)
def _zmqBasedListener(self):
self.subscriber.connect('tcp://127.0.0.1:9998')
self.subscriber.setsockopt(zmq.SUBSCRIBE, "some_key")
while True:
message = self.subscriber.recv()
print message
sys.exit()
def tenMessageSender(self):
self._decideListener()
for message in range(10):
self.publisher.send("testid : %d: I am a task" %message)
def _decideListener(self):
if not self.thread.is_alive():
print "STARTING THREAD"
self.thread.start()
客户端
import zmq
context = zmq.Context()
subscriber = context.socket(zmq.SUB)
subscriber.connect('tcp://127.0.0.1:9997')
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:9998')
subscriber.setsockopt(zmq.SUBSCRIBE, "testid")
count = 0
print "Listener"
while True:
message = subscriber.recv()
print message
publisher.send('some_key : Message received %d' %count)
count+=1
你也可以用 greenlet 等其他方式来代替线程。