从主代码与多个线程(如果有套接字的话)进行通信

1 投票
1 回答
1561 浏览
提问于 2025-04-28 20:59

我最近在学习线程、套接字和队列,但还是对这些东西的工作原理有些困惑。目前我有一个多线程的服务器,它在某个端口上接受连接,然后为每个连接启动一个新的线程(并随机分配一个系统端口)来处理后续的工作。我想要做的是,从主线程与子线程进行通信,给它们发送需要完成的工作。下面是我服务器的一个简单示例:

import threading, time, socket

class ClientThread(threading.Thread):

    def __init__(self,ip,port,clientsocket):
        threading.Thread.__init__(self)
        self.ip = ip
        self.port = port
        self.csocket = clientsocket
        print ("[+] New thread started for "+ip+":"+str(port))

    def run(self):
        # Stuff happens here
        # keep alive
        while True:
            #send a keepalive to the network device
            data = 'someinfo'
            self.csocket.sendall(data)
            #receive keepalive
            data = self.csocket.recv(56)
            time.sleep(8)

while True:
    host = "127.0.0.1"
    port = 3000
    tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    tcpsock.bind((host,port))   
    tcpsock.listen(5)
    print ("\nListening for incoming connections...")
    (clientsock, (ip, port)) = tcpsock.accept()
    newthread = ClientThread(ip, port, clientsock)
    newthread.start()

这是我服务器的一个非常简单的例子,它可以接受传入的连接,并启动线程来保持这些连接活跃。现在我想做的是引入队列,这样我就可以向某个特定的线程发送“数据”,让它发送到网络设备,然后再从设备那里获取响应。通过这种方式,我可以通过每个线程/套接字与多个设备进行交互,所有操作都在我的主代码中完成。我只是需要一点帮助,找出最好的方法来实现这一点。

如果使用队列,我该如何告诉队列要与哪个线程进行通信呢?

提前谢谢你们!

[编辑 1]

为了举例说明,假设这些设备是简单的设备,用来打开一个继电器。每个设备都会连接到主线程,启动各自的线程,然后只需发送保持连接的响应。随时,我想要中断保持连接,或者在之后发送我的命令,比如打开或关闭继电器。设备会执行命令,然后回应(确认、错误等)。主线程会从子线程接收这个响应,然后提示输入新命令,或者在保持连接的同时等待。因此,我可以坐在终端上,随意打开和关闭继电器。

暂无标签

1 个回答

2

首先,你需要创建几个线程:一个线程用来处理用户输入,一个线程用来处理设备的连接,还有一个线程用来处理每个已连接的设备。

接下来,你可以为每个 ClientThread 实例创建一个单独的 Queue,用来接收来自用户输入线程的数据,并维护一个包含所有 Queue 实例的字典。这个字典需要用某种方式来标识设备——在下面的例子中,我使用的是每个连接的 <IP>:<PORT> 作为键。这样,用户输入线程就可以接收用户的请求,并将其发送到正确的线程。最后一步是返回响应。你可以在所有线程之间重用一个 Queue 实例来完成这项工作,前提是你希望在等待设备响应用户输入的请求时保持阻塞状态。

下面是一个未经测试的示例,演示了以上内容:

import threading
import socket
import Queue

class ClientThread(threading.Thread):

    def __init__(self,ip,port,clientsocket, in_q, out_q):
        threading.Thread.__init__(self)
        self.ip = ip
        self.port = port
        self.csocket = clientsocket
        self.in_q = in_q
        self.out_q = out_q
        self._tid = "{}:{}".format(ip, port)
        self.daemon = True # Assuming you want the client thread to shutdown when the main thread does
        print ("[+] New thread started for "+ip+":"+str(port))

    def run(self):
        # Stuff happens here
        while True:
            try:
                # Wait 8 seconds for an incoming command
                cmd = self.in_q.get(timeout=8)
                self.csocket.sendall(cmd)
                data = self.csocket.recv(56)
                # Send result to main thread.
                self.out_q.put({'tid' : self._tid, 'data' : data})
            except Queue.Empty:
                # No incoming command after 8 seconds, do a keep-alive instead.
                data = 'someinfo'
                self.csocket.sendall(data)
                data = self.csocket.recv(56)

def handle_socket_connections(resp_queue):
    """ Thread for handling connections from devices. """
    while True:
        host = "127.0.0.1"
        port = 3000
        tcpsock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        tcpsock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        tcpsock.bind((host,port))   
        tcpsock.listen(5)
        print ("\nListening for incoming connections...")
        (clientsock, (ip, port)) = tcpsock.accept()
        q = Queue.Queue() # Create a Queue for sending commands to this device.
        queues["{}:{}".format(ip,port)] = q
        newthread = ClientThread(ip, port, clientsock, q, resp_queue)
        newthread.start()


queues = {} # dict for managing input queues
resp_queue = Queue.Queue()  # Shared response queue
socket_handler = threading.Thread(target=handle_socket_connections, args=(resp_queue,))
socket_handler.daemon = True
socket_handler.start()

# Wait for user input
while True:
    # example input format: '1.2.3.4:55 ON'
    in_data = raw_input("Enter using the format - <IP>:<Port> <cmd>")
    if in_data == "EXIT":
        break
    ip_port, cmd = in_data.split()
    try:
        queues[ip_port].put(cmd)  # Send command to the appropriate thread.
        result = resp_queue.get()  # Wait for a response.
        print("Got '{data}' from {tid}".format(**result))
    except KeyError:
        print("No conection on {}".format(ip_port))

撰写回答