Django Celery Beat中使用从另一个库导入的类函数的参数

2024-05-23 20:32:55 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图让一个函数在我的django项目中工作,这个项目使用celerybeat从包装器库导入基于类的函数。我一直在读到芹菜不太适合上课。我的函数login\u mb不接受参数,但当我尝试注册并调用此任务时,我得到一个错误Couldn't apply scheduled task login_mb: login_mb() takes 0 positional arguments but 1 was given 这是因为self在包装器函数中被导入?在

我该怎么做才能让它和celerybeat一起工作?在

在设置.py在

CELERY_BEAT_SCHEDULE = {
   'login_mb': {
        'task': 'backend.tasks.login_mb',
        'schedule': timedelta(minutes=30),
    } ,

在任务.py在

^{pr2}$

在顶点.py(包装器库)

from matchbook.baseclient import BaseClient
from matchbook import endpoints


class APIClient(BaseClient):

    def __init__(self, username, password=None):
        super(APIClient, self).__init__(username, password)

        self.login = endpoints.Login(self)
        self.keep_alive = endpoints.KeepAlive(self)
        self.logout = endpoints.Logout(self)
        self.betting = endpoints.Betting(self)
        self.account = endpoints.Account(self)
        self.market_data = endpoints.MarketData(self)
        self.reference_data = endpoints.ReferenceData(self)
        self.reporting = endpoints.Reporting(self)

    def __repr__(self):
        return '<APIClient [%s]>' % self.username

    def __str__(self):
        return 'APIClient'

Tags: 项目函数frompyselftaskdefusername
1条回答
网友
1楼 · 发布于 2024-05-23 20:32:55

这个错误与您的包装器库无关,您的任务似乎没有任何问题。在

问题的出现是因为您已经用bind=True定义了任务,当您这样做时,celery automatillca会向包含当前任务信息的方法注入一个参数。因此,您可以删除bind=True,或者向任务方法添加一个参数,如下所示:

@shared_task(bind=True)
def login_mb(self):
    mb = APIClient('abc', '123')
    mb.login()
    mb.keep_alive()

相关问题 更多 >