如何在Python中迭代dict代理?

2024-03-29 09:06:57 发布

您现在位置:Python中文网/ 问答频道 /正文

我使用Python的multiprocessing.Manager来共享对一个进程将生成而其他进程将查看的数据集的访问。但是,我遇到的问题是,manager.dict()返回的dict代理不支持iteritems()

我可以遍历items(),但这意味着构建dict中所有项的新元组,这是一个很大的数字。有没有一种方法可以在不构造中间列表/元组的情况下这样做,从而只使用恒定数量的额外内存?

注意:如果解决方案要求生成过程暂停迭代,则可以。


Tags: 数据方法代理列表数量进程manager情况
3条回答

您可以使用SyncManager类来注册您自己的类型。然后可以在该类型上实现方法,例如,只从dict获取有限数量的项

下面是一个让您开始的示例:

import multiprocessing
from multiprocessing import managers


class TakerDict(dict):
    """Like a dict, but allows taking a limited number of items."""

    def take(self, items=1):
        """Take the first `items` items."""
        return [item for _, item in zip(range(items), self.items())]


# NOTE: add other dict methods to the tuple if you need them.
TakerProxy = managers.MakeProxyType('TakerProxy', ('take',))

managers.SyncManager.register('taker', TakerDict, TakerProxy)


if __name__ == '__main__':
    manager = multiprocessing.Manager()
    taker = manager.taker()
    # in other processes, use e.g. taker.take(5)

因此,要限制内存使用,您必须反复调用管理器进程以获取下一批元素。

但是,要做到这一点,dict必须支持索引(这样就可以从特定的偏移量恢复)。由于您不能访问dict中元素的底层顺序,因此最好使用列表(例如manager.list())。然后在您的子流程中,请求列表的len(),并按片索引以获得适当大小的批处理—您不需要为此注册任何代理类型。

您可以迭代keys()以减少内存占用。你得防止钥匙被删除。

否则,这里有一个具有两种不同方式的示例,可以让您迭代dict中的项。此示例中的iteritems()方法仅适用于创建manager对象的进程和manager对象创建的子进程。这是因为需要manager对象来创建新的代理,而其他进程无权访问它。iteritems2()方法从其他进程工作,因为它不依赖于在这些进程中创建新代理。

import multiprocessing as mp
import multiprocessing.managers

class mydict(dict):
    def __init__(self, *args, **kwargs):
        dict.__init__(self, *args, **kwargs)
        self.iters = {}

    def iteritems(self):
        print "iteritems", mp.current_process()
        return dict.iteritems(self)

    def _iteritems_start(self):
        print "_iteritems_start", mp.current_process()
        i = dict.iteritems(self)
        self.iters[id(i)] = i
        return id(i)

    def _iteritems_next(self, iter_id):
        try:
            return self.iters[iter_id].next()
        except StopIteration:
            del self.iters[iter_id]
            return None

class mydict_proxy(mp.managers.DictProxy):
    def iteritems(self):
        print "iteritems proxy", mp.current_process()
        return self._callmethod("iteritems")

    def iteritems2(self):
        print "iteritems2 proxy", mp.current_process()
        iter_id = self._callmethod("_iteritems_start")
        def generator():
            while True:
                a = self._callmethod("_iteritems_next", 
                             (iter_id,))
                if a == None:
                    return
                yield a
        return generator()

    _method_to_typeid_ = { "iteritems": "Iterator" }
    _exposed_ = mp.managers.DictProxy._exposed_
    _exposed_ += ("iteritems", "_iteritems_start", "_iteritems_next")

class mymanager(mp.managers.BaseManager):
    pass
mymanager.register("mydict", mydict, mydict_proxy)
mymanager.register("Iterator", proxytype = mp.managers.IteratorProxy,
           create_method = False)

def other(d):
    for k, v in d.iteritems2():
        d[k] = v.lower()
    for k, v in d.iteritems():
        d[k] = ord(v)

def main():
    manager = mymanager()
    manager.start()
    d = manager.mydict(list(enumerate("ABCDEFGHIJKLMNOP")))
    for (k, v) in d.iteritems():
        print k, v
    proc = mp.Process(target = other, args = (d,))
    proc.start()
    proc.join()
    for (k, v) in d.iteritems():
        print k, v

if __name__ == "__main__":
    main()

请注意,虽然这段代码可能更节省内存,但可能会慢得多。

iteritems()用于列表dict。可以使用for循环。或者可以说sorted(),它将返回排序列表中的键,然后遍历该列表并执行dict[key]。希望能有所帮助。如果有更好的办法。一定要和我分享。我很想知道。

相关问题 更多 >