Python 线程和队列处理无限数据输入(流)
我想用线程来处理一个不断流入的数据。
我该怎么修改下面的代码,让它能处理一个无限的数据输入,比如用 itertools.count 来生成数据呢?
下面的代码在以下情况下可以正常工作:将 'for i in itertools.count():' 替换成 'for i in xrange(5):'
from threading import Thread
from Queue import Queue, Empty
import itertools
def do_work(q):
while True:
try:
x = q.get(block=False)
print (x)
except Empty:
break
if __name__ == "__main__":
work_queue = Queue()
for i in itertools.count():
work_queue.put(i)
threads = [Thread(target=do_work, args=(work_queue,)) for i in range(8)]
for t in threads: t.start()
for t in threads: t.join()
3 个回答
1
我可能理解错了,但难道在for
循环之前创建并启动线程就这么简单吗?
而且,让线程在没有工作的情况下就结束,听起来不是个好主意,因为将来可能会有更多的工作出现。你肯定希望它们能一直等待,直到有工作可做吧?
2
你需要用一个线程来填充队列。还需要管理队列的大小,特别是当处理这些项目的工作者需要时间的时候。你还得标记队列里的项目已经完成。如果这和你之前问的关于推特和“极快”输入有关,那你在数据库插入方面还有很多事情要做。
你的问题有点模糊,涉及的内容也比较复杂。看起来你对自己想要实现的目标理解得不够,甚至不知道这并不简单。我建议你在描述你想做的事情时,尽量具体一些。
这里有一个用线程来填充和消费队列的例子。这个例子里没有管理队列的大小。
from threading import Thread
from Queue import Queue, Empty, Full
import itertools
from time import sleep
def do_work(q,wkr):
while True:
try:
x = q.get(block=True,timeout=10)
q.task_done()
print "Wkr %s: Consuming %s" % (wkr,x)
sleep(0.01)
except Empty:
print "Wkr %s exiting, timeout/empty" % (wkr)
break
sleep(0.01)
def fill_queue(q,limit=1000):
count = itertools.count()
while True:
n = count.next()
try:
q.put(n,block=True,timeout=10)
except Full:
print "Filler exiting, timeout/full"
break
if n >= limit:
print "Filler exiting, reached limit - %s" % limit
break
sleep(0.01)
work_queue = Queue()
threads = [Thread(target=do_work, args=(work_queue,i)) for i in range(2)]
threads.insert(0,Thread(target=fill_queue,args=(work_queue,100)))
for t in threads:
t.start()
for t in threads:
t.join()
Wkr 0: Consuming 0
Wkr 1: Consuming 1
Wkr 0: Consuming 2
Wkr 1: Consuming 3
....
Wkr 1: Consuming 99
Filler exiting, reached limit - 100
Wkr 0: Consuming 100
Wkr 1 exiting, timeout/empty
Wkr 0 exiting, timeout/empty
2
这个问题是因为 itertools.count
会生成一个无限的序列。这就意味着,使用这个序列的循环永远不会结束。你应该把它放在一个单独的函数里,并让它在一个独立的线程中运行。这样的话,你就可以在一个线程中不断增加队列的内容,而其他工作线程则可以从这个队列中取数据。