在任务处理期间更改python-gearman工作者任务

0 投票
2 回答
1198 浏览
提问于 2025-04-16 14:01

我正在尝试在python-gearman工作者的工作周期中更改可用的任务。这样做的原因是为了让我对工作进程有一点控制权,并允许它们从数据库中重新加载数据。我需要每个工作者定期重新加载,但我不想简单地终止这些进程,我希望服务能够持续可用,这意味着我必须分批重新加载。因此,我会让4个工作者在重新加载的同时,另外4个工作者可以继续处理任务,然后再重新加载下4个工作者。

具体流程如下:

  1. 启动重新加载流程4次。
    1. 取消注册reload任务
    2. 重新加载数据集
    3. 注册一个finishReload任务
    4. 返回
  2. 重复步骤1,直到没有工作者注册了reload任务。
  3. 启动finishReload(1)任务,直到没有工作者可用finishReload任务。

(1) finishReload任务会取消注册finishReload任务,并注册reload任务,然后返回。

现在,我遇到的问题是,当我更改工作者进程可用的任务时,作业会失败。没有错误信息或异常,只是在garmand日志中显示“ERROR”。这里有一个简单的程序可以复现这个问题。

工作者

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

客户端

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

如果有任何我可以进一步解释的地方,请告诉我。

编辑:我知道会有人问我提到的日志。我也在Google的gearman小组发布了这个问题,日志可以在那找到

2 个回答

0

从表面上看,问题似乎是你在开始一个工作后,就在工作还没完成之前,把负责这个工作的工人从工作服务器上注销了。

1

看起来可以通过创建一个GearmanWorker类的子类,并添加一些标志来解决这个问题。我需要确保当前的工作完成后,才能从工作者向服务器发送新的命令,因为这似乎会打断正在进行的工作。因此,如果我们重写on_job_complete这个函数,就可以在发送send_job_complete命令后检查启用/禁用的标志,并根据这些标志采取相应的行动。下面是新的工作者程序:

工作者

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 

撰写回答