在两个线程间访问数据

1 投票
2 回答
556 浏览
提问于 2025-04-18 07:24

我们正在尝试在两个线程之间访问数据,但一直没能成功。我们希望能找到一种简单(而优雅)的方法。

这是我们现在的代码。目标是:在第二个线程/进程完成后,实例 B 中的 listHolder 必须包含 2 个项目。

Class A:
   self.name = "MyNameIsBlah"

Class B:
   # Contains a list of A Objects. Is now empty.
   self.listHolder = []

   def add(self, obj):
      self.listHolder.append(obj)

   def remove(self, obj):
      self.listHolder.remove(obj)

def process(list):
    # Create our second instance of A in process/thread
    secondItem = A()
    # Add our new instance to the list, so that we can access it out of our process/thread.
    list.append(secondItem)

# Create new instance of B which is the manager. Our listHolder is empty here. 
manager = B()

# Create new instance of A which is our first item
firstItem = A()

# Add our first item to the manager. Our listHolder now contains one item now.
b.add(firstItem)

# Start a new seperate process.
p = Process(target=process, args=manager.listHolder)

# Now start the thread
p.start()

# We now want to access our second item here from the listHolder, which was initiated in the seperate process/thread.

print len(manager.listHolder) << 1
print manager.listHolder[1] << ERROR
  • 期望的输出:在 listHolder 中有 2 个 A 实例。
  • 实际输出:在 listHolder 中只有 1 个 A 实例。

我们该如何在管理器中访问我们的对象,使用分开的进程/线程,这样它们可以同时运行两个函数,而不会互相阻塞。

目前我们尝试用进程来实现这个目标,但如果线程能更简单地做到这一点,那也没问题。我们使用的是 Python 2.7。

更新 1:

@James Mills 提到可以使用 ".join()"。但是,这会阻塞主线程,直到第二个进程完成。我尝试过这个,但在这个例子中使用的进程永远不会停止执行(while True)。它会像一个计时器,必须能够 遍历一个列表从列表中移除对象

有没有人有建议,如何实现这个目标并修复当前的 cPickle 错误?

2 个回答

2

如果James Mills的回答对你没有帮助,这里有一篇关于如何使用队列来明确地在工作进程之间发送数据的说明:

#!/usr/bin/env python

import logging, multiprocessing, sys


def myproc(arg):
    return arg*2

def worker(inqueue, outqueue):
    logger = multiprocessing.get_logger()
    logger.info('start')
    while True:
        job = inqueue.get()
        logger.info('got %s', job)
        outqueue.put( myproc(job) )

def beancounter(inqueue):
    while True:
        print 'done:', inqueue.get()

def main():
    logger = multiprocessing.log_to_stderr(
            level=logging.INFO,
    )
    logger.info('setup')

    data_queue = multiprocessing.Queue()
    out_queue = multiprocessing.Queue()

    for num in range(5):
        data_queue.put(num)

    worker_p = multiprocessing.Process(
        target=worker, args=(data_queue, out_queue), 
        name='worker',
    )
    worker_p.start()

    bean_p = multiprocessing.Process(
        target=beancounter, args=(out_queue,),
        name='beancounter',
        )
    bean_p.start()

    worker_p.join()
    bean_p.join()
    logger.info('done')


if __name__=='__main__':
    main()

来源:Django多进程和放入后队列为空

另一个使用多进程管理器来处理数据的例子在这里:

http://johntellsall.blogspot.com/2014/05/code-multiprocessing-producerconsumer.html

0

在不同的程序之间共享数据,有一种非常简单的方法,就是使用 multiprocessing.Manager 这个类来同步数据。这个类内部其实是用一个叫 Queue 的东西来实现的。

示例:

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    manager = Manager()

    d = manager.dict()
    l = manager.list(range(10))

    p = Process(target=f, args=(d, l))
    p.start()
    p.join()

    print d
    print l

输出结果:

bash-4.3$ python -i foo.py 
{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
>>> 

注意: 在共享和附加到你的 Process 类的对象时,要小心对象的类型,因为这可能会导致一些序列化的问题。想了解更多,可以查看这个链接: Python 多进程序列化错误

撰写回答