Django-q2 类函数的任务调度钩子
我有一个这样的对象:
class TestApp(models.Model):
cron = models.CharField(max_length=200)
args = models.CharField(max_length=200)
test_function = models.ForeignKey(TestFunction, on_delete=models.CASCADE)
scheduled_task = models.ForeignKey(Schedule, blank=True, null=True, on_delete=models.SET_NULL)
def get_args(self):
return ast.literal_eval(self.args)
def run_function(self):
Module = __import__(self.test_function.library_name)
func = getattr(Module, self.test_function.function_name)
result = func(*self.get_args())
print(result)
return result
def print_task(self, task):
print(self.id, task)
我想要一个这样的定时任务:
def save(self, *args, **kwargs):
self.scheduled_task = Schedule.objects.create(
func=self.run_function,
hook=self.print_task,
schedule_type=Schedule.CRON,
cron=self.cron
)
super(TestApp, self).save(*args, **kwargs)
但是这样做不行,结果会是:
18:32:02 [Q] INFO Process-f625bf1f4a024df8be5e15647bf294a9 created task alaska-ten-october-mirror from schedule [4]
18:32:02 [Q] INFO Process-069b6ca530ae4e83be6aedbd669a94a7 processing alaska-ten-october-mirror '<bound method TestApp.run_function of <TestApp: TestApp object (1)>>' [4]
18:32:02 [Q] ERROR malformed return hook '<bound method TestApp.print_task of <TestApp: TestApp object (1)>>' for [alaska-ten-october-mirror]
18:32:02 [Q] ERROR Failed '<bound method TestApp.run_function of <TestApp: TestApp object (1)>>' (alaska-ten-october-mirror) - Function <bound method TestApp.run_function of <TestApp: TestApp object (1)>> is not defined : Traceback (most recent call last):
如果我做一个简单的异步调用,它就能正常工作:
def save(self, *args, **kwargs):
async_task(
func=self.run_function,
hook=self.print_task
)
super(TestApp, self).save(*args, **kwargs)
这样就能正确完成任务:
18:35:25 [Q] INFO Process-cfc73b4a7c5d48d69eed82b311f18250 processing ceiling-echo-six-west '<bound method TestApp.run_function of <TestApp: TestApp object (1)>>'
-2.0
1 ceiling-echo-six-west
我不太明白为什么异步调用可以做到,而定时任务却不行。
1 个回答
0
我想这是因为定时任务需要能够独立于其他任务运行,但上面提到的问题是因为:
- 一个任务可以把一个引用函数当作函数和钩子使用
- 定时任务需要把函数设置为“仅限点字符串”。
如何解决这个问题:
def save(self, *args, **kwargs):
self.scheduled_task = Schedule.objects.create(
func='test_app.tasks.run_function',
hook='test_app.tasks.print_task',
schedule_type=Schedule.CRON,
cron=self.cron,
kwargs={'TestApp_id': self.id}
)
super(TestApp, self).save(*args, **kwargs)
然后在你的 tasks.py 文件中,我这样定义了任务:
def run_function(**kwargs):
id = kwargs['TestApp_id']
test_app = TestApp.objects.get(pk=id)
Module = __import__(test_app.test_function.library_name)
func = getattr(Module, test_app.test_function.function_name)
result = func(*test_app.get_args())
print(result)
return {
'result': result,
'TestApp_id': id
}
def print_task(task):
print(task.result['TestApp_id'], task.result['result'])