如何通过远程管理器传递Python对象?
我正在用Python开发一个简单的客户端-服务器应用程序。我使用一个管理器来设置共享队列,但我不知道怎么把一个任意对象从服务器传递到客户端。我觉得这可能和manager.register函数有关,但在multiprocessing文档中没有解释得很清楚。那里唯一的例子只用到了队列,没有其他内容。
这是我的代码:
#manager demo.py
from multiprocessing import Process, Queue, managers
from multiprocessing.managers import SyncManager
import time
class MyObject():
def __init__( self, p, f ):
self.parameter = p
self.processor_function = f
class MyServer():
def __init__(self, server_info, obj):
print '=== Launching Server ... ====='
(ip, port, pw) = server_info
self.object = obj #Parameters for task processing
#Define queues
self._process_queue = Queue() #Queue of tasks to be processed
self._results_queue = Queue() #Queue of processed tasks to be stored
#Set up IS_Manager class and register server functions
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ', callable=self.get_process_queue)
IS_Manager.register('get_resultsQ', callable=self.get_results_queue)
IS_Manager.register('get_object', callable=self.get_object)
#Initialize manager and server
self.manager = IS_Manager(address=(ip, port), authkey=pw)
self.server = self.manager.get_server()
self.server_process = Process( target=self.server.serve_forever )
self.server_process.start()
def get_process_queue(self): return self._process_queue
def get_results_queue(self): return self._results_queue
def get_object(self): return self.object
def runUntilDone(self, task_list):
#Fill the initial queue
for t in task_list:
self._process_queue.put(t)
#Main loop
total_tasks = len(task_list)
while not self._results_queue.qsize()==total_tasks:
time.sleep(.5)
print self._process_queue.qsize(), '\t', self._results_queue.qsize()
if not self._results_queue.empty():
print '\t', self._results_queue.get()
#Do stuff
pass
class MyClient():
def __init__(self, server_info):
(ip, port, pw) = server_info
print '=== Launching Client ... ====='
class IS_Manager(managers.BaseManager): pass
IS_Manager.register('get_processQ')
IS_Manager.register('get_resultsQ')
IS_Manager.register('get_object')
#Set up manager, pool
print '\tConnecting to server...'
manager = IS_Manager(address=(ip, port), authkey=pw)
manager.connect()
self._process_queue = manager.get_processQ()
self._results_queue = manager.get_resultsQ()
self.object = manager.get_object()
print '\tConnected.'
def runUntilDone(self):#, parameters):
print 'Starting client main loop...'
#Main loop
while 1:
if self._process_queue.empty():
print 'I\'m bored here!'
time.sleep(.5)
else:
task = self._process_queue.get()
print task, '\t', self.object.processor_function( task, self.object.parameter )
print 'Client process is quitting. Bye!'
self._clients_queue.get()
还有一个简单的服务器...
from manager_demo import *
def myProcessor( x, parameter ):
return x + parameter
if __name__ == '__main__':
my_object = MyObject( 100, myProcessor )
my_task_list = range(1,20)
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_crawl_server = MyServer( my_server_info, my_object )
my_crawl_server.runUntilDone( my_task_list )
还有一个简单的客户端...
from manager_demo import *
if __name__ == '__main__':
my_server_info = ('127.0.0.1', 8081, 'my_pw')
my_client = MyClient( my_server_info )
my_client.runUntilDone()
当我运行这个时,它在以下地方崩溃:
erin@Erin:~/Desktop$ python client.py
=== Launching Client ... =====
Connecting to server...
Connected.
Starting client main loop...
2 Traceback (most recent call last):
File "client.py", line 5, in <module>
my_client.runUntilDone()
File "/home/erin/Desktop/manager_demo.py", line 84, in runUntilDone
print task, '\t', self.object.processor_function( task, self.object.parameter )
AttributeError: 'AutoProxy[get_object]' object has no attribute 'parameter'
为什么Python在处理队列或processor_function时没有问题,但在处理对象参数时却出错呢?谢谢!
1 个回答
2
你遇到这个问题是因为你在 MyObject()
类中的 parameter
属性不是一个可以调用的东西。
根据文档,_exposed_
用来指定一系列方法名,这些方法名是这个类型的代理。如果没有指定暴露列表,那么共享对象的所有“公共方法”都会被访问到。(这里的“公共方法”指的是任何有 __call__() 方法的属性,并且名字不以 '_' 开头。)
所以,你需要手动将 MyObject
中的 parameter
属性暴露出来,可能是通过将其改成一个方法,来修改你的 MyObject()
:
class MyObject():
def __init__(self, p, f):
self._parameter = p
self.processor_function = f
def parameter(self):
return self._parameter
另外,你还需要将你的任务改成:
self.object.processor_function(task, self.object.parameter())
希望这对你有帮助。