Python Socket 多进程工作池
我需要通过套接字接收连接,读取输入数据,进行复杂且耗时的计算,然后发送答案。同时可能会有很多请求(比如100个)。
我了解到,由于全局解释器锁(GIL)的原因,我不能使用普通的线程,所以我尝试用C++和boost库来创建线程,并在每个线程中运行Python的子解释器。但无论如何,这样做都无法让所有的核心同时100%利用。
因此,我决定使用多进程的方法,创建一个固定数量的工作进程来处理这些请求,并使用队列来管理。这样,我们就不需要浪费时间去创建新进程,也不会同时有100个或更多的进程,只会有固定数量的进程。
我对Python还很陌生,之前主要使用C++。
现在我有了这段代码,但它并没有正常工作。连接打开后就立即关闭,我不知道为什么:
#!/usr/bin/env python
import os
import sys
import SocketServer
import Queue
import time
import socket
import multiprocessing
from multiprocessing.reduction import reduce_handle
from multiprocessing.reduction import rebuild_handle
class MultiprocessWorker(multiprocessing.Process):
def __init__(self, sq):
self.SLEEP_INTERVAL = 1
# base class initialization
multiprocessing.Process.__init__(self)
# job management stuff
self.socket_queue = sq
self.kill_received = False
def run(self):
while not self.kill_received:
try:
h = self.socket_queue.get_nowait()
fd=rebuild_handle(h)
client_socket=socket.fromfd(fd,socket.AF_INET,socket.SOCK_STREAM)
#client_socket.send("hellofromtheworkerprocess\r\n")
received = client_socket.recv(1024)
print "Recieved on client: ",received
client_socket.close()
except Queue.Empty:
pass
#Dummy timer
time.sleep(self.SLEEP_INTERVAL)
class MyTCPHandler(SocketServer.BaseRequestHandler):
"""
The RequestHandler class for our server.
It is instantiated once per connection to the server, and must
override the handle() method to implement communication to the
client.
"""
def handle(self):
# self.request is the TCP socket connected to the client
#self.data = self.request.recv(1024).strip()
#print "{} wrote:".format(self.client_address[0])
#print self.data
# just send back the same data, but upper-cased
#self.request.sendall(self.data.upper())
#Either pipe it to worker directly like this
#pipe_to_worker.send(h) #instanceofmultiprocessing.Pipe
#or use a Queue :)
h = reduce_handle(self.request.fileno())
socket_queue.put(h)
if __name__ == "__main__":
#Mainprocess
address = ('localhost', 8082)
server = SocketServer.TCPServer(address, MyTCPHandler)
socket_queue = multiprocessing.Queue()
for i in range(5):
worker = MultiprocessWorker(socket_queue)
worker.start()
try:
server.serve_forever()
except KeyboardInterrupt:
sys.exit(0)
2 个回答
0
试试这个:
def handle(self):
h = reduce_handle(self.request.fileno())
socket_queue.put(h)
self.request.close()
注意这里加了 self.request.close() 这一行。
0
你有没有想过为什么不使用
def reduce_socket(s):
...
def rebuild_socket(ds):
...
呢?
看起来你可以这样做:
import copyreg
copyreg.pickle(type(socket.socket), reduce_socket, rebuild_socket)
然后把这个套接字传递给队列。
这些只是一些建议。这样说有没有帮助呢?