Python多线程套接字侦听器错误,线程未释放

2024-05-16 00:14:33 发布

您现在位置:Python中文网/ 问答频道 /正文

世界上有500多个设备连接到我的服务器并转储数据。到目前为止,我一直在使用PHP脚本作为socket侦听器,但是我需要多线程,因为负载不断增加,PHP无法跟上。我对Python还很陌生,决定把它用于我的新平台,在过去的几天里,我努力尝试了许多例子,但都没有成功。在我的搜索中,我遇到了一些试图回答这个问题的问题,但没有一个。我会附上我的代码。在

问题:当单元连接到服务器并且服务器接受它时,服务器会创建一个新线程来处理连接,当单元断开连接时,问题就来了,线程保持打开和活动状态,线程总数增加了,这与系统限制“打开的文件数”有关,我可以增加这个限制但这只会使它成为定时炸弹,它不能解决这个问题。在

请帮忙。在

#! /usr/bin/python

import multiprocessing
import socket
import sys
import pprint
import datetime
import MySQLdb
import time
import datetime
import re
import select
import resource
import threading

max_connections = 1024
max_connections_set = max_connections
resource.setrlimit(resource.RLIMIT_NOFILE, (max_connections_set, max_connections_set))



#the incomming port
the_global_port = xxxx #(any port)

#display id
the_global_id = "UNIT TYPE"




class ConnectionObject(object):
  the_id = None

  the_socket = None
  the_socket_address = None
  the_socket_address_ip = None
  the_socket_address_port = None

  the_server = None

  the_process = None
  the_process_id = None
  the_process_name = None

  the_imei = None
  identifier = ""

  # The class "constructor" - It's actually an initializer 
  def __init__(self, in_process_nr, in_process , in_socket , in_socket_address, in_server):
    self.the_id = in_process_nr
    self.the_process = in_process
    self.the_process_id = self.the_process.exitcode
    self.the_process_name = self.the_process.name


    self.the_socket = in_socket
    self.the_socket_address = in_socket_address
    self.the_socket_address_ip = self.the_socket_address[0]
    self.the_socket_address_port = self.the_socket_address[1]


    self.the_server = in_server

    self.identifier = str(self.the_id) + " " + str(self.the_process_name) + " " + str(self.the_socket_address_ip) + " " + str(self.the_socket_address_port) + " "
  #end def init
#end def class  

def processData(the_connection_object , the_data):
  def mysql_open_connection_inside():
    try:
      the_conn = MySQLdb.connect(host= "127.0.0.1",
                          user="user",
                          passwd="password",
                          db="mydb")
    except MySQLdb.Error, e:
      print "Error %d: %s" % (e.args[0], e.args[1])
      time.sleep(30)
      try:
        the_conn = MySQLdb.connect(host= "127.0.0.1",
                            user="user",
                            passwd="password",
                            db="mydb")
      except MySQLdb.Error, e:
        print "Error %d: %s" % (e.args[0], e.args[1])
        print "Unexpected error:", sys.exc_info()[0]
        raise
        sys.exit(0) 
      #end time 2  
    #end try except

    return the_conn
  #end def mysql_open_connection                    



  conn = mysql_open_connection_inside()
  x = conn.cursor()

  add_rawdata = ("INSERT INTO RawData"
                 "(ID,RawData,Type) "
                 "VALUES (%s, %s, %s)")

  data_raw = ('123456', 'ABCD','')



  records_inserted = 0


  the_connection_object.the_imei = ""


  #global clients
  imei = ""
  try:
    thedata = ""
    thedata = " ".join("{:02x}".format(ord(c)) for c in the_data)

    record_to_save = ' '.join(thedata)


    seqtoreply = ""
    seqtoreply = "OK"

    #reply part
    if (seqtoreply != ""): #reply to unit
      try:
        the_connection_object.the_socket.sendall(seqtoreply)
        #echoout(the_connection_object.identifier+"We Replyed With : " + seqtoreply)
      except:
        echoout(the_connection_object.identifier+"Send Reply Error : " + str(sys.exc_info()[1]))
      #end except
    #end of if 


    the_id = "some generated id"
    data_raw  = (the_id, werk_data, 'UNIT')
    try:
      x.execute(add_rawdata, data_raw)
      conn.commit()
      echoout(the_connection_object.identifier+"Raw Data Saved.")
    except:
      conn.rollback()
      echoout(the_connection_object.identifier+" Raw Data NOT Saved : " + str(sys.exc_info()[1]))
    #end of data save insert  
    #echoout("=============================")
    endme = 1
    echoout("")
    conn.close()
  #end try
  except:
    conn.close()
    echoout(the_connection_object.identifier+"Error : " + str(sys.exc_info()[1]))
  #end  try except
#end  def handel function



def handle_p(processnr, server, connection, address):
  this_connection = ConnectionObject(processnr,multiprocessing.current_process(), connection, address, server)
  thisprocess = multiprocessing.current_process()
  this_connection.the_id = ""
  the_address = this_connection.the_socket_address_ip
  the_port = this_connection.the_socket_address_port

  try:
    echoout("New connection from : "+str(the_address)+" on port "+str(the_port))
    close_the_socket = False
    while True:
      #--------------------- recive part -------------------------------------------------
      data = connection.recv(512)
      thedata = ""
      thedata = " ".join("{:02x}".format(ord(c)) for c in data)
      if ((thedata == "") or (thedata == " ") or (data == False)):
        echoout("Socket Closed Remotely : No Data")
        close_the_socket = True
        break
      #end - if data blank
      else :
        processData(this_connection, data)
      #end there is data  
      echoout("=============================")
    #end if while true
  #end try
  except:
    print "handling request, Error : " + str(sys.exc_info()[1])
    close_the_socket = True
    connection.close()
  finally:
    close_the_socket = True
    echoout("Closing socket")
    connection.close()
  #end  try finally
#end  def handel function

def mysql_update(update_statement, update_values):
  conn_update = MySQLdb.connect(host= "127.0.0.1",
                    user="user",
                    passwd="password",
                    db="mydb")
  x_update = conn_update.cursor(MySQLdb.cursors.DictCursor)

  rawdata_data = (update_statement)
  data_rawdata = (update_values)
  allupdateok = False
  #end if there is more  
  try:
    x_update.execute(rawdata_data, data_rawdata)
    conn_update.commit()
    allupdateok = True
    conn_update.close()
  except:
    conn_update.rollback()
    allupdateok = False
    conn_update.close()
    print "Unexpected error:", sys.exc_info()[0]
    raise
  #end of data save insert  
  if (allupdateok == False):
    echoout("Update Raw Data Table Error")
  #end if update
  return allupdateok
#end def mysqlupdate



def echoout(thestring):
  datestring = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
  if (thestring != ""):
    outstring = datestring + " " + thestring
    print outstring
  else :
    outstring = thestring
    print outstring
#end - def echoout




class Server(object):
  threads = []
  all_threads = []
  high_proc = ""
  def __init__(self, hostname, port):
    self.hostname = hostname
    self.port = port

  def start(self):
    echoout("Listening for conncetions")
    self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.bind((self.hostname, self.port))
    self.socket.listen(10)
    process_number = 1

    inputs = [self.socket]
    while True:
      inready, outready, excready = select.select(inputs, [], [], 30);
      for s in inready:
        if s == self.socket:
          conn, address = self.socket.accept()
          high_processname = ""
          echoout("Got a connection...")
          process = threading.Thread(target=handle_p, args=(process_number,self, conn, address))
          high_processname = process.name
          self.high_proc = high_processname
          process.daemon = True
          process.start()
          self.all_threads.append((process,conn))
          ts = time.time()
          st = datetime.datetime.fromtimestamp(ts).strftime('%Y-%m-%d %H:%M:%S')
          self.threads.append((process_number,conn,st,0,process))
          process_number = process_number + 1

          print "ACTIVE Threads = " + str(threading.activeCount())

          the_total_count = 0 
          dead_count = 0
          alive_count = 0
          for the_thread in self.all_threads :
            if (the_thread[0].is_alive() == True):
              alive_count = alive_count + 1
            else :
              dead_count = dead_count + 1
              the_thread[1].close()
              the_thread[0].join(0.3)

              self.all_threads.pop(the_total_count)
            #end if alive else 
            the_total_count = the_total_count + 1
          #end for threads  

          print "LIVE Threads = " + str(alive_count)
          print "DEAD Threads = " + str(dead_count)
          print "TOTAL Threads = " + str(the_total_count)
          print ""


        #end if s = socke, new connection
      #end for loop
    #end while truw
    self.socket.close()
  #end def start

#main process part
if __name__ == "__main__":

  start_ip = "0.0.0.0"
  start_port = the_global_port

  #start server
  server = Server(start_ip, start_port)
  try:
    print "Listening on " , start_port
    server.start()
  except:
    print "unexpected, Error : " + str(sys.exc_info()[1])
  finally:
    print "shutting down"

    active_clients = 0
    for process in multiprocessing.active_children():
      try:
        active_clients = active_clients + 1
        process.terminate()
        #process.join()
      except:
        print "Process not killed = " + str(sys.exc_info()[1])
      #end try except 
    #close mysql connection
    print "Active clients  = " + str(active_clients)
  #end try finally
  server.socket.close()
  server.threads = []
  server = None
  print "All done."
#end def main

Tags: theselfdataportaddressdefsocketconnection
1条回答
网友
1楼 · 发布于 2024-05-16 00:14:33

首先,当你有500多个连接的客户机时使用线程是愚蠢的,你应该使用异步方式——例如看gevent 一个非常好的库,或者至少使用select(参见Python文档)。在

那么,关闭handle_p中的套接字的代码看起来不错,实际上 recv()调用返回一个空字符串,它表示远程 end断开,所以您断开了while,很好。在

但是,看起来遥控器关闭了连接,但事实并非如此 在您这边检测到(recv()不返回)。最好是 然后有一种心跳来知道什么时候可以关闭 连接。在

相关问题 更多 >