Python HTTP服务器:崩溃

2024-05-15 22:38:13 发布

您现在位置:Python中文网/ 问答频道 /正文

我有python http服务器。此http服务器侦听传入的post请求(100-150个同时连接)。 我的python服务器在工作一天后崩溃并关闭。在

这是我的代码:

import http.server
import socketserver
import queue
import socket
import gzip
from urllib.parse import parse_qs
import json
import time
import _thread as thread
import threading
import globals
from logs import LOG
import conf
from os import getcwd
from DBConnection import DB

mutex = None
MAX_THREADS = 25


class Handler(http.server.BaseHTTPRequestHandler):
    def setup(self):
        http.server.BaseHTTPRequestHandler.setup(self)
        self.request.settimeout(30)

    #def do_HEAD(self):
    #    self.send_response(200)
    #    self.send_header("Content-type", "text/html")
    #   self.end_headers()

    def do_POST(self):

        global mutex
        data = {'result': 'error'}
        length = int(self.headers['Content-Length'])
        print('length:%s' % length)
        content = self.headers['Content-Encoding']
        print('Content-Encoding:%s' % content)

        try:
            post_data = self.rfile.read(length)
        except socket.error as e:
            print(e)

        with mutex:
            print('active threads:%d' % threading.active_count())
            print(threading.current_thread().getName())

        #print(post_data)
        if content == "gzip":
            try:
                post_data = gzip.decompress(post_data)
            except Exception as e:
                LOG.log('gzip error:%s, data:%s' % (e, post_data))
                data = json.dumps(data)
                self.send_response(200)
                self.send_header("Content-type", "application/json")
                self.end_headers()
                self.wfile.write(data.encode())
                return

        post_data = parse_qs(post_data.decode('utf-8'))
        #parse data
        records = []
        gpselement = {}
        imei = ''
        try:
            if content == "gzip":
                N = len(post_data.get('lat[]'))
            else:
                N = 1
            for i in range(N):
                for key in post_data.keys():
                    if key in ['m', 'imei']:
                        continue
                    #if key == 'gpstime[]':
                    gpselement[key.replace('[]', '')] = post_data.get(key)[i]

                io = {}
                io[1] = gpselement['status']
                gpselement['metrage'] = 0
                gpselement['voltage'] = 0
                gpselement['io'] = io

                records.append(gpselement)
            imei = post_data.get('imei')[0]
        except Exception as e:
            print(e)
            LOG.log('error:%s' % e)

        with mutex:
            #    print(records)
            s='imei: %s, len:%d , coords:%s' % (imei, len(records), str(records))
            #LOG.log(s)
            print(s)

        db = DB()
        #search imei
        with mutex:
            deviceData = db.getDevice(imei)
            #deviceData = None
        #insert gps data to base
        if deviceData:
            with mutex:
                if db.addDataRecords(records, deviceData):
                    data['result'] = 'success'
        else:
            LOG.log('device not found, imei:%s' % imei)

        #send response
        try:
            data = json.dumps(data)
            self.send_response(200)
            self.send_header("Content-type", "application/json")
            self.end_headers()
            self.wfile.write(data.encode())
            print('%s closed.' % threading.current_thread().getName())
        except Exception as e:
            LOG.log('http response error:%s' % e)


class ThreadPoolMixIn(socketserver.ThreadingMixIn):
    '''
    use a thread pool instead of a new thread on every request
    '''

    numThreads = MAX_THREADS
    allow_reuse_address = True  # seems to fix socket.error on server restart

    def serve_forever(self):
        '''
        Handle one request at a time until doomsday.
        '''
        # set up the threadpool
        self.requests = queue.Queue(self.numThreads)
        for x in range(self.numThreads):
            t = threading.Thread(target=self.process_request_thread)
            t.setDaemon(True)
            t.start()

        # server main loop
        while True:
            self.handle_request()

        self.server_close()

    def process_request_thread(self):
        '''
        obtain request from queue instead of directly from server socket
        '''
        while True:
            socketserver.ThreadingMixIn.process_request_thread(self, *self.requests.get())

    def handle_request(self):
        '''
        simply collect requests and put them on the queue for the workers.
        '''
        try:
            request, client_address = self.get_request()
        except socket.error:
            return
        if self.verify_request(request, client_address):
            self.requests.put((request, client_address))


class ThreadedTCPServer(ThreadPoolMixIn, socketserver.TCPServer):
    """Handle requests in a separate thread."""
    #pass


def loadIni():
    inipath = getcwd() + '/' + 'conf.ini'
    config = conf.loadSettings(inipath)
    globals.DB_HOST = str(config['Database']['host'])
    globals.DB_USER = str(config['Database']['user'])
    globals.DB_PWD = str(config['Database']['pwd'])
    globals.DB_NAME = str(config['Database']['db'])
    globals.LISTEN_PORT = int(config['PORTS']['listenPort'])
    conf.saveSettings(inipath, config)


def checkBase():
    #reconnect to database
    if not DB.checkConnection():
        DB.close()
        DB.connect(globals.DB_HOST, globals.DB_USER, globals.DB_PWD, globals.DB_NAME)
    time.sleep(600)
    #print('check Base')
    checkBase()


def start_server():
    global mutex
    try:
        #Загрузка из конфигурации
        loadIni()
        #start server
        if DB.connect(globals.DB_HOST, globals.DB_USER, globals.DB_PWD, globals.DB_NAME):
            httpd = ThreadedTCPServer(("", globals.LISTEN_PORT), Handler)
            print("Start server at port", globals.LISTEN_PORT)
            mutex = thread.allocate_lock()
            thread.start_new_thread(checkBase, ())
            httpd.serve_forever()
    except Exception as e:
        print(e)
        print('Retrying connect to server.')
        time.sleep(5)
        start_server()

if __name__ == '__main__':
    start_server()

如何解决这个问题?还有其他方法可以做http服务器吗?在


Tags: importselfhttpdbdataifserverrequest