无法序列化<class '__main__.JobQueueManager'>
我在这个代码中遇到了一个“可序列化”的问题(下面也附上了代码)。我看过一些相关的帖子[1] [2],但找不到有用的对应信息。你能帮我解释一下这个错误或者给我一个解决方案吗?
下面是返回错误的代码部分:
pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager
谢谢!
def make_server_manager(port, authkey):
job_q = Queue.Queue()
result_q = Queue.Queue()
class JobQueueManager(SyncManager):
pass
JobQueueManager.register('get_job_q', callable=lambda: job_q)
JobQueueManager.register('get_result_q', callable=lambda: result_q)
manager = JobQueueManager(address=('', port), authkey=authkey)
manager.start()
print 'Server started at port %s' % port
return manager
PS: Python 2.7.7,Win 7
5 个回答
我之前遇到了一些可选性的问题,但相比之下,主线程上的运行队列需要一个主线程和一个运行者来提供队列中的项目,还有一个客户端来订阅和处理这些工作。虽然需要稍微整理一下,但运行得很顺利。
我花了大约两天时间来计划和编写脚本,但看到它能正常工作后,觉得这一切都是值得的。
from queue import Queue
if __name__ == '__main__':
try:
server_address = ('localhost', 8624)
get_queue = result_queue = Queue()
authkey = bytes("769ac424-adb6-5a73-83b0-d22eb27e543b", 'utf-8') #comment out this line for random apikey
'''w = Worker(queue) #QueueSupplyWorker(authkey, queue) #a worker object using the same queue
w.start()'''
threading.Thread(target=start_supplyrunner, args=[get_queue,result_queue]).start()
#register queues statically
DistibutedServer.register('get_queue', callable=lambda: get_queue)
DistibutedServer.register('result_queue', callable=lambda: result_queue)
ds = DistibutedServer(address=server_address, authkey=authkey)
s = ds.get_server()
#serve_forever must be the last since it enters the loop anything after this won't execute until keyboardinterrupt
s.serve_forever()
except KeyboardInterrupt:
print("winding up..")
time.sleep(2)
print('bye, bye!')
这个完整的工作示例可以在这里找到,queue-server
试试这个:
class JobQueueManager(SyncManager):
pass
def make_server_manager(port, authkey):
job_q = Queue.Queue()
result_q = Queue.Queue()
JobQueueManager.register('get_job_q', callable=lambda: job_q)
JobQueueManager.register('get_result_q', callable=lambda: result_q)
manager = JobQueueManager(address=('', port), authkey=authkey)
manager.start()
print 'Server started at port %s' % port
return manager
把类的定义放到pickle能找到的地方,这样就可以进行序列化了。pickle会在__main__
模块里寻找这个类,但你的代码里它找不到,因为这个类是在函数里面定义的。不过,正如评论中提到的,管理器其实不需要被序列化,所以可能是其他对象把它带进来了,比如一个包含管理器的全局变量的函数。
multiprocessing
库为你提供了一个现成的解决方案——multiprocessing.Queue
。这个队列在任何地方都应该可以自动被处理,包括Windows系统(而且它在早到2.7版本时也能使用)。
我觉得让Queue.Queue
可以被处理并不是个好主意。因为你不会得到一个可以在两个不同进程中使用的队列,而是会在另一个进程中得到这个队列的独立副本。
如果你想在另一个进程中拥有当前队列的状态,实际上更简单的方法是把队列里的所有元素提取出来,变成一个普通的列表(如果所有元素都可以被处理的话),然后把这个列表发送过去,再在另一边重新创建一个新的Queue.Queue
。
另外,正如你现在可能已经发现的,你不能处理本地的lambda函数——这怎么可能呢?相反,你应该创建一个在那个命名空间中全局可用的函数,然后把这个全局函数和所需的数据一起发送过去。
你需要确保 Queue.Queue
可以被序列化,也就是可以保存和恢复,同时你的 lambda
函数和 JobQueueManager
也要能被序列化。
为了做到这一点,我觉得可以很简单,只需要安装 dill
这个包,然后用 import dill
导入它就行了。
我没有在 Windows 上测试过,但应该可以像下面这样使用。你可以在这里找到 dill
:https://github.com/uqfoundation。
>>> import dill
>>> import Queue
>>> from multiprocessing.managers import SyncManager
>>>
>>> def make_server_manager(port, authkey):
... job_q = Queue.Queue()
... result_q = Queue.Queue()
... class JobQueueManager(SyncManager):
... pass
... JobQueueManager.register('get_job_q', callable=lambda: job_q)
... JobQueueManager.register('get_result_q', callable=lambda: result_q)
... manager = JobQueueManager(address=('',port), authkey=authkey)
... manager.start()
... print "server started at port %s" % port
... return manager
...
>>> sm = make_server_manager(12345, 'foofoo')
server started at port 12345
根据我的理解,要让这个模式在Windows上正常工作,你需要创建一个可以被“序列化”的queue.Queue
。你可以通过创建一个Queue
的子类来实现,在这个子类中定义__setstate__
和__getstate__
这两个方法。这样做的目的是只序列化我们需要在进程之间传递的状态部分,而把其他不需要的东西(比如内部锁)排除在外。
我们还需要做其他一些修改,比如把自定义的Manager
类的定义放到最上面,并且不要把lambda
函数作为callable
的参数。相反,我们使用partial
和一个顶层函数,因为这样可以被序列化。下面是最终的代码:
import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue
class Queue(_Queue):
""" A picklable queue. """
def __getstate__(self):
# Only pickle the state we care about
return (self.maxsize, self.queue, self.unfinished_tasks)
def __setstate__(self, state):
# Re-initialize the object, then overwrite the default state with
# our pickled state.
Queue.__init__(self)
self.maxsize = state[0]
self.queue = state[1]
self.unfinished_tasks = state[2]
def get_q(q):
return q
class JobQueueManager(SyncManager):
pass
def make_server_manager(port, authkey):
job_q = Queue()
result_q = Queue()
job_q.put("hey")
JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))
manager = JobQueueManager(address=('', port), authkey=authkey)
#manager.start()
print('Server started at port %s' % port)
return manager
def make_client_manager(port, authkey):
JobQueueManager.register('get_job_q')
JobQueueManager.register('get_result_q')
manager = JobQueueManager(address=('localhost', port), authkey=authkey)
manager.connect()
queue = manager.get_job_q()
print("got queue {}".format(queue))
print(queue.get_nowait())
if __name__ == "__main__":
if len(sys.argv) > 1 and sys.argv[1] == "--client":
make_client_manager(50000, 'abcdefg')
else:
manager = make_server_manager(50000, "abcdefg")
server = manager.get_server()
server.serve_forever()