如何让多线程系统在Python中共享同一字典

0 投票
4 回答
2519 浏览
提问于 2025-04-18 17:58

我有一个系统,它通过一个套接字接收数据,然后把这些数据存储到一个字典里,字典就像是我的数据库。之后,我的其他模块(比如图形界面、数据分析、写日志等)会访问这个字典,做它们需要做的事情,比如创建小部件或者把字典的内容复制到日志文件里。不过,由于这些模块的工作速度不同,我决定让每个模块在自己的线程中运行,这样我可以控制它们的工作频率。

在主运行函数里,有类似这样的代码:

    from threading import Thread
    import data_collector
    import write_to_log_file

    def main():
        db = {}
        receive_data_thread = Thread(target=data_collector.main, arg=(db,))
        recieve_data_thread.start()  # writes to dictionary  @ 50 Hz
        log_data_thread = Thread(target=write_to_log_file.main, arg(db,))
        log_data_thread.start()  # reads dictionary  @ 1 Hz

但是看起来这两个模块并没有在同一个字典实例上工作,因为日志数据线程只打印出一个空字典,即使数据收集器显示它已经把数据插入到字典里。

字典只有一个写入者,所以我不需要担心线程之间会互相干扰,我只需要想办法让所有模块在写入字典的同时都能读取到当前的数据库内容。

4 个回答

0

抱歉,我自己解决了问题,真是笨。我的模块在同一个字典上工作,但我的日志记录器没有放在一个while True循环里,所以它只执行了一次就结束了线程,这样我的字典只被记录到磁盘一次。因此,我让write_to_log_file.main(db)不断地以1Hz的频率写入,永远不停,并且设置log_data_thread.deamon = True,这样一旦写入线程(它不会是一个守护线程)退出,就会结束。感谢大家对这种系统最佳实践的建议。

0

这应该不是个问题。我还假设你在使用线程模块。为了弄清楚为什么数据收集器和写入日志文件的功能不正常,我需要了解它们具体在做什么。

实际上,你甚至可以让多个线程同时写入,这也不会有问题,因为全局解释器锁(GIL)会处理所有需要的锁定。不过,要注意的是,你永远无法从中获得超过一个CPU的工作效率。

下面是一个简单的例子:

import threading, time

def addItem(d):
  c = 0
  while True:
    d[c]="test-%d"%(c)
    c+=1
    time.sleep(1)

def checkItems(d):
  clen = len(d)
  while True:
    if clen < len(d):
      print "dict changed", d
      clen = len(d)
    time.sleep(.5)

DICT = {}
t1 = threading.Thread(target=addItem, args=(DICT,))
t1.daemon = True
t2 = threading.Thread(target=checkItems, args=(DICT,))
t2.daemon = True
t1.start()
t2.start()

while True:
  time.sleep(1000)
0

使用一个队列(Queue.Queue)来把数据从多个读取线程传递给一个写入线程。把这个队列的实例传递给每个 data_collector.main 函数。这样它们就可以调用队列的 put 方法,把数据放进去。

与此同时,write_to_log_file.main 也应该接收同一个队列实例,这样它就可以调用队列的 get 方法,从队列中取出数据。当从队列中取出数据后,可以把这些数据添加到一个 dict(字典)里。

另外,看看 Alex Martelli 的文章,了解为什么 Queue.Queue 是 CPython 多线程的秘密武器

3

与其使用内置的 dict,你可以考虑使用 multiprocessing 库中的 Manager 对象:

from multiprocessing import Manager
from threading import Thread
from time import sleep

manager = Manager()
d = manager.dict()

def do_this(d):
    d["this"] = "done"

def do_that(d):
    d["that"] ="done"

thread0 = Thread(target=do_this,args=(d,))
thread1 = Thread(target=do_that,args=(d,))
thread0.start()
thread1.start()
thread0.join()
thread1.join()

print d

这样你就可以得到一个标准库里的线程安全的同步字典,这样在不改变设计的情况下,应该很容易就能替换到你现在的实现中。

撰写回答