Sharing a list between processes in a Python server

15 votes
5 answers
12465 views
Asked 2025-04-18 18:14

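I have a simple UDP server that uses multiprocessing.

I want to create a list that holds information about all clients.

I am using a Manager, but I don't understand how to add information to the list: I need to pass the Manager object to the handler, but how? Setting it as a new attribute did not work.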

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()

How can I fix this? Thanks!

Output:

Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

5 Answers

0

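If for some reason you can't use the existing manager, you can also implement your own, tailored to your needs.

My unit tests are set up to stop every child process that was not shut down properly, which destroys the manager. So I needed something that can be started and stopped at will without interfering with the tests.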

import multiprocessing
import atexit
import logging
import select

# The original snippet relied on a project-specific logger (logger.spam).
# A standard-library logger is assumed here so the example runs on its own.
logger = logging.getLogger(__name__)

class SharedDict:
    """Share a dictionary across processes."""
    def __init__(self):
        """Create a shared dictionary."""
        super().__init__()
        self.pipe = multiprocessing.Pipe()
        self.process = None
        atexit.register(self._stop)
        self._start()

    def _start(self):
        """Ensure the process to manage the dictionary is running."""
        if self.process is not None and self.process.is_alive():
            return

        # if the manager has already been running in the past but stopped
        # for some reason, the dictionary contents are lost
        self.process = multiprocessing.Process(target=self.manage)
        self.process.start()

    def manage(self):
        """Manage the dictionary, handle read and write requests."""
        shared_dict = dict()
        while True:
            message = self.pipe[0].recv()
            logger.debug('SharedDict got %s', message)

            if message[0] == 'stop':
                return

            if message[0] == 'set':
                shared_dict[message[1]] = message[2]

            if message[0] == 'get':
                self.pipe[0].send(shared_dict.get(message[1]))

    def _stop(self):
        """Stop the managing process."""
        self.pipe[1].send(('stop',))

    def get(self, key):
        """Get a value from the dictionary."""
        return self.__getitem__(key)

    def __setitem__(self, key, value):
        self.pipe[1].send(('set', key, value))

    def __getitem__(self, key):
        self.pipe[1].send(('get', key))

        # to avoid blocking forever if something goes wrong
        select.select([self.pipe[1]], [], [], 0.1)
        if self.pipe[1].poll():
            return self.pipe[1].recv()

        return None

    def __del__(self):
        self._stop()


shared_dict = SharedDict()

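You can add all kinds of methods on top of this, and you can stop and restart it at any time (although the dictionary contents are lost on every restart). The pipe stays the same, so all child processes can also talk to the restarted manager without needing new pipe file descriptors.

I may extend this with more features. If I haven't moved the class into its own module, you can find it here: https://github.com/sezanzeb/key-mapper/blob/main/keymapper/injection/macros.py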
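The request/reply idea behind this answer can be shown in a much smaller form. The following is only a sketch, not the author's class: one process owns a plain dict and answers `set`/`get` messages sent over a duplex pipe. The names `serve` and `demo` are made up for illustration, and the "fork" start method is assumed to be available (Unix), as it is elsewhere in this thread.

```python
import multiprocessing


def serve(conn):
    """Own the dictionary and answer requests arriving on one pipe end."""
    store = {}
    while True:
        op, *args = conn.recv()
        if op == "stop":
            break
        if op == "set":
            key, value = args
            store[key] = value
        elif op == "get":
            conn.send(store.get(args[0]))


def demo():
    # Assumption: the "fork" start method exists on this platform.
    ctx = multiprocessing.get_context("fork")
    client_end, server_end = ctx.Pipe()  # duplex by default
    worker = ctx.Process(target=serve, args=(server_end,))
    worker.start()
    try:
        client_end.send(("set", "nick", "alice"))
        client_end.send(("get", "nick"))
        # poll() with a timeout avoids blocking forever if the worker died.
        found = client_end.recv() if client_end.poll(2.0) else None
        client_end.send(("get", "missing"))
        missing = client_end.recv() if client_end.poll(2.0) else None
    finally:
        client_end.send(("stop",))
        worker.join()
    return found, missing


if __name__ == "__main__":
    print(demo())
```

Because requests are handled strictly in the order they arrive, a `get` sent after a `set` for the same key sees the new value.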

0

You can use Python's built-in multiprocessing.shared_memory module (the SharedMemory class, available since Python 3.8).

Or like this:
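For a list specifically, the same module offers `ShareableList`. Here is a minimal sketch, assuming Python 3.8+ and the "fork" start method; the helper names `bump` and `run_demo` are hypothetical. Note that a `ShareableList` has a fixed length, so it suits a fixed set of slots rather than a client list that keeps growing.

```python
from multiprocessing import get_context
from multiprocessing.shared_memory import ShareableList


def bump(name, index):
    """Attach to the existing block by name and increment one slot."""
    shared = ShareableList(name=name)
    try:
        shared[index] += 1
    finally:
        shared.shm.close()  # detach only; the creator unlinks the block


def run_demo():
    ctx = get_context("fork")  # assumption: Unix
    counters = ShareableList([0, 0])
    try:
        # Each worker touches its own slot, so no lock is needed here.
        workers = [
            ctx.Process(target=bump, args=(counters.shm.name, i))
            for i in range(2)
        ]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
        return list(counters)
    finally:
        counters.shm.close()
        counters.shm.unlink()  # free the shared block


if __name__ == "__main__":
    print(run_demo())
```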

import multiprocessing
manager = multiprocessing.Manager()
shared_list = manager.list()

def worker1(l):
    l.append(1)

def worker2(l):
    l.append(2)

process1 = multiprocessing.Process(
    target=worker1, args=[shared_list])
process2 = multiprocessing.Process(
    target=worker2, args=[shared_list])

process1.start()
process2.start()
process1.join()
process2.join()

print(shared_list)

0

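If you are using it as shown below, check whether the length of the list you pass in, or a hard-coded number of workers, exceeds what your machine can handle: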

        pool = Pool(len(somelist))
        # call the function 'somefunction' in parallel for each somelist.
        pool.map(somefunction, somelist)

Reducing the number of workers solved the problem for me.
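One way to reduce the worker count is to cap the pool at the number of CPUs instead of creating one worker per item. This is just a sketch: `square` and `parallel_squares` are placeholder names standing in for `somefunction` and the calling code, and the "fork" start method is assumed.

```python
import multiprocessing
import os


def square(n):
    return n * n


def parallel_squares(items):
    # Cap the pool at the CPU count rather than len(items).
    workers = max(1, min(len(items), os.cpu_count() or 1))
    ctx = multiprocessing.get_context("fork")  # assumption: Unix
    with ctx.Pool(processes=workers) as pool:
        return pool.map(square, items)


if __name__ == "__main__":
    print(parallel_squares(list(range(10))))
```

`pool.map` still processes every item; the items are simply spread over fewer processes.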

0

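Here is an example of a UDP server with a shared list.

  • The parent code creates a manager and a managed list, and passes the list to the start_server() function.

  • That function actually starts the server, and stores the shared list so that the server and its handler can access it.

  • When a packet arrives, the handle() method is triggered. It reaches the server through self.server and the shared list through self.server.client_list, which is an attribute of the ChatServer instance.

I tested it by starting the server, waiting one second, and then sending a UDP packet containing "beer" with netcat. For some reason it sends Xs first, and every line of output is duplicated. That is a bug, but the code should point you in the right direction.

Source code (Python 2)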

import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    def handle(self):
        data,_socket = self.request
        curproc = mp.current_process()
        print '{}: {}'.format(
            curproc,
            dict(
                data_len=len(data), 
                data=data.strip(),
                client=self.client_address,
            ))
        self.server.client_list.append(
            self.client_address)
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    client_list = None

def start_server(client_list):
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

Test run

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}

21

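The problem is that your main program exits immediately after starting the worker process. When the process that created the multiprocessing.Manager exits, the Manager server shuts down too, so your shared list object becomes unusable. This happens because the Manager object registers a "finalizer" function with the multiprocessing module, and that function runs just before the process exits. Here is the code that registers it, in BaseManager.start: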

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

This is the code that actually performs the shutdown:

@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    state.value = State.SHUTDOWN
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

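The fix is simple: instead of letting the main program exit right after starting the process that runs the UDP server, call server_process.join() to wait for that process to finish: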

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):

    def handle(self):
        cur_process = multiprocessing.current_process()
        data = self.request[0].strip()
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # works while the Manager is alive
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    ChatHandler.clients = multiprocessing.Manager().list()
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    server_process.join() # This fixes the issue.
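As a variation on the fix above, the list can be attached to the server instance and reached through `self.server`, while a `with` block keeps the manager alive for as long as the server is in use. The following is a sketch under assumptions, not a drop-in replacement: it handles a single datagram on a throwaway loopback port so it can be run as is, `handle_one_datagram` is a made-up helper name, and it requires a Unix system because `ForkingMixIn` relies on fork.

```python
import multiprocessing
import socket
from socketserver import DatagramRequestHandler, ForkingMixIn, UDPServer


class ChatHandler(DatagramRequestHandler):
    def handle(self):
        # self.server is the ChatServer instance that received the packet.
        self.server.clients.append(self.client_address)


class ChatServer(ForkingMixIn, UDPServer):
    clients = None  # replaced by a Manager list proxy before serving


def handle_one_datagram(payload=b"hello"):
    """Send one datagram to a temporary server; return (port, clients)."""
    ctx = multiprocessing.get_context("fork")
    with ctx.Manager() as manager:  # the manager lives until this block ends
        server = ChatServer(("127.0.0.1", 0), ChatHandler)  # 0: any free port
        server.clients = manager.list()
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as client:
            client.sendto(payload, server.server_address)
            client_port = client.getsockname()[1]
            server.handle_request()  # forks a child that runs handle()
        server.server_close()  # waits for the forked child (Python 3.7+)
        return client_port, list(server.clients)


if __name__ == "__main__":
    print(handle_one_datagram())
```

In a real server, `serve_forever()` would take the place of the single `handle_request()` call, still inside the `with` block so that the manager outlives every request.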
