线程队列在Python中挂起

2024-05-15 10:30:28 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图通过一个队列使解析器成为多线程的。似乎行得通,但我的队伍挂了。如果有人能告诉我如何解决这个问题,我将不胜感激,因为我很少编写多线程代码。在

此代码从Q:

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread

l = []
q = Queue.Queue()

def parse_record():
    d = {}
    while not q.empty():
        rec = q.get()
        d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
        # ... many ops like this
        d['dport'] = rec.dport
        l.append(d) # l is global

这就是Q:

^{pr2}$

我只在main中调用parse_records()。它永远不会结束。在


Tags: 代码fromimportjson解析器pandasdatetime队列
1条回答
网友
1楼 · 发布于 2024-05-15 10:30:28

Queue.empty doc上写着:

...if empty() returns False it doesn’t guarantee that a subsequent call to get() will not block.

作为最低要求,您应该使用get_nowait或冒数据丢失的风险。但更重要的是,只有当所有排队的项目都用Queue.task_done调用标记为完成时,联接才会释放:

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

作为补充说明,l.append(d)不是原子的,应该用锁进行保护。在

from silk import *
import json
import datetime
import pandas
import Queue
from threading import Thread, Lock

l = []
l_lock = Lock()
q = Queue.Queue()

def parse_record():
    d = {}
    while 1:
        try:
            rec = q.getnowait()
            d['timestamp'] = rec.stime.strftime("%Y-%m-%d %H:%M:%S")
            # ... many ops like this
            d['dport'] = rec.dport
            with l_lock():
                l.append(d) # l is global
            q.task_done()
        except Queue.Empty:
            return

通过使用标准库中的线程池,可以大大缩短代码。在

^{pr2}$

相关问题 更多 >