如何通过远程管理器传递Python对象?

2 投票
1 回答
4331 浏览
提问于 2025-04-16 13:42

我正在用Python开发一个简单的客户端-服务器应用程序。我使用一个管理器来设置共享队列,但我不知道怎么把一个任意对象从服务器传递到客户端。我觉得这可能和manager.register函数有关,但在multiprocessing文档中没有解释得很清楚。那里唯一的例子只用到了队列,没有其他内容。

这是我的代码:

#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time

class MyObject():
    def __init__( self, p, f ):
        self.parameter = p
        self.processor_function = f

class MyServer():
    def __init__(self, server_info, obj):
        print '=== Launching Server ... ====='
        (ip, port, pw) = server_info
        self.object = obj       #Parameters for task processing

        #Define queues
        self._process_queue = Queue()       #Queue of tasks to be processed
        self._results_queue = Queue()       #Queue of processed tasks to be stored

        #Set up IS_Manager class and register server functions
        class IS_Manager(managers.BaseManager): pass
        IS_Manager.register('get_processQ', callable=self.get_process_queue)
        IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
        IS_Manager.register('get_object', callable=self.get_object)

        #Initialize manager and server
        self.manager = IS_Manager(address=(ip, port), authkey=pw)
        self.server = self.manager.get_server()

        self.server_process = Process( target=self.server.serve_forever )
        self.server_process.start()

    def get_process_queue(self): return self._process_queue
    def get_results_queue(self): return self._results_queue
    def get_object(self): return self.object

    def runUntilDone(self, task_list):
        #Fill the initial queue
        for t in task_list:
            self._process_queue.put(t)

        #Main loop
        total_tasks = len(task_list)
        while not self._results_queue.qsize()==total_tasks:
            time.sleep(.5)
            print self._process_queue.qsize(), '\t', self._results_queue.qsize()
            if not self._results_queue.empty():
                print '\t', self._results_queue.get()
            #Do stuff
            pass

class MyClient():
    def __init__(self, server_info):
        (ip, port, pw) = server_info
        print '=== Launching Client ... ====='

        class IS_Manager(managers.BaseManager): pass

        IS_Manager.register('get_processQ')
        IS_Manager.register('get_resultsQ')
        IS_Manager.register('get_object')

        #Set up manager, pool
        print '\tConnecting to server...'
        manager = IS_Manager(address=(ip, port), authkey=pw)
        manager.connect()

        self._process_queue = manager.get_processQ()
        self._results_queue = manager.get_resultsQ()
        self.object = manager.get_object()

        print '\tConnected.'

    def runUntilDone(self):#, parameters):
        print 'Starting client main loop...'

        #Main loop
        while 1:
            if self._process_queue.empty():
                print 'I\'m bored here!'
                time.sleep(.5)
            else:
                task = self._process_queue.get()
                print task, '\t', self.object.processor_function( task, self.object.parameter )

        print 'Client process is quitting.  Bye!'
        self._clients_queue.get()

还有一个简单的服务器...

from manager_demo import *

def myProcessor( x, parameter ):
    return x + parameter

if __name__ == '__main__':
    my_object = MyObject( 100, myProcessor )
    my_task_list = range(1,20)
    my_server_info = ('127.0.0.1', 8081, 'my_pw')

    my_crawl_server = MyServer( my_server_info, my_object )
    my_crawl_server.runUntilDone( my_task_list )

还有一个简单的客户端...

from manager_demo import *
if __name__ == '__main__':
    my_server_info = ('127.0.0.1', 8081, 'my_pw')
    my_client = MyClient( my_server_info )
    my_client.runUntilDone()

当我运行这个时,它在以下地方崩溃:

erin@Erin:~/Desktop$ python client.py 
=== Launching Client ... =====
    Connecting to server...
    Connected.
Starting client main loop...
2   Traceback (most recent call last):
  File "client.py", line 5, in <module>
    my_client.runUntilDone()
  File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
    print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'

为什么Python在处理队列或processor_function时没有问题,但在处理对象参数时却出错呢?谢谢!

1 个回答

2

你遇到这个问题是因为你在 MyObject() 类中的 parameter 属性不是一个可以调用的东西。

根据文档_exposed_ 用来指定一系列方法名,这些方法名是这个类型的代理。如果没有指定暴露列表,那么共享对象的所有“公共方法”都会被访问到。(这里的“公共方法”指的是任何有 __call__() 方法的属性,并且名字不以 '_' 开头。)

所以,你需要手动将 MyObject 中的 parameter 属性暴露出来,可能是通过将其改成一个方法,来修改你的 MyObject()

class MyObject():
    def __init__(self, p, f):
        self._parameter = p
        self.processor_function = f

    def parameter(self):
        return self._parameter

另外,你还需要将你的任务改成:

 self.object.processor_function(task, self.object.parameter())

希望这对你有帮助。

撰写回答