使用可再生队列的多进程
我正在尝试编写一个使用多进程队列的Python程序。
我有多台服务器,其中一台会通过以下方式远程提供这个队列:
from multiprocessing.managers import BaseManager
import Queue
import daemonme
queue = Queue.Queue()
class QueueManager(BaseManager):
pass
daemonme.createDaemon()
QueueManager.register('get_job', callable=lambda:queue)
m = QueueManager(address=('', 50000), authkey='')
s = m.get_server()
s.serve_forever()
现在我想用我的双Xeon四核服务器来处理这个远程队列中的任务。这些任务彼此之间是完全独立的。所以如果我有8个核心,我希望启动7个进程,每个进程从队列中取一个任务,处理完后再去取下一个。每个这7个进程都会这样做,但我对这个程序的结构还不是很明白。
有没有人能给我一些关于基本结构的建议呢?
提前谢谢大家。
2 个回答
0
你应该使用主从(也叫农民-工人)模式。最开始的进程是主进程,它负责创建任务。
- 它会创建一个队列
- 然后创建7个从进程,并把这个队列作为参数传给它们
- 接着开始往队列里写任务
这些从进程会不停地从队列中读取任务,并执行这些任务(可能会一直执行,直到从队列收到停止的消息)。在这种情况下,我认为不需要使用管理对象。
2
查看文档,了解如何从管理器中获取一个队列(第17.6.2.7段)。然后,使用工作池(第17.6.2.9段)启动7个任务,并将队列传递给每个任务。
另外,你也可以考虑类似于生产者/消费者问题的方案:
from multiprocessing.managers import BaseManager
import random
class Producer():
def __init__(self):
BaseManager.register('queue')
self.m = BaseManager(address=('hostname', 50000), authkey='jgsjgfdjs')
self.m.connect()
self.cm_queue = self.m.queue()
while 1:
time.sleep(random.randint(1,3))
self.cm_queue.put(<PUT-HERE-JOBS>)
from multiprocessing.managers import BaseManager
import time
import random
class Consumer():
def __init__(self):
BaseManager.register('queue')
self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
self.m.connect()
self.queue = self.m.queue()
while 1:
<EXECUTE(job = self.queue.get())>
from multiprocessing.managers import BaseManager, Queue
class Manager():
def __init__(self):
self.queue = QueueQueu()
BaseManager.register('st_queue', callable=lambda:self.queue)
self.m = BaseManager(address=('host', 50000), authkey='jgsjgfdjs')
self.s = self.m.get_server()
self.s.serve_forever()