通过多进程队列模块传递列表给函数时出错

2 投票
1 回答
1517 浏览
提问于 2025-04-17 04:40

我在使用队列和多进程时遇到了一个奇怪的错误。我有一个函数,它接收一个列表,然后对这个列表进行解析。

因为我需要做很多次这个操作,所以我想把任务分散到我电脑的多个核心上去处理。我尝试设置一个队列,但它没有正常工作。以下是我的代码:

# Establish communication queues
tasks = multiprocessing.Queue()

# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
worker = [ rules(tasks)
          for i in xrange(num_consumers) ]
for w in consumers:
    w.start()


def loadGraph(dayCurrent, day2Previous):



for dayCurrentCount  in graph[dayCurrent]:
    dayCurrentValue = graph[dayCurrent][dayCurrentCount]
    for day1Count  in graph[day2Previous]:
        day1Value = graph[day2Previous][day1Count]
        rulesDataToPass = [day1Count, day1Value, dayCurrentCount, dayCurrentValue, dayCurrent, day2Previous]

        tasks.put(rulesDataToPass)



tasks.close()
tasks.join_thread()

我不太确定自己哪里出错了。如果不使用队列,代码运行得很好。问题是我的函数需要多个变量才能运行(之前我是单独发送这些变量,但后来决定把它们放在一个列表里,这样可以一起放入队列中)。

请帮帮我!

提前谢谢你!

更新:根据要求,这里是错误追踪信息:

Traceback (most recent call last):
  File "/Users/lostsoul/Dropbox/code/learning/python/game.py", line 213, in <module>
    for i in xrange(num_consumers) ]
  File "Users/lostsoul/Dropbox/code/learning/python/game.py", line 191, in rules
    day1Count = dict[0]
TypeError: 'Queue' object does not support indexing

基本上,我把数据发送到一个叫做'rules'的函数,字典会被解析成 day1Count = dict[0],day1value = dict[1],等等。我基本上是在比较两个字典。队列是通过一个嵌套的for循环生成的,它将每个条目配对在一起(然后添加到队列中,我希望这些能在我的所有CPU上处理……现在单独运行可以,但用一个CPU要花15分钟)。

更新2:这是rules函数。它很简单,基本上是接收值,然后把它们传递给其他函数(这是我自己的规则引擎版本)。

def rules(dict):
    day1Count = dict[0]
    day1Value = dict[1]
    dayCurrentCount = dict[2] 
    dayCurrentValue = dict[3] 
    dayCurrent = dict[4] 
    day2Previous = dict[5]
    exactSame(day1Count, day1Value, dayCurrentCount, dayCurrentValue, dayCurrent, day2Previous)
    withinFivePercentChange(day1Count, day1Value, dayCurrentCount, dayCurrentValue, dayCurrent, day2Previous)
    deleteNonEdgeNodes(dayCurrentCount, dayCurrentValue, dayCurrent)

不使用队列时,这个函数运行得很好。

1 个回答

2

这行提示 TypeError: 'Queue' object does not support indexing 是在告诉你,你的 dict 变量实际上是一个 Queue 类型(这可能不是你想要的)。请仔细检查第191行及之前的代码,确保你正确地给 dict 变量赋值。


另外,绝对不要 使用类型名称作为变量名。dict 是一种类型,你不应该用它作为字典变量的名称。可以用 my_dict 或类似的名字。使用 dict 作为变量名可能会导致你把其他变量设置成一个字典对象,而不是给 dict 变量赋值。

想了解更多关于 dict 类的信息,可以查看 stdtypes


问题在于,你把 tasks(这是一个 Queue)传给了 rules(),而 rules() 期待的是一个可迭代的类型(比如列表):

tasks = multiprocessing.Queue()
worker = [rules(tasks) for i in xrange(num_consumers)]

def rules(dict):
    day1Count = dict[0]
    day1Value = dict[1]
    dayCurrentCount = dict[2] 
    dayCurrentValue = dict[3] 

撰写回答