Python客户端在没有接收服务器数据时挂起,线程被阻塞无法发送

0 投票
1 回答
3500 浏览
提问于 2025-04-15 19:13

我正在尝试让我的客户端能够“同时”发送和接收数据,所以我使用了线程。我的问题是,根据我设置的方式,客户端在接收数据的函数(recieveFromServer)中会等待服务器的数据,而这个函数是在自己的线程里运行的,当没有数据发送时,它无法停止等待。另一种方式是,它只等待用户输入,然后发送数据到服务器,之后我再调用接收数据的函数,这样就不能实现流畅的通信,而且我也无法让它自动切换。请问我该如何在客户端没有数据要发送,或者服务器没有更多数据可接收时释放这个线程呢?

如果我尝试解释我所做的所有事情,内容会太长了。:)

谢谢。

客户端:

from socket import *
from threading import *
import thread
import time
from struct import pack,unpack
from networklingo import *
#from exception import *

HOST = '192.168.0.105'
PORT = 21567
BUFFSIZE = 1024
ADDR = (HOST,PORT)

lock = thread.allocate_lock()

class TronClient:

    def __init__(self,control=None):
        self.tcpSock = socket(AF_INET,SOCK_STREAM)
        #self.tcpSock.settimeout(.2)
        self.recvBuff = []

    def connect(self):
        self.tcpSock.connect(ADDR)
        self.clientUID = self.tcpSock.recv(BUFFSIZE)
        print 'My clientUID is ', self.clientUID
        t = Thread(target = self.receiveFromSrv())
        t.setDaemon(1)
        t.start()
        print 'going to main loop'
        self.mainLoop()
        #t = Thread(target = self.mainLoop())
        #t.setName('mainLoop')
        #t.setDaemon(1)
        #t.start()

    def receiveFromSrv(self):
        RECIEVING = 1
        while RECIEVING:
            #print 'Attempting to retrieve more data'
            #lock.acquire()
            #print 'Lock Aquired in recieveFromSrv'

            #try:
            data = self.tcpSock.recv(BUFFSIZE)
            #except socket.timeout,e:
                    #print 'Error recieving data, ',e
                    #continue
            #print data
            if not data: continue

            header = data[:6]
            msgType,msgLength,clientID = unpack("hhh",header)
            print msgType
            print msgLength
            print clientID,'\n'

            msg = data[6:]

            while len(msg) < msgLength:
                data = self.tcpSock.recv(BUFFSIZE)
                dataLen = len(data)

                if dataLen <= msgLength:
                    msg += data
                else:
                    remLen = msgLength-len(data) #we just need to retrieve first bit of data to complete msg 
                    msg += data[:remLen]
                    self.recvBuff.append(data[remLen:])

            print msg
            #else:
                #lock.release()
            #    print 'lock release in receiveFromSrv'
                #time.sleep(2)
            #RECIEVING = 0

    def disconnect(self,data=''):
        self.send(DISCONNECT_REQUEST,data)
        #self.tcpSock.close()

    def send(self,msgType,msg):
        header = pack("hhh",msgType,len(msg),self.clientUID)
        msg = header+msg
        self.tcpSock.send(msg)

    def mainLoop(self):
        while 1:
            try:
                #lock.acquire()
                #print 'lock aquired in mainLoop'
                data = raw_input('> ')
            except EOFError:            # enter key hit without any data (blank line) so ignore and continue
                continue                

            #if not data or data == '':  # no valid data so just continue
            #    continue

            if data=='exit':            # client wants to disconnect, so send request to server
                self.disconnect()
                break
            else:
                self.send(TRON_CHAT,data)

            #lock.release()
            #print 'lock released in main loop'
            #self.recieveFromSrv()
            #data = self.tcpSock.recv(BUFFSIZE)
            #t = Thread(target = self.receiveFromSrv())
            #t.setDaemon(1)
            #t.start()



if __name__ == "__main__":
    cli = TronClient()
    cli.connect()
    #t = Thread(target = cli.connect())
    #t.setName('connect')
    #t.setDaemon(1)
    #t.start()

服务器(在增加或减少客户端数量时使用锁):

from socket import *
from threading import *
import thread
from controller import *
from networklingo import *
from struct import pack,unpack

HOST = ''
PORT = 21567
BUFSIZE = 1024
ADDR = (HOST,PORT)

nclntlock = thread.allocate_lock()

class TronServer:

    def __init__(self,maxConnect=4,control=None):
        self.servSock = socket(AF_INET,SOCK_STREAM)

        # ensure that you can restart server quickly when it terminates
        self.servSock.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)

        self.servSock.bind(ADDR)
        self.servSock.listen(maxConnect)

        # keep track of number of connected clients
        self.clientsConnected = 0

        # give each client a unique identfier for this run of server
        self.clientUID = 0

        # list of all clients to cycle through for sending
        self.allClients = {}

        # keep track of threads
        self.cliThreads = {}

        #reference back to controller
        self.controller = control

        self.recvBuff = []

    def removeClient(self,clientID,addr):
        if clientID in self.allClients.keys():
            self.allClients[clientID].close()
            print "Disconnected from", addr
            nclntlock.acquire()
            self.clientsConnected -= 1
            nclntlock.release()
            del self.allClients[clientID]
        else:
            print 'ClientID is not valid'

    def recieve(self,clientsock,addr):
        RECIEVING = 1

        # loop serving the new client
        while RECIEVING: # while PLAYING???
            try:
                data = clientsock.recv(BUFSIZE)
            except:
                RECIEVING = 0
                continue

#            if not data: break  #no data was recieved

            if data != '':
                print 'Recieved msg from client: ',data

                header = data[:6]
                msgType,msgLength,clientID = unpack("hhh",header)
                print msgType
                print msgLength
                print clientID,'\n'

                if msgType == DISCONNECT_REQUEST:               #handle disconnect request
                    self.removeClient(clientID,addr)
                else:                                           #pass message type and message off to controller

                    msg = data[6:]

                    while len(msg) < msgLength:
                        data = self.tcpSock.recv(BUFSIZE)
                        dataLen = len(data)

                        if dataLen <= msgLength:
                            msg += data
                        else:
                            remLen = msgLength-len(data) #we just need to retrieve first bit of data to complete msg 
                            msg += data[:remLen]
                            self.recvBuff.append(data[remLen:])

                    print msg       
            # echo back the same data you just recieved
            #clientsock.sendall(data)
                    self.send(TRON_CHAT,msg,-1) #send to client 0


        for k in self.allClients.keys():
            if self.allClients[k] == clientsock:
                self.removeClient(k,addr)
                print 'deleted after hard exit from clientID ', k
                #self.cliThreads[k].join()
                #del self.cliThreads[k]
                # then tell controller to delete player with k
                break

    def send(self,msgType,msg,clientID=-1):
        header = pack("hhh",msgType,len(msg),clientID)
        msg = header+msg

        if clientID in self.allClients:
            self.allClients[clientID].send(msg)
        elif clientID==ALL_PLAYERS:
            for k in self.allClients.keys():
                self.allClients[k].send(msg)


    def mainLoop(self):
        global nclntlock

        try:
            while self.controller != None and self.controller.state == WAITING:
                print 'awaiting connections'
                clientsock, caddy = self.servSock.accept()

                nclntlock.acquire()                         
                self.clientsConnected += 1
                nclntlock.release()
                print 'Client ',self.clientUID,' connected from:',caddy
                clientsock.setblocking(0)
                clientsock.send(str(self.clientUID))
                self.allClients[self.clientUID] = clientsock
                t = Thread(target = self.recieve, args = [clientsock,caddy])
                t.setName('recieve-' + str(self.clientUID))
                self.cliThreads[self.clientUID] = t
                self.clientUID += 1
                # t.setDaemon(1)
                t.start()
        finally:
            self.servSock.close()

if __name__ == "__main__":
    serv = TronServer(control = LocalController(nPlayers = 3, fWidth = 70, fHeight = 10))
    t = Thread(target = serv.mainLoop())
    t.setName('mainLoop')
#   t.setDaemon(1)
    t.start()

1 个回答

2

我觉得你可以试着把套接字设置为非阻塞模式:

http://docs.python.org/library/socket.html#socket.socket.setblocking

设置套接字的阻塞或非阻塞模式:如果标志是0,套接字就会被设置为非阻塞模式;否则就是阻塞模式。最开始所有的套接字都是在阻塞模式下。在非阻塞模式下,如果调用recv()时没有找到任何数据,或者调用send()时无法立即处理数据,就会抛出一个错误;而在阻塞模式下,调用会一直等待,直到可以继续进行。s.setblocking(0)相当于s.settimeout(0);s.setblocking(1)相当于s.settimeout(None)。

另外,除了使用原始套接字,你有没有考虑过使用multiprocessing模块?这个模块提供了一个更高级的方式来进行网络输入输出。关于管道和队列的部分,专门讲的是如何在客户端和服务器之间发送和接收数据。

撰写回答