为什么`celery.current_app`在Flask视图函数中指向默认实例

6 投票
1 回答
6356 浏览
提问于 2025-04-29 05:51

我并不是想在视图函数里使用 celery.current_app,而是有一个函数连接到了 after_task_publish 信号,这个函数用来在任务发布后更新状态。这个功能在 Flask 视图函数外部正常工作,并且状态更新也没问题。但是,当我从视图函数内部发送任务时,任务的状态却没有更新。我检查了一下,发现问题在于 current_app.backend 是一个 DisabledBackend 的实例,这是默认的,而不是我正在使用的 RedisBackend 的实例。

这个问题发生的原因是,在 Flask 视图函数内部,指向当前 Celery 实例的代理 celery.current_app 指向的是一个默认实例,这个实例是在没有当前 Celery 实例时创建的。

我尝试复现这个问题,下面是一个测试脚本:

from __future__ import absolute_import, print_function, unicode_literals

from flask import Flask, request

from celery import Celery, current_app
from celery.signals import after_task_publish
# internal module for debugging purposes
from celery._state import default_app, _tls


# the flask application
flask_app = Flask(__name__)

# the celery application
celery_app = Celery('tasks', broker='amqp://', backend='redis://')

# debugging info
debug = """
[{location}]
celery_app       = {celery_app}
current_app      = {current_app}
add.app          = {add_app}
default_app      = {default_app}
_tls.current_app = {tls_current_app}
"""

print(debug.format(
    location = 'OUTSIDE VIEW',
    celery_app = celery_app,
    current_app = current_app,
    add_app = add.app,
    default_app = default_app,
    tls_current_app = _tls.current_app
))


# fired after a task is published
@after_task_publish.connect
def after_publish(sender=None, body=None, **kwargs):
    print(debug.format(
        location = 'INSIDE SIGNAL FUNCTION',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

# a simple task for testing
@celery_app.task(name='add')
def add(a, b):
    return a + b


@flask_app.route('/add')
def add_view():
    print(debug.format(
        location = 'INSIDE VIEW',
        celery_app = celery_app,
        current_app = current_app,
        add_app = add.app,
        default_app = default_app,
        tls_current_app = _tls.current_app
    ))

    a = request.args.get('a')
    b = request.args.get('b')

    task = add.delay(a, b)

    return task.task_id


if __name__ == '__main__':
    flask_app.run(debug=True)

这是输出结果:

[OUTSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery tasks:0xb69ede4c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = <Celery tasks:0xb69ede4c>


[INSIDE VIEW]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6b0546c>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None   # making current_app fallback to the default instance


[INSIDE SIGNAL FUNCTION]
celery_app       = <Celery tasks:0xb69ede4c>
current_app      = <Celery default:0xb6a174ec>
add.app          = <Celery tasks:0xb69ede4c>
default_app      = None
_tls.current_app = None

因为在视图函数内部 _tls.current_app 是 None,所以 celery.current_app 指向了默认实例,这个是从 celery._state._get_current_app 得到的:

return _tls.current_app or default_app

_tlscelery._state._TLS 的一个实例:

class _TLS(threading.local):
    #: Apps with the :attr:`~celery.app.base.BaseApp.set_as_current` attribute
    #: sets this, so it will always contain the last instantiated app,
    #: and is the default app returned by :func:`app_or_default`.
    current_app = None

这个问题和线程有关吗?可能是个 bug 吗?还是说这是正常现象?

需要注意的是,我可以在我连接的函数中使用实际的 Celery 实例,这样一切都能正常工作,但我担心 celery.current_app 在其他地方被使用时会导致我的代码出错。

暂无标签

1 个回答

12

我发现了一个问题,当我在没有开启调试模式的情况下运行Flask应用时,一切都正常。但是当debug设置为True时,程序会使用一个叫做重载器的东西,这个重载器会在另一个线程中运行应用,这个过程发生在werkzeug._reloader.run_with_reloader这个函数里。

根据Python文档,有一个叫threading.local的类,它是用来存储当前应用实例的:

这个类代表了线程本地数据。线程本地数据是指那些值是特定于线程的数据。

不同线程中的实例值会有所不同。

所以这意味着celery._state._tls.current_app在不同线程之间是不能共享的,我们需要手动设置celery实例为当前应用,比如在视图函数中:

celery_app.set_current()

撰写回答