在任务处理期间更改python-gearman工作者任务
我正在尝试在python-gearman工作者的工作周期中更改可用的任务。这样做的原因是为了让我对工作进程有一点控制权,并允许它们从数据库中重新加载数据。我需要每个工作者定期重新加载,但我不想简单地终止这些进程,我希望服务能够持续可用,这意味着我必须分批重新加载。因此,我会让4个工作者在重新加载的同时,另外4个工作者可以继续处理任务,然后再重新加载下4个工作者。
具体流程如下:
- 启动重新加载流程4次。
- 取消注册
reload
任务 - 重新加载数据集
- 注册一个
finishReload
任务 - 返回
- 取消注册
- 重复步骤1,直到没有工作者注册了
reload
任务。 - 启动
finishReload
(1)任务,直到没有工作者可用finishReload
任务。
(1) finishReload任务会取消注册finishReload
任务,并注册reload
任务,然后返回。
现在,我遇到的问题是,当我更改工作者进程可用的任务时,作业会失败。没有错误信息或异常,只是在garmand日志中显示“ERROR”。这里有一个简单的程序可以复现这个问题。
工作者
import gearman
def reversify(gmWorker, gmJob):
return "".join(gmJob.data[::-1])
def strcount(gmWorker, gmJob):
gmWorker.unregister_task('reversify') # problem line
return str(len(gmJob.data))
worker = gearman.GearmanWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
while True:
worker.work()
客户端
import gearman
client = gearman.GearmanClient(['localhost:4730'])
a = client.submit_job('reversify', 'spam and eggs')
print a.result
>>> sgge dna maps
a = client.submit_job('strcount', 'spam and eggs')
...
如果有任何我可以进一步解释的地方,请告诉我。
编辑:我知道会有人问我提到的日志。我也在Google的gearman小组发布了这个问题,日志可以在那找到。
2 个回答
0
从表面上看,问题似乎是你在开始一个工作后,就在工作还没完成之前,把负责这个工作的工人从工作服务器上注销了。
1
看起来可以通过创建一个GearmanWorker类的子类,并添加一些标志来解决这个问题。我需要确保当前的工作完成后,才能从工作者向服务器发送新的命令,因为这似乎会打断正在进行的工作。因此,如果我们重写on_job_complete
这个函数,就可以在发送send_job_complete
命令后检查启用/禁用的标志,并根据这些标志采取相应的行动。下面是新的工作者程序:
工作者
import gearman
def reversify(gmWorker, gmJob):
return "".join(gmJob.data[::-1])
def enable_reversify(gmWorker, gmJob):
myWorker.enableReversify = 1
return 'OK'
def strcount(gmWorker, gmJob):
myWorker.enableReversify = -1
return str(len(gmJob.data))
class myWorker(gearman.GearmanWorker):
enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on
def on_job_complete(self, current_job, job_result):
self.send_job_complete(current_job, job_result)
### check the flag here and enable or disable tasks ###
if myWorker.enableReversify == -1:
self.unregister_task('reversify')
if myWorker.enableReversify == 1:
self.register_task('reversify', reversify)
myWorker.enableReversify = 0 # reset the flag
return True
worker = myWorker(['localhost:4730'])
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)
while True:
worker.work()