Python queue: alternating between processes

Posted 2024-04-20 14:22:11


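I have a queue of 100 numbers, from 1 to 100. First I have a process that fills the queue and prints 'Queue filled'. Next I have two functions that print the current value of the queue. I am trying to alternate printing the values of the queue between the two processes. Here is my code: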

import multiprocessing as mp

def fillQueue(lookup,q):
    list(map(q.put,lookup))
    print('Queue filled')

def printQueue1(q):
    while not q.empty():
        print('Process 1:', (q.get()))
    print('Process 1: Queue is empty!')

def printQueue2(q):
    while not q.empty():
        print('Process 2:', (q.get()))
    print('Process 2: Queue is empty!')

if __name__ == "__main__":
    pool = mp.Pool(processes=3)
    manager = mp.Manager()
    q = manager.Queue()

    lookup = []
    count = 1
    while count < 101:
        lookup.append(count)
        count = count + 1

    p2 = pool.apply_async(printQueue1,(q,))
    p3 = pool.apply_async(printQueue2,(q,))
    p1 = pool.apply_async(fillQueue,(lookup,q))

    pool.close()
    pool.join()

This gives me:

Process 1: 1
Process 1: 2
Process 1: 3
Process 1: 4
Process 1: 5
Process 2: 6
Process 1: 7
Process 2: 8
Process 1: 9
Process 2: 10

What I want is:

Queue filled
Process 1: 1
Process 2: 2
Process 1: 3
Process 2: 4
Process 1: 5

Is there any way to do this? I get a different result every time I run the program, so something strange is going on. Thanks!


2 answers

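You can create one Queue object per process to act as a "baton" that says which process gets to take the next item from the main queue. In each worker function's main loop, the worker first gets from its own baton queue, then gets an item from the main queue, and then "passes the baton" by putting an item into the next process's baton queue. The process that fills the queue starts things off by putting an item into the baton queue of the process that should run first. This works because Queue.get blocks until there is an item in the queue: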

import multiprocessing as mp
import time

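def fillQueue(lookup, q, baton_first):
    list(map(q.put, lookup))
    print('Queue filled')
    # hand the baton to the process that should dequeue first
    baton_first.put(None)

def printQueue(worker_id, q, baton_self, baton_other):
    while True:
        # block until the other process passes us the baton
        baton_self.get()
        try:
            # only the baton holder touches q, so empty() is reliable here
            if q.empty():
                break
            print('Process %s:' % worker_id, (q.get()))
        # use finally to always pass on the baton whether the loop breaks or not
        finally:
            baton_other.put(None)
        time.sleep(1) # the actual work should be performed here
    print('Process %s: Queue is empty!' % worker_id)

if __name__ == "__main__":
    # all three tasks must run at the same time (the two printers block on
    # their batons), so the pool needs at least 3 worker processes
    pool = mp.Pool(processes=3)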
    manager = mp.Manager()
    q = manager.Queue()
    baton1 = manager.Queue()
    baton2 = manager.Queue()

    p2 = pool.apply_async(printQueue,(1, q, baton1, baton2))
    p3 = pool.apply_async(printQueue,(2, q, baton2, baton1))
    p1 = pool.apply_async(fillQueue, (list(range(1, 11)), q, baton1))

    pool.close()
    pool.join()

This outputs:

Queue filled
Process 1: 1
Process 2: 2
Process 1: 3
Process 2: 4
Process 1: 5
Process 2: 6
Process 1: 7
Process 2: 8
Process 1: 9
Process 2: 10
Process 1: Queue is empty!
Process 2: Queue is empty!
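The baton idea is not limited to two processes. As a sketch under stated assumptions (this generalisation is not part of the answer above), the same pattern works for a ring of N workers, where worker i passes the baton to worker (i + 1) mod N. Threads and queue.Queue are swapped in for pool processes and manager queues so the sketch runs in a single interpreter; worker and run_baton_ring are hypothetical names.

```python
import queue
import threading


def worker(worker_id, q, baton_self, baton_next, log):
    while True:
        baton_self.get()            # block until it is this worker's turn
        try:
            if q.empty():           # safe: only the baton holder touches q
                break
            log.append((worker_id, q.get()))
        finally:
            baton_next.put(None)    # always pass the baton on, even when exiting


def run_baton_ring(n_workers, items):
    q = queue.Queue()
    # one baton queue per worker; worker i hands off to worker (i + 1) % n
    batons = [queue.Queue() for _ in range(n_workers)]
    log = []
    threads = [
        threading.Thread(
            target=worker,
            args=(i + 1, q, batons[i], batons[(i + 1) % n_workers], log),
        )
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for item in items:              # fill the queue before anyone may read it
        q.put(item)
    batons[0].put(None)             # start the ring at worker 1
    for t in threads:
        t.join()
    return log


for worker_id, item in run_baton_ring(3, range(1, 11)):
    print("Process {}: {}".format(worker_id, item))
```

With three workers the items go to workers 1, 2, 3, 1, 2, 3, and so on. Only the dequeue step is serialized: in the answer's version the actual work (the time.sleep call) happens after the baton has been passed, so workers can still overlap on the work itself.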

So, apply_async runs the functions asynchronously. That means the three processes you launch all run at the same time and interfere with one another.

Because nothing in your code fixes the order in which these processes run, that order can change every time you launch them.
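One way to remove the race between the filler and the readers is to wait on the AsyncResult that apply_async returns: its get() method blocks until the task has finished. The sketch below assumes multiprocessing.pool.ThreadPool (same apply_async API as Pool, but backed by threads) so it runs in a single interpreter; fill_queue and drain are hypothetical names.

```python
import queue
from multiprocessing.pool import ThreadPool


def fill_queue(lookup, q):
    for item in lookup:
        q.put(item)
    return "Queue filled"


def drain(q, worker_id, log):
    # get_nowait() avoids the empty()/get() race: if another reader takes the
    # last item first, we get queue.Empty instead of blocking forever in get().
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        log.append((worker_id, item))


q = queue.Queue()
log = []
with ThreadPool(processes=3) as pool:
    # .get() blocks until fill_queue has returned, so the readers below
    # cannot start before the queue is full.
    print(pool.apply_async(fill_queue, (range(1, 11), q)).get())
    readers = [pool.apply_async(drain, (q, wid, log)) for wid in (1, 2)]
    for reader in readers:
        reader.get()  # wait for each reader (and re-raise any exception it hit)
```

This only orders the filler before the readers; which reader gets which item still depends on scheduling. With a real mp.Pool you would pass a manager.Queue() instead and return results from the tasks rather than appending to a shared list.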

I assume you want:

  1. the queue to be filled before any process tries to access it
  2. the "work" to be split evenly between the processes

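Even so, unless you constrain the functions in some way, the order in which they get() items will still be fairly random. If you really need function 1 to get only the odd numbers and function 2 only the even numbers, in strict order, you probably don't need multiprocessing at all.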
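If the goal really is "odd numbers to worker 1, even numbers to worker 2, printed alternately", one option (an assumption, not the answerer's code) is to drop the shared queue entirely: deal the items out up front and impose the alternating order in the parent once both workers are done. ThreadPool stands in for mp.Pool so the sketch runs in one interpreter; with mp.Pool, handle_slice would have to live at module level and the pool would go under the main guard. handle_slice is a hypothetical name.

```python
from itertools import chain, zip_longest
from multiprocessing.pool import ThreadPool


def handle_slice(worker_id, items):
    # Hypothetical worker: the real per-item work would go here.
    return ["Process {}: {}".format(worker_id, item) for item in items]


lookup = list(range(1, 11))
# Deal the items out up front: worker 1 gets lookup[0::2] (1, 3, 5, ...),
# worker 2 gets lookup[1::2] (2, 4, 6, ...). No shared queue is needed.
slices = {1: lookup[0::2], 2: lookup[1::2]}

with ThreadPool(processes=2) as pool:
    pending = {wid: pool.apply_async(handle_slice, (wid, items))
               for wid, items in slices.items()}
    lines = {wid: res.get() for wid, res in pending.items()}

# The strict alternation is imposed here, in the parent, after both workers
# are done; zip_longest keeps the last item if the slices differ in length.
ordered = [line for line in chain.from_iterable(zip_longest(lines[1], lines[2]))
           if line is not None]
for line in ordered:
    print(line)
```

The workers still run in parallel; only the printing is sequential, which is what makes the output deterministic.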

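import multiprocessing as mp
import queue


def fillQueue(lookup, q):
    list(map(q.put, lookup))
    print('Queue filled')


def printQueue(q, worker_id):
    # use get_nowait() instead of "while not q.empty(): q.get()": with two
    # readers, both can see one item left, and the loser would block forever
    while True:
        try:
            item = q.get_nowait()
        except queue.Empty:
            break
        print('Process {}: {}'.format(worker_id, item))
    print('Process {}: Queue is empty!'.format(worker_id))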


if __name__ == "__main__":
    pool = mp.Pool(processes=3)
    manager = mp.Manager()
    q = manager.Queue()

    # no need to build a list with a counter; a range object is enough
    # (range(101) yields 0..100; use range(1, 101) to match the question's 1..100)
    lookup = range(101)

    # do not fill the queue while processes are running, do it beforehand!
    fillQueue(lookup, q)

    # don't need different functions, since they are doing the same work
    # just fire off multiple copies of the same function
    p1 = pool.apply_async(printQueue, (q, 1,))
    p2 = pool.apply_async(printQueue, (q, 2,))

    pool.close()
    pool.join()

Sample output:

Queue filled
Process 2: 0
Process 2: 1
Process 2: 2
Process 2: 3
Process 2: 4
Process 2: 5
Process 1: 6
Process 2: 7
Process 1: 8
Process 2: 9
Process 2: 10
Process 1: 11
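The sample output shows the two workers pulling items in an unpredictable pattern. If strict ordering only matters for what gets printed, an alternative (not from either answer) is to let the pool hand items to whichever worker is free and consume the results in input order: Pool.imap and Pool.map yield results in the order of the input iterable. A sketch with ThreadPool standing in for Pool; work is a hypothetical placeholder for the real per-item job.

```python
from multiprocessing.pool import ThreadPool


def work(item):
    # Hypothetical stand-in for the real per-item work.
    return "result for {}".format(item)


with ThreadPool(processes=2) as pool:
    # imap distributes items to free workers in whatever order they finish,
    # but yields the results in the same order as the input.
    results = list(pool.imap(work, range(1, 11)))

for line in results:
    print(line)
```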
