添加、删除、读取到python Queu的意外行为

2024-06-06 11:58:47 发布

您现在位置:Python中文网/ 问答频道 /正文

我想在python队列中存储一个列表。与其将列表结构存储为单个队列项,不如尝试将列表中的每个元素逐个添加到队列中。你知道吗

from multiprocessing import Process,Queue;
import time;
import sys;
def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");


def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));

    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));
run();

我认为测试代码说明了一切。。。我只想从一个队列(实际上是一个pop)中“获取”,然后在完成我正在做的事情之后,我想将整个列表重新添加到队列中,以便其他进程可以使用该结构。你知道吗

人们会期望:

('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4)
('inside queue', 'hello')
('inside queue', 'hello2')
('inside queue', 'hello3')
('inside queue', 'hello4')

但我得到的却是:

('inside lol', 'hello')
('inside lol', 'hello2')
('inside lol', 'hello3')
('inside lol', 'hello4')
('Queue size', 4L)
Traceback (most recent call last):
  File "mpTest.py", line 34, in <module>
    run();
  File "mpTest.py", line 33, in run
    print("inside queue",queue.get(False));
  File "/usr/lib/python2.7/multiprocessing/queues.py", line 134, in get
    raise Empty
Queue.Empty

队列大小是4L?呵呵?你知道吗

第二个让人困惑的是,我的Queue对象没有函数“task\u done”,这很荒谬,因为在docs中它肯定存在。你知道吗

最让人困惑的是,为什么我不能像这样放,放,放?这对于我正在进行的一个更大的项目来说是非常糟糕的,也就是说,这只是一个简单的测试脚本来帮助我理解我做错了什么。你知道吗


Tags: runinhello列表gettime队列queue
1条回答
网友
1楼 · 发布于 2024-06-06 11:58:47

每当遇到这样的错误时,都需要添加一个快速睡眠来让队列刷新内存。python multiprocessing文档似乎暗示了这一点:

Warning As mentioned above, if a child process has put items on a queue (and it has not used JoinableQueue.cancel_join_thread), then that process will not terminate until all buffered items have been flushed to the pipe. This means that if you try joining that process you may get a deadlock unless you are sure that all items which have been put on the queue have been consumed. Similarly, if the child process is non-daemonic then the parent process may hang on exit when it tries to join all its non-daemonic children.

所以,你的代码对我来说运行良好:

from multiprocessing import Process,Queue;
import time;
import sys;
def something1(q):
    q.put("hello");
    time.sleep(2);
    q.put("hello4");

def something2(q):
    q.put("hello2");
    time.sleep(1);
    q.put("hello3");


def run():
    queue = Queue();
    t1=Process(target=something1,args=(queue,));
    t1.start();
    t2=Process(target=something2,args=(queue,));
    t2.start();
    time.sleep(3);
    #queue.task_done();
    lol = [];
    while(queue.qsize() !=0):
        lol.append(queue.get(False));

    for l in lol:
        print("inside lol",l);
        queue.put(l);
    print("Queue size",queue.qsize());
    sys.stdout.flush();
    time.sleep(0.1) # added by me
    while(queue.qsize() !=0):
        print("inside queue",queue.get(False));
run();

请注意,关键行是

time.sleep(0.1)

在再次排队之前。你知道吗

相关问题 更多 >