在作业处理过程中改变Python工人的任务

2024-06-13 00:20:58 发布

您现在位置:Python中文网/ 问答频道 /正文

我正在尝试更改pythongearman worker在其工作周期中可用的任务。我这样做的原因是允许我对我的工作进程有一点控制,并允许它们从数据库中重新加载。我需要每个工作人员定期重新加载,但我不想简单地终止进程,我希望服务始终可用,这意味着我必须成批重新加载。所以我将有4个工人重新加载,而另外4个工人可以处理,然后重新加载下4个工人。在

流程:

  1. 启动重新加载过程4次。
    1. 注销reload进程
    2. 重新加载数据集
    3. 注册finishReload任务
    4. 返回
  2. 重复步骤1,直到没有注册了reload任务的worker。在
  3. 启动finishReload(1)任务,直到没有可用的finishReload任务的工人。在

(1)finishReload任务取消注册finishReload任务并注册{}任务,然后返回。在

现在,我遇到的问题是,当我更改工作进程可用的任务时,作业失败。没有错误消息或异常,只有gearmand日志中的“error”。这里有一个快速的程序可以复制这个问题。在

工人

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

客户

^{pr2}$

如果有什么需要我解释的,请告诉我。在

编辑:我知道有人会要求查看我提到的日志。我也把这个问题发到了Google上的gearman小组,并且log is available there。在


Tags: registertaskdatareturn进程defreloadworker
2条回答

快速浏览一下,问题似乎是您正在启动一个作业,然后在作业完成之前从作业服务器取消注册workers执行该作业的能力。在

看起来,将GearmanWorker类子类化并添加一些标志可以解决这个问题。在我开始从工作线程向服务器发出新命令之前,我需要让作业完成,这似乎会中断当前作业。因此,如果我们重写on_job_complete函数,我们可以检查启用/禁用标志,并在发出send_job_complete命令后对其进行操作。新工人计划如下:

工人

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 

相关问题 更多 >