在Python中,从另一个线程通过套接字发送数据不起作用

2 投票
2 回答
5363 浏览
提问于 2025-04-16 01:00

这是我的“游戏服务器”。其实没什么大不了的,我觉得这是学习一些关于Python和网络连接的好方法。

首先,服务器类会初始化服务器。然后,当有人连接上来时,我们会创建一个客户端线程。在这个线程里,我们会不断监听我们的网络连接。

一旦收到某个特定的指令(比如说I12345001001),就会再启动一个新的线程。

这个新线程的目的是向客户端发送更新信息。不过即使我看到服务器在执行这段代码,数据实际上并没有被发送出去。

有没有人能告诉我哪里出错了?感觉好像我必须先接收到一些东西才能发送出去。所以我想我在某个地方漏掉了什么。


#!/usr/bin/env python

"""
An echo server that uses threads to handle multiple clients at a time.
Entering any line of input at the terminal will exit the server.
"""

import select
import socket
import sys
import threading
import time
import Queue

globuser = {}
queue = Queue.Queue()

class Server:
    def __init__(self):
        self.host = ''
        self.port = 2000
        self.backlog = 5
        self.size = 1024
        self.server = None
        self.threads = []

    def open_socket(self):
        try:
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.bind((self.host,self.port))
            self.server.listen(5)
        except socket.error, (value,message):
            if self.server:
                self.server.close()
            print "Could not open socket: " + message
            sys.exit(1)

    def run(self):
        self.open_socket()
        input = [self.server,sys.stdin]
        running = 1
        while running:
            inputready,outputready,exceptready = select.select(input,[],[])

            for s in inputready:

                if s == self.server:
                    # handle the server socket
                    c = Client(self.server.accept(), queue)
                    c.start()
                    self.threads.append(c)

                elif s == sys.stdin:
                    # handle standard input
                    junk = sys.stdin.readline()
                    running = 0

        # close all threads

        self.server.close()
        for c in self.threads:
            c.join()

class Client(threading.Thread):
    initialized=0

    def __init__(self,(client,address), queue):
        threading.Thread.__init__(self)
        self.client = client
        self.address = address
        self.size = 1024
        self.queue = queue
        print 'Client thread created!'


    def run(self):
        running = 10
        isdata2=0
        receivedonce=0

        while running > 0:

            if receivedonce == 0:
                print 'Wait for initialisation message'
                data = self.client.recv(self.size)
                receivedonce = 1

            if self.queue.empty():
                print 'Queue is empty'
            else:
                print 'Queue has information'
                data2 = self.queue.get(1, 1)
                isdata2 = 1
                if data2 == 'Exit':
                    running = 0
                    print 'Client is being closed'
                    self.client.close()

            if data:
                print 'Data received through socket! First char: "' + data[0] + '"'
                if data[0] == 'I':
                    print 'Initializing user'
                    user = {'uid': data[1:6] ,'x': data[6:9], 'y': data[9:12]}
                    globuser[user['uid']] = user
                    print globuser
                    initialized=1
                    self.client.send('Beginning - Initialized'+';')
                    m=updateClient(user['uid'], queue)
                    m.start()
                else:
                    print 'Reset receivedonce'
                    receivedonce = 0

                print 'Sending client data'
                self.client.send('Feedback: ' +data+';')
                print 'Client Data sent: ' + data

            data=None

            if isdata2 == 1:
                print 'Data2 received: ' + data2
                self.client.sendall(data2)
                self.queue.task_done()
                isdata2 = 0

            time.sleep(1)
            running = running - 1

        print 'Client has stopped'


class updateClient(threading.Thread):

    def __init__(self,uid, queue):
        threading.Thread.__init__(self)
        self.uid = uid
        self.queue = queue
        global globuser
        print 'updateClient thread started!'

    def run(self):
        running = 20
        test=0
        while running > 0:
            test = test + 1
            self.queue.put('Test Queue Data #' + str(test))
            running = running - 1
            time.sleep(1)

        print 'Updateclient has stopped'


if __name__ == "__main__":
    s = Server()
    s.run() 

2 个回答

1

你遇到了三个主要问题。第一个问题可能就是你提问的原因。

阻塞(问题描述)

socket.recv 是一个阻塞操作。这意味着程序会停下来,线程会“睡觉”,直到它能从套接字中读取到数据。所以你的第三个更新线程只是把数据填满了队列,但只有在收到消息时,这个队列才会被清空。而且队列每次只会被一条消息清空。

这很可能就是为什么它不会发送数据,除非你先发送数据。

消息协议与流协议

你试图把套接字流当作消息流来使用。我的意思是你有:

 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

SOCK_STREAM 表示这是一个流,而不是像 SOCK_DGRAM 这样的消息。然而,TCP并不支持消息。所以你需要构建消息,比如:

 data =struct.pack('I', len(msg)) + msg
 socket.sendall(data)

然后接收端会查找长度字段,并将数据读入一个缓冲区。一旦缓冲区里有足够的数据,它就可以提取出整个消息。

你现在的设置之所以能工作,是因为你的消息足够小,可以放在同一个数据包里,并且一起放入套接字缓冲区。但是,一旦你开始通过多次调用 socket.sendsocket.sendall 发送大数据,你就会开始遇到多个消息和部分消息被读取的问题,除非你在套接字字节流上实现一个消息协议。

线程

虽然线程在开始使用时可能更简单,但如果使用不当,它们会带来很多问题,并且可能会降低性能,尤其是在Python中。我很喜欢线程,所以不要误解我。Python还有一个全局解释器锁(GIL)的问题,所以当你使用CPU密集型的线程时,性能会很差。你目前的代码主要是I/O密集型,但将来可能会变成CPU密集型。此外,你还需要担心线程锁定。线程可以是一个快速的解决方案,但可能不是最好的解决方案。在某些情况下,线程确实是打破一些耗时过程的最简单方法。所以不要把线程当作邪恶或糟糕的东西。在Python中,线程被认为是糟糕的,主要是因为GIL,而在其他语言(包括Python)中则是因为并发问题,所以大多数人建议你在Python中使用多进程或异步代码。使用线程与否是一个非常复杂的问题,因为它取决于语言(代码运行的方式)、系统(单处理器或多处理器)和争用(尝试通过锁共享资源)等因素,但一般来说,异步代码更快,因为它利用了更多的CPU,开销更小,特别是当你不是CPU密集型时。

解决方案是使用Python中的 select 模块或类似的东西。它会告诉你何时套接字有数据可以读取,并且你可以设置一个超时参数。

通过进行异步工作(异步套接字),你可以获得更好的性能。要将套接字转换为异步模式,你只需调用 socket.settimeout(0),这样它就不会阻塞。然而,这样你会不断消耗CPU,等待数据。select 模块及其相关工具可以防止你这样做。

通常,为了提高性能,你希望尽可能多地进行异步操作(同一线程),然后再扩展更多的线程,这些线程也尽可能多地进行异步操作。然而,如前所述,Python是这个规则的例外,因为GIL(全局解释器锁)实际上可能会降低性能。如果你感兴趣,可以尝试写一个测试用例来找出答案!

你还应该查看 threading 模块中的线程锁定原语。它们是 LockRLockCondition。它们可以帮助多个线程共享数据而不会出现问题。

lock = threading.Lock()
def myfunction(arg):
    with lock:
        arg.do_something()

一些Python对象是线程安全的,而另一些则不是。

异步发送更新(改进)

与其使用第三个线程来发送更新,不如用客户端线程来发送更新,通过检查当前时间和上次发送更新的时间。这将消除对 QueueThread 的使用。此外,为了做到这一点,你必须将客户端代码转换为异步代码,并在 select 上设置超时,以便你可以定期检查当前时间,看看是否需要更新。

总结

我建议你重写代码,使用异步套接字代码。我还建议你为所有客户端和服务器使用一个线程。这将提高性能并减少延迟。这样调试也会更容易,因为你不会像使用线程那样遇到并发问题。此外,在失败之前,先修复你的消息协议。

3

我不太明白你的逻辑,特别是为什么你要让两个线程同时在同一个套接字上写数据(它们都叫做 self.client),而且没有任何同步或协调,这样的安排肯定会引发问题。

无论如何,你代码中的一个明显错误是你使用了 send 方法。你似乎认为它能保证发送所有的字符串内容,但实际上并不是这样,可以看看文档

返回发送的字节数。应用程序需要负责检查所有数据是否都已发送;如果只有部分数据被传输,应用程序需要尝试发送剩余的数据。

你可能想用的是sendall 方法:

与 send() 不同,这个方法会继续发送字符串中的数据,直到所有数据都发送完或者发生错误。

其他问题还包括 updateClient 似乎设计成永远不会结束(与其他两个线程类不同——当它们结束时,updateClient 实例不会结束,它们会一直运行下去,保持进程存活),还有多余的 global 声明(没什么实质影响,但让人困惑),一些线程在尝试读取一个字典(通过 iteritems 方法),而其他线程在修改它,同样没有任何锁定或协调等等——我相信可能还有更多的错误或问题,但在发现了几个之后,眼睛就开始发花了;-)。

撰写回答