Sharing a list between processes in a Python server
I have a simple UDP server that uses multiprocessing.
I want to create a list that holds information about all the clients.
I'm using a Manager, but I don't understand how to add information to the list: I need to pass the Manager's object to the handler, but how? My attempt with a new attribute doesn't work.
import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address)  # error here
        print(ChatHandler.clients)

class ChatServer(ForkingMixIn, UDPServer):
    pass

if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
How do I fix this? Thanks!
Output:
Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory
5 Answers
If you can't use the existing Manager for some reason, you can also implement one yourself, tailored to your needs.
My unit tests are set up to kill any child processes that didn't shut down properly, which breaks the Manager. So I needed something I could start and stop at will without it interfering with the tests.
import multiprocessing
import atexit
import select
import logging

logger = logging.getLogger(__name__)


class SharedDict:
    """Share a dictionary across processes."""
    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            # the original project uses a custom 'spam' log level here
            logger.debug('SharedDict got %s', message)
            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


shared_dict = SharedDict()
You can extend this with all sorts of methods, and you can stop and restart it whenever you like (though the dictionary contents are lost on each restart). The pipes stay the same, so all child processes can keep talking to a restarted manager without needing new pipe file descriptors.
I may add more functionality to this. If I haven't moved the class into its own module, you can find it here: https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py
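As a rough usage sketch (not from the original answer), appended after the code above and assuming a fork start method so the worker process inherits the pipe:

def worker():
    # __setitem__ sends a ('set', key, value) message over the pipe
    # to the managing process
    shared_dict['client'] = ('127.0.0.1', 55679)

if __name__ == '__main__':
    proc = multiprocessing.Process(target=worker)
    proc.start()
    proc.join()

    # __getitem__ round-trips a ('get', key) request over the same pipe
    print(shared_dict['client'])  # ('127.0.0.1', 55679)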
You can use Python's built-in multiprocessing.shared_memory module (the SharedMemory and ShareableList classes, available since Python 3.8).
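For example, a ShareableList from that module can be attached by name from another process. This is only a sketch (not from the original answer); note that a ShareableList has a fixed length and a fixed per-slot size, so it suits a known maximum number of clients rather than a list that grows forever:

from multiprocessing import Process, shared_memory

def worker(name):
    # attach to the existing block by name in the child process
    clients = shared_memory.ShareableList(name=name)
    clients[0] = '127.0.0.1:55679'
    clients.shm.close()

if __name__ == '__main__':
    # reserve four slots, each big enough for a 32-character address string
    clients = shared_memory.ShareableList([' ' * 32] * 4)
    p = Process(target=worker, args=(clients.shm.name,))
    p.start()
    p.join()
    print(clients[0])     # '127.0.0.1:55679'
    clients.shm.close()
    clients.shm.unlink()  # free the shared memory block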
Or like this:
import multiprocessing

manager = multiprocessing.Manager()
shared_list = manager.list()

def worker1(l):
    l.append(1)

def worker2(l):
    l.append(2)

process1 = multiprocessing.Process(
    target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
    target=worker2, args=[shared_list])
process1.start()
process2.start()
process1.join()
process2.join()

print(shared_list)
If you are using it like the snippet below, check the length of the list you pass in, or the hard-coded number of workers: it may exceed what your machine can handle.
from multiprocessing import Pool

pool = Pool(len(somelist))
# call 'somefunction' in parallel for each item of somelist
pool.map(somefunction, somelist)
I reduced the number of workers, and that solved my problem.
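A small sketch of capping the worker count (somefunction and somelist are placeholders, as in the snippet above):

from multiprocessing import Pool, cpu_count

def somefunction(item):
    # placeholder for the real per-item work
    return item * 2

if __name__ == '__main__':
    somelist = list(range(100))
    # cap the pool at the CPU count instead of len(somelist)
    with Pool(min(len(somelist), cpu_count())) as pool:
        results = pool.map(somefunction, somelist)
    print(results[:5])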
Here is an example of a UDP server with a shared list.
The parent code creates a Manager and a managed list, and passes them to start_server(). That function actually starts the server and stores the shared list so that the server and its handlers can get at it.
When a packet arrives, the handle() method is triggered. It reaches the server through self.server, and the shared list through self.server.client_list, an attribute on the ChatServer instance.
I tested this by starting the server, waiting one second, and then sending the UDP packet "beer" with netcat. For some reason it sent Xs first, and every output line shows up twice. That's a bug, but the code should point you in the right direction.
Source code
import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data, _socket = self.request
        curproc = mp.current_process()
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data),
                data=data.strip(),
                client=self.client_address,
            ))
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)  # die in 5 seconds
    signal.pause()   # wait for control-C or alarm
Test run
(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
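Since the question targets Python 3 (socketserver rather than SocketServer), here is a rough, untested port of the same idea:

import multiprocessing as mp
import signal
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data, _socket = self.request
        curproc = mp.current_process()
        print('{}: {}'.format(curproc, dict(
            data_len=len(data),
            data=data.strip(),
            client=self.client_address,
        )))
        # the shared list lives on the server instance
        self.server.client_list.append(self.client_address)
        print('{}: {}'.format(curproc, dict(
            # list(...) so the contents are shown instead of the proxy repr
            client_list=list(self.server.client_list),
        )))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(target=start_server, args=[clist], name='udpserver').start()

    signal.alarm(5)  # die in 5 seconds
    signal.pause()   # wait for control-C or alarm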
The problem is that you let the main process finish immediately after starting the worker process. When the process that created the multiprocessing.Manager finishes, the Manager server is shut down, which means your shared list object is now useless. This happens because the Manager object registers a "finalizer" with the multiprocessing module, which runs just before the process exits. Here is the code that registers it, in BaseManager.start():
# register a finalizer
self._state.value = State.STARTED
self.shutdown = util.Finalize(
    self, type(self)._finalize_manager,
    args=(self._process, self._address, self._authkey,
          self._state, self._Client),
    exitpriority=0
    )
And here is the code that actually performs the shutdown:
@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass
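You can reproduce the same failure without any of the socketserver machinery. In this stripped-down sketch (the names are illustrative, not from the original post), the main process falls off the end right after starting the worker, the Manager's finalizer (exitpriority=0) runs before the child is joined, and the child's append fails with a connection error:

import multiprocessing
import time

def worker(shared):
    time.sleep(2)           # by now the parent's finalizers have run...
    shared.append('hello')  # ...so this raises a connection error

if __name__ == '__main__':
    shared = multiprocessing.Manager().list()
    multiprocessing.Process(target=worker, args=(shared,)).start()
    # no join() here: the main process exits immediately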
The fix is simple: instead of letting the main process finish right after starting the process that runs the UDP server, call server_process.join() to wait for it:
import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address)
        print(ChatHandler.clients)

class ChatServer(ForkingMixIn, UDPServer):
    pass

if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join()  # This fixes the issue.