<p>Here is a solution that uses the Flask application factory pattern and creates Celery tasks that run with the application context, without having to call <code>app.app_context()</code> inside every task. Getting hold of the app while avoiding circular imports is genuinely tricky, but this solves it. It was written when Celery 4.2 was the latest release.</p>
<p>Structure:</p>
<pre><code>repo_name/
manage.py
base/
base/__init__.py
base/app.py
base/runcelery.py
base/celeryconfig.py
base/utility/celery_util.py
base/tasks/workers.py
</code></pre>
<p>So <code>base</code> is the main application package in this example. In <code>base/__init__.py</code> we create the Celery instance as follows:</p>
<pre><code>from celery import Celery
celery = Celery('base', config_source='base.celeryconfig')
</code></pre>
<p>The <code>base/app.py</code> file contains the Flask application factory <code>create_app</code>; note the <code>init_celery(app, celery)</code> call inside it:</p>
<pre><code>from flask import Flask

from base import celery
from base.utility.celery_util import init_celery


def create_app(config_obj):
    """An application factory, as explained here:
    http://flask.pocoo.org/docs/patterns/appfactories/.

    :param config_obj: The configuration object to use.
    """
    app = Flask('base')
    app.config.from_object(config_obj)
    init_celery(app, celery=celery)
    register_extensions(app)
    register_blueprints(app)
    register_errorhandlers(app)
    register_app_context_processors(app)
    return app
</code></pre>
<p>On to the contents of <code>base/runcelery.py</code>:</p>
<pre><code>from flask.helpers import get_debug_flag

from base.settings import DevConfig, ProdConfig
from base import celery
from base.app import create_app
from base.utility.celery_util import init_celery

CONFIG = DevConfig if get_debug_flag() else ProdConfig

app = create_app(CONFIG)
# create_app already calls init_celery; calling it again here is harmless.
init_celery(app, celery)
</code></pre>
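<p><code>runcelery.py</code> imports <code>DevConfig</code> and <code>ProdConfig</code> from a <code>base/settings.py</code> module that is not shown here. A minimal, hypothetical sketch of what it might contain (only the class names come from the code above; the attributes are illustrative Flask-style config values):</p>

```python
# Hypothetical sketch of base/settings.py; adjust to your project's needs.


class Config:
    """Shared base configuration."""
    SECRET_KEY = 'change-me'  # placeholder; load from the environment in practice


class DevConfig(Config):
    ENV = 'dev'
    DEBUG = True


class ProdConfig(Config):
    ENV = 'prod'
    DEBUG = False
```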
<p>Next, the <code>base/celeryconfig.py</code> file (as an example):</p>
<pre><code># -*- coding: utf-8 -*-
"""
Configure Celery. See the configuration guide at ->
http://docs.celeryproject.org/en/master/userguide/configuration.html#configuration
"""
## Broker settings.
broker_url = 'pyamqp://guest:guest@localhost:5672//'
broker_heartbeat = 0
# List of modules to import when the Celery worker starts.
imports = ('base.tasks.workers',)
## Using the RPC (AMQP) backend to store task state and results.
result_backend = 'rpc'
#result_persistent = False
accept_content = ['json', 'application/text']
result_serializer = 'json'
timezone = "UTC"
# define periodic tasks / cron here
# beat_schedule = {
# 'add-every-10-seconds': {
# 'task': 'workers.add_together',
# 'schedule': 10.0,
# 'args': (16, 16)
# },
# }
</code></pre>
<p>Now define <code>init_celery</code> in the <code>base/utility/celery_util.py</code> file:</p>
<pre><code># -*- coding: utf-8 -*-
def init_celery(app, celery):
    """Add flask app context to celery.Task"""
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
</code></pre>
<p>And for the workers in <code>base/tasks/workers.py</code>:</p>
<pre><code>from base import celery as celery_app
from flask_security.utils import config_value, send_mail
from base.bp.users.models.user_models import User
from base.extensions import mail  # this is the flask-mail instance


@celery_app.task
def send_async_email(msg):
    """Background task to send an email with Flask-Mail."""
    # no `with app.app_context()` needed; ContextTask provides it
    mail.send(msg)


@celery_app.task
def send_welcome_email(email, user_id, confirmation_link):
    """Background task to send a welcome email with flask-security's mail.

    You don't need to use `with app.app_context()` here; the task has context.
    """
    user = User.query.filter_by(id=user_id).first()
    print(f'sending user {user} a welcome email')
    send_mail(config_value('EMAIL_SUBJECT_REGISTER'),
              email,
              'welcome', user=user,
              confirmation_link=confirmation_link)
</code></pre>
<p>Then you need to start celery beat and a celery worker in <em>two different command prompts</em>, from inside the <code>repo_name</code> folder.</p>
<p>In one command prompt run <code>celery -A base.runcelery:celery beat</code>, and in the other run <code>celery -A base.runcelery:celery worker</code>.</p>
<p>Then run whatever task needs the flask app context. It should work.</p>