使用可再生队列的多进程

2 投票
2 回答
2002 浏览
提问于 2025-04-15 13:50

我正在尝试编写一个使用多进程队列的Python程序。

我有多台服务器,其中一台会通过以下方式远程提供这个队列:

from multiprocessing.managers import BaseManager
import Queue
import daemonme

queue = Queue.Queue()

class QueueManager(BaseManager):
    pass

daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()

现在我想用我的双Xeon四核服务器来处理这个远程队列中的任务。这些任务彼此之间是完全独立的。所以如果我有8个核心,我希望启动7个进程,每个进程从队列中取一个任务,处理完后再去取下一个。每个这7个进程都会这样做,但我对这个程序的结构还不是很明白。

有没有人能给我一些关于基本结构的建议呢?

提前谢谢大家。

2 个回答

0

你应该使用主从(也叫农民-工人)模式。最开始的进程是主进程,它负责创建任务。

  1. 它会创建一个队列
  2. 然后创建7个从进程,并把这个队列作为参数传给它们
  3. 接着开始往队列里写任务

这些从进程会不停地从队列中读取任务,并执行这些任务(可能会一直执行,直到从队列收到停止的消息)。在这种情况下,我认为不需要使用管理对象。

2

查看文档,了解如何从管理器中获取一个队列(第17.6.2.7段)。然后,使用工作池(第17.6.2.9段)启动7个任务,并将队列传递给每个任务。

另外,你也可以考虑类似于生产者/消费者问题的方案:

from multiprocessing.managers import BaseManager
import random

class Producer():
def __init__(self):
    BaseManager.register('queue')
    self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.cm_queue = self.m.queue()
    while 1:
        time.sleep(random.randint(1,3))
        self.cm_queue.put(<PUT-HERE-JOBS>)

from multiprocessing.managers import BaseManager
import time
import random
class Consumer():
def __init__(self):
    BaseManager.register('queue')

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.m.connect()
    self.queue = self.m.queue()
    while 1:
        <EXECUTE(job = self.queue.get())>


from multiprocessing.managers import BaseManager, Queue
class Manager():

def __init__(self):

    self.queue = QueueQueu()

    BaseManager.register('st_queue', callable=lambda:self.queue)

    self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
    self.s = self.m.get_server()

    self.s.serve_forever()

撰写回答