Celery顺序链式任务

11 投票
2 回答
15869 浏览
提问于 2025-04-17 17:59

我需要通过FTP下载一个文件,修改它,然后再上传回去。我正在使用celery来完成这个任务,但在尝试使用链式调用时遇到了问题,出现了这样的错误:

类型错误:upload_ftp_image() 需要5个参数(给了6个)

另外,我可以使用链式调用来确保步骤是按顺序执行的吗?如果不能,那有什么替代方法呢?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()

任务:

@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        else:
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)

    return "SUCCESS: "  

@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)

    return "SUCCESS: "

这对于我的具体情况来说是好习惯还是坏习惯呢?:

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()

2 个回答

25

如果你不想把前一个任务的返回值当作下一个任务的参数,可以考虑使用“不可变性”。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

你可以把子任务定义成:

download_ftp_image.s(...) and upload_ftp_image.s(...)

而是把它们定义成:

download_ftp_image.si(...) and upload_ftp_image.si(...)

这样,你就可以像往常一样在任务链中使用这些任务,参数的数量也保持不变。

17

在链式调用中,前一个任务的结果总是作为第一个参数传递给下一个任务。根据链式调用的文档

下一个任务会使用它的父任务的结果作为第一个参数。在上面的例子中,这意味着会执行 mul(4, 16),因为结果是4。

你的 upload_ftp_image 任务没有接受这个额外的参数,所以它就出错了。

你在这里有一个很好的链式调用的场景;第二个任务会确保在第一个任务完成后才会被调用(否则结果就无法传递下去了)。

只需为前一个任务的结果添加一个参数即可:

def upload_ftp_image(download_result, ftp_server, username , password , file , directory):

你可以利用这个结果值;比如让下载方法返回下载文件的路径,这样上传方法就知道要上传什么了?

撰写回答