从主代码与多个线程(如果有套接字的话)进行通信
我最近在学习线程、套接字和队列,但还是对这些东西的工作原理有些困惑。目前我有一个多线程的服务器,它在某个端口上接受连接,然后为每个连接启动一个新的线程(并随机分配一个系统端口)来处理后续的工作。我想要做的是,从主线程与子线程进行通信,给它们发送需要完成的工作。下面是我服务器的一个简单示例:
import threading, time, socket
class ClientThread(threading.Thread):
def __init__(self,ip,port,clientsocket):
threading.Thread.__init__(self)
self.ip = ip
self.port = port
self.csocket = clientsocket
print ("[+] New thread started for "+ip+":"+str(port))
def run(self):
# Stuff happens here
# keep alive
while True:
#send a keepalive to the network device
data = 'someinfo'
self.csocket.sendall(data)
#receive keepalive
data = self.csocket.recv(56)
time.sleep(8)
while True:
host = "127.0.0.1"
port = 3000
tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcpsock.bind((host,port))
tcpsock.listen(5)
print ("\nListening for incoming connections...")
(clientsock, (ip, port)) = tcpsock.accept()
newthread = ClientThread(ip, port, clientsock)
newthread.start()
这是我服务器的一个非常简单的例子,它可以接受传入的连接,并启动线程来保持这些连接活跃。现在我想做的是引入队列,这样我就可以向某个特定的线程发送“数据”,让它发送到网络设备,然后再从设备那里获取响应。通过这种方式,我可以通过每个线程/套接字与多个设备进行交互,所有操作都在我的主代码中完成。我只是需要一点帮助,找出最好的方法来实现这一点。
如果使用队列,我该如何告诉队列要与哪个线程进行通信呢?
提前谢谢你们!
[编辑 1]
为了举例说明,假设这些设备是简单的设备,用来打开一个继电器。每个设备都会连接到主线程,启动各自的线程,然后只需发送保持连接的响应。随时,我想要中断保持连接,或者在之后发送我的命令,比如打开或关闭继电器。设备会执行命令,然后回应(确认、错误等)。主线程会从子线程接收这个响应,然后提示输入新命令,或者在保持连接的同时等待。因此,我可以坐在终端上,随意打开和关闭继电器。
1 个回答
首先,你需要创建几个线程:一个线程用来处理用户输入,一个线程用来处理设备的连接,还有一个线程用来处理每个已连接的设备。
接下来,你可以为每个 ClientThread
实例创建一个单独的 Queue
,用来接收来自用户输入线程的数据,并维护一个包含所有 Queue
实例的字典。这个字典需要用某种方式来标识设备——在下面的例子中,我使用的是每个连接的 <IP>:<PORT>
作为键。这样,用户输入线程就可以接收用户的请求,并将其发送到正确的线程。最后一步是返回响应。你可以在所有线程之间重用一个 Queue
实例来完成这项工作,前提是你希望在等待设备响应用户输入的请求时保持阻塞状态。
下面是一个未经测试的示例,演示了以上内容:
import threading
import socket
import Queue
class ClientThread(threading.Thread):
def __init__(self,ip,port,clientsocket, in_q, out_q):
threading.Thread.__init__(self)
self.ip = ip
self.port = port
self.csocket = clientsocket
self.in_q = in_q
self.out_q = out_q
self._tid = "{}:{}".format(ip, port)
self.daemon = True # Assuming you want the client thread to shutdown when the main thread does
print ("[+] New thread started for "+ip+":"+str(port))
def run(self):
# Stuff happens here
while True:
try:
# Wait 8 seconds for an incoming command
cmd = self.in_q.get(timeout=8)
self.csocket.sendall(cmd)
data = self.csocket.recv(56)
# Send result to main thread.
self.out_q.put({'tid' : self._tid, 'data' : data})
except Queue.Empty:
# No incoming command after 8 seconds, do a keep-alive instead.
data = 'someinfo'
self.csocket.sendall(data)
data = self.csocket.recv(56)
def handle_socket_connections(resp_queue):
""" Thread for handling connections from devices. """
while True:
host = "127.0.0.1"
port = 3000
tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcpsock.bind((host,port))
tcpsock.listen(5)
print ("\nListening for incoming connections...")
(clientsock, (ip, port)) = tcpsock.accept()
q = Queue.Queue() # Create a Queue for sending commands to this device.
queues["{}:{}".format(ip,port)] = q
newthread = ClientThread(ip, port, clientsock, q, resp_queue)
newthread.start()
queues = {} # dict for managing input queues
resp_queue = Queue.Queue() # Shared response queue
socket_handler = threading.Thread(target=handle_socket_connections, args=(resp_queue,))
socket_handler.daemon = True
socket_handler.start()
# Wait for user input
while True:
# example input format: '1.2.3.4:55 ON'
in_data = raw_input("Enter using the format - <IP>:<Port> <cmd>")
if in_data == "EXIT":
break
ip_port, cmd = in_data.split()
try:
queues[ip_port].put(cmd) # Send command to the appropriate thread.
result = resp_queue.get() # Wait for a response.
print("Got '{data}' from {tid}".format(**result))
except KeyError:
print("No conection on {}".format(ip_port))