Python multiprocessing.Queue 修改对象

5 投票
1 回答
2600 浏览
提问于 2025-04-17 11:00

我有一个应用程序,它在Python中实现了一种类似责任链的机制。有一个进程通过multiprocessing.Queue()将对象传递给其他进程,这些进程会对这些对象执行操作。同时,跟踪传递对象的最后修改时间也很重要,这样只有在对象被修改时才能采取相应的行动。

我遇到的问题是,从队列中提取对象后,_modified属性似乎会随机变化。然而,_mtime属性总是正确的。下面的例子会运行并(故意)随机修改DummyObject,然后将其放入每个处理进程的队列中。每个处理进程会打印它们收到的对象中的_modified和_mtime值。我期望command_func和处理函数中的_modified值是相同的,但通常不是这样。如果我去掉DummyObject的Object_w_mtime继承,那么我就看不到发送和接收对象之间的任何差异。

我对Python还比较陌生。根据我的理解,每次将对象放入队列时,它会被序列化(pickled),然后通过管道发送到接收进程,接收进程再将对象反序列化(unpickled)。这样理解对吗?在对象被序列化和反序列化时,是否有可能导致对象的继承关系出现问题?

我在Ubuntu 11.10上测试了Python 2.7.2和2.6.7,以及在Ubuntu 11.04上测试了Python 2.7.1。有时你需要让它运行一分钟左右才能看到这种行为,因为它似乎是随机的。

我在这里抓住一些稻草,提前谢谢你。

import multiprocessing
import time
import traceback
import os
import random

class Object_w_mtime(object):
    '''
    Parent object that tracks the last time an attribute was modified
    '''
    def __setattr__(self,a_name,a_value):
        if ((a_name not in ('_mtime','_modified')) and
            (a_value != getattr(self,a_name,None))
        ):
            object.__setattr__(self, '_modified', True)
            object.__setattr__(self, '_mtime', time.time())
        object.__setattr__(self, a_name, a_value)
        return True
    #END def

    def reset(self):
        self._modified = False
#END class

class DummyObject(Object_w_mtime):
    def __init__(self):
        self.value = 10

def handler(in_queue = None, handler_id = None):
    print 'PID:' + str(os.getpid()) + ':handler{0}:<RUN>'.format(handler_id)
    while True:
        try:
            obj = in_queue.get(True,61)
            print 'handler{} - _modified'.format(handler_id), obj._modified, ' \t_mtime', obj._mtime
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print traceback.format_exc()
            break
    return True
#END def

def command_func(next_links = None):
    print 'PID:' + str(os.getpid()) + ':command_func:<RUN>'
    obj = DummyObject()
    while True:
        try:
            # randomly assign a different value to test with a modified and unmodified object
            obj.value = random.randint(0,1)
            print '**************** obj.value = {0} ***************'.format(obj.value)
            print 'command_ - _modified', obj._modified, ' \t_mtime', obj._mtime
            for each in next_links:
                each.put(obj,False)
        except multiprocessing.queues.Empty:
            break
        except KeyboardInterrupt:
            break
        except Exception as e:
            print e
            print traceback.format_exc()
            break
        obj.reset()
        time.sleep(3)
    return True
#END def


if __name__ == '__main__':
    handler_queues = list()
    handler_processes = list()
    # Create a queue and process object for each command handler
    for handler_id in range(1,4):
        queue = multiprocessing.Queue()
        process = multiprocessing.Process(target=handler, args=(queue, handler_id))
        handler_queues.append(queue)
        handler_processes.append(process)

    try:
        # spawn handler processes
        for process in handler_processes:
            process.start()
        # Start sending commands to handlers
        command_func(handler_queues)

    # exit on keyboard interrupt
    except KeyboardInterrupt:
        for process in handler_processes:
            process.join()
    except Exception:
        traceback.print_exc()

1 个回答

7

简单来说,就是你在把obj放到队列里之后又对它进行了修改。

看一下这个链接,在第285行,put()这个方法只是把对象放进一个内部的队列里,如果这个队列没有在运行,它会启动一个后台线程来处理队列里的对象。所以在你的代码中,each.put(obj,False)obj.reset()之间就会出现竞争。

你最好只用不可变的(或者说是对象的副本)来使用队列。

撰写回答