当无法导入任务时如何运行Celery任务

6 投票
2 回答
2392 浏览
提问于 2025-04-17 08:39

我有两台服务器:一台运行着一个django应用,另一台同时运行着rabbitmq队列和celery工作者。在运行队列和工作者的服务器上的tasks.py文件里,有一个任务如下:

@task(queue="reports")
def test_task():
    time.sleep(120)

我的目标是从django的视图中执行这个任务。因为这个任务的代码在与django视图不同的服务器上,所以我想调用这个任务。我正在尝试使用以下代码将任务从django发送到工作者机器。

send_task("tasks.test_task", task_id=task_id, args=[], kwargs={}, publisher=publisher, queue=queue)

我在这里找到了这个方法,但到目前为止测试都没有成功。

我在celery工作者服务器上用tail -F命令查看celery工作者的日志文件,然后在浏览器中访问包含send_task的视图的URL。我希望在tail输出中看到任务显示为“已接收”,但并没有。

celery工作者的日志级别是DEBUG,日志文件显示任务以正确的名称注册,并且django应用的settings.py里包含了rabbitmq服务器的正确IP和凭证。在尝试不同的方法时,我偶尔会在celery日志文件中看到错误信息,当我将传递给send_task的字符串改为一个无效的任务时(比如send_task('asdf'))。这会导致日志文件中出现UnregisteredError错误。不过,这种情况只是偶尔发生,在测试不同的设置和调用组合时,我还没有找到一种可靠的方法来重现这个行为。

另外,这是django项目settings.py中的相关部分(实际值已去掉):

CELERY_RESULT_BACKEND = 'amqp'
BROKER_HOST = 'the.correct.IP.address'
BROKER_USER = 'the_correct_user'
BROKER_PASSWORD = 'the_correct_pass'
BROKER_VHOST = 'the_correct_vhost'
BROKER_PORT = 5672

我在网上搜索了一下,发现关于send_task的信息不多。你觉得我可能哪里做错了?

2 个回答

2

我觉得你想做的事情是不可能的。Celery的工作进程需要能访问到它们要执行的任务代码,这一点是无法绕开的。

修正一下:

但你真正想要的是:让工作进程能访问到代码,但Django的视图只通过名字来引用这些任务。

8

问题解决了,原来我传给send_task的publisher这个关键字参数是无效的,导致出现了错误。我没有看到这个错误,因为我是在用AJAX请求页面,而不是直接访问它。这个情况的其他部分都是正确的。我还去掉了传给send_task的那些不必要的关键字参数和参数。

send_task("tasks.test_task", task_id=task_id, queue=queue)

撰写回答