无法序列化<class '__main__.JobQueueManager'>

1 投票
5 回答
7212 浏览
提问于 2025-04-19 11:09

我在这个代码中遇到了一个“可序列化”的问题(下面也附上了代码)。我看过一些相关的帖子[1] [2],但找不到有用的对应信息。你能帮我解释一下这个错误或者给我一个解决方案吗?

下面是返回错误的代码部分:

pickle.PicklingError: Can't pickle <class '__main__.JobQueueManager'>: it's not found as __main__.JobQueueManager

谢谢!

def make_server_manager(port, authkey):

    job_q = Queue.Queue()
    result_q = Queue.Queue()

    class JobQueueManager(SyncManager):
        pass

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

PS: Python 2.7.7,Win 7

5 个回答

0

我之前遇到了一些可选性的问题,但相比之下,主线程上的运行队列需要一个主线程和一个运行者来提供队列中的项目,还有一个客户端来订阅和处理这些工作。虽然需要稍微整理一下,但运行得很顺利。

我花了大约两天时间来计划和编写脚本,但看到它能正常工作后,觉得这一切都是值得的。

from queue import Queue

if __name__ == '__main__':

    try:

        server_address = ('localhost', 8624)
        get_queue = result_queue = Queue()
        authkey = bytes("769ac424-adb6-5a73-83b0-d22eb27e543b", 'utf-8')  #comment out this line for random apikey

        '''w = Worker(queue) #QueueSupplyWorker(authkey, queue) #a worker object using the same queue
        w.start()'''

        threading.Thread(target=start_supplyrunner, args=[get_queue,result_queue]).start()
        #register queues statically
        DistibutedServer.register('get_queue', callable=lambda: get_queue)
        DistibutedServer.register('result_queue', callable=lambda: result_queue)

       

        ds = DistibutedServer(address=server_address, authkey=authkey)

        s = ds.get_server()
        
        
        #serve_forever must be the last since it enters the loop anything after this won't execute until keyboardinterrupt
        s.serve_forever()

    except KeyboardInterrupt:
        print("winding up..")
        time.sleep(2)
        print('bye, bye!')

这个完整的工作示例可以在这里找到,queue-server

0

试试这个:

class JobQueueManager(SyncManager):
        pass

def make_server_manager(port, authkey):

    job_q = Queue.Queue()
    result_q = Queue.Queue()

    JobQueueManager.register('get_job_q', callable=lambda: job_q)
    JobQueueManager.register('get_result_q', callable=lambda: result_q)

    manager = JobQueueManager(address=('', port), authkey=authkey)
    manager.start()
    print 'Server started at port %s' % port
    return manager

把类的定义放到pickle能找到的地方,这样就可以进行序列化了。pickle会在__main__模块里寻找这个类,但你的代码里它找不到,因为这个类是在函数里面定义的。不过,正如评论中提到的,管理器其实不需要被序列化,所以可能是其他对象把它带进来了,比如一个包含管理器的全局变量的函数。

1

multiprocessing库为你提供了一个现成的解决方案——multiprocessing.Queue。这个队列在任何地方都应该可以自动被处理,包括Windows系统(而且它在早到2.7版本时也能使用)。

我觉得让Queue.Queue可以被处理并不是个好主意。因为你不会得到一个可以在两个不同进程中使用的队列,而是会在另一个进程中得到这个队列的独立副本。

如果你想在另一个进程中拥有当前队列的状态,实际上更简单的方法是把队列里的所有元素提取出来,变成一个普通的列表(如果所有元素都可以被处理的话),然后把这个列表发送过去,再在另一边重新创建一个新的Queue.Queue

另外,正如你现在可能已经发现的,你不能处理本地的lambda函数——这怎么可能呢?相反,你应该创建一个在那个命名空间中全局可用的函数,然后把这个全局函数和所需的数据一起发送过去。

1

你需要确保 Queue.Queue 可以被序列化,也就是可以保存和恢复,同时你的 lambda 函数和 JobQueueManager 也要能被序列化。

为了做到这一点,我觉得可以很简单,只需要安装 dill 这个包,然后用 import dill 导入它就行了。

我没有在 Windows 上测试过,但应该可以像下面这样使用。你可以在这里找到 dillhttps://github.com/uqfoundation

>>> import dill
>>> import Queue
>>> from multiprocessing.managers import SyncManager
>>> 
>>> def make_server_manager(port, authkey):
...   job_q = Queue.Queue()
...   result_q = Queue.Queue()
...   class JobQueueManager(SyncManager):
...     pass
...   JobQueueManager.register('get_job_q', callable=lambda: job_q)
...   JobQueueManager.register('get_result_q', callable=lambda: result_q)
...   manager = JobQueueManager(address=('',port), authkey=authkey)
...   manager.start()
...   print "server started at port %s" % port
...   return manager
... 
>>> sm = make_server_manager(12345, 'foofoo')
server started at port 12345
3

根据我的理解,要让这个模式在Windows上正常工作,你需要创建一个可以被“序列化”的queue.Queue。你可以通过创建一个Queue的子类来实现,在这个子类中定义__setstate____getstate__这两个方法。这样做的目的是只序列化我们需要在进程之间传递的状态部分,而把其他不需要的东西(比如内部锁)排除在外。

我们还需要做其他一些修改,比如把自定义的Manager类的定义放到最上面,并且不要把lambda函数作为callable的参数。相反,我们使用partial和一个顶层函数,因为这样可以被序列化。下面是最终的代码:

import sys
from multiprocessing.managers import SyncManager
from functools import partial
import multiprocessing
from Queue import Queue as _Queue

class Queue(_Queue):
    """ A picklable queue. """   
    def __getstate__(self):
        # Only pickle the state we care about
        return (self.maxsize, self.queue, self.unfinished_tasks)

    def __setstate__(self, state):
        # Re-initialize the object, then overwrite the default state with
        # our pickled state.
        Queue.__init__(self)
        self.maxsize = state[0]
        self.queue = state[1]
        self.unfinished_tasks = state[2]


def get_q(q):
    return q

class JobQueueManager(SyncManager):
    pass


def make_server_manager(port, authkey):
    job_q = Queue()
    result_q = Queue()

    job_q.put("hey")
    JobQueueManager.register('get_job_q', callable=partial(get_q, job_q))
    JobQueueManager.register('get_result_q', callable=partial(get_q, result_q))

    manager = JobQueueManager(address=('', port), authkey=authkey)
    #manager.start()
    print('Server started at port %s' % port)
    return manager

def make_client_manager(port, authkey):
    JobQueueManager.register('get_job_q')
    JobQueueManager.register('get_result_q')
    manager = JobQueueManager(address=('localhost', port), authkey=authkey)
    manager.connect()
    queue = manager.get_job_q()
    print("got queue {}".format(queue))
    print(queue.get_nowait())

if __name__ == "__main__":
    if len(sys.argv) > 1 and sys.argv[1] == "--client":
        make_client_manager(50000, 'abcdefg')
    else:
        manager = make_server_manager(50000, "abcdefg")
        server = manager.get_server()
        server.serve_forever()

撰写回答