用芹菜运行简单的周期任务

2024-06-16 11:05:24 发布

您现在位置:Python中文网/ 问答频道 /正文

我好像不知道怎么让它工作。我想每十秒钟运行一次函数

from __future__ import absolute_import, unicode_literals, print_function
from celery import Celery
import app as x # the library which hold the func i want to task

app = Celery(
    'myapp',
    broker='amqp://guest@localhost//',
)

app.conf.timezone = 'UTC'

@app.task
def shed_task():
    x.run()


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Calls say('hello') every 10 seconds.
    sender.add_periodic_task(10.0, shed_task.s(), name='add every 10')

if __name__ == '__main__':
    app.start()

然后,当我运行脚本时,它只是向我显示了一组可以与芹菜一起使用的命令。我怎样才能让它运转起来?我必须在命令行或其他地方运行它吗?你知道吗

此外,当我得到它运行时,我能看到一个完整的任务列表连同任何错误?你知道吗


Tags: thenamefromimportaddapptaskdef
1条回答
网友
1楼 · 发布于 2024-06-16 11:05:24

您只需使用下面的python线程模块就可以做到这一点

import time, threading
def foo():
    print(time.ctime())
    threading.Timer(10, foo).start()

foo()

相关问题 更多 >