Celery顺序链式任务
我需要通过FTP下载一个文件,修改它,然后再上传回去。我正在使用celery来完成这个任务,但在尝试使用链式调用时遇到了问题,出现了这样的错误:
类型错误:upload_ftp_image() 需要5个参数(给了6个)
另外,我可以使用链式调用来确保步骤是按顺序执行的吗?如果不能,那有什么替代方法呢?
res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()
任务:
@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
try:
ftp = FTP(ftp_server)
ftp.login(username, password)
if not os.path.exists(directory):
os.makedirs(directory)
ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
else:
ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
ftp.quit()
except error_perm, resp:
raise download_ftp_image.retry(countdown=15)
return "SUCCESS: "
@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
try:
ftp = FTP(ftp_server)
ftp.login(username, password)
new_file= file.replace(directory, "")
directory = directory.replace("tmp","")
try:
ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
except:
ftp.mkd(directory)
ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
ftp.quit()
except error_perm, resp:
raise upload_ftp_image.retry(countdown=15)
return "SUCCESS: "
这对于我的具体情况来说是好习惯还是坏习惯呢?:
result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
2 个回答
25
如果你不想把前一个任务的返回值当作下一个任务的参数,可以考虑使用“不可变性”。
http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability
你可以把子任务定义成:
download_ftp_image.s(...) and upload_ftp_image.s(...)
而是把它们定义成:
download_ftp_image.si(...) and upload_ftp_image.si(...)
这样,你就可以像往常一样在任务链中使用这些任务,参数的数量也保持不变。
17
在链式调用中,前一个任务的结果总是作为第一个参数传递给下一个任务。根据链式调用的文档:
下一个任务会使用它的父任务的结果作为第一个参数。在上面的例子中,这意味着会执行
mul(4, 16)
,因为结果是4。
你的 upload_ftp_image
任务没有接受这个额外的参数,所以它就出错了。
你在这里有一个很好的链式调用的场景;第二个任务会确保在第一个任务完成后才会被调用(否则结果就无法传递下去了)。
只需为前一个任务的结果添加一个参数即可:
def upload_ftp_image(download_result, ftp_server, username , password , file , directory):
你可以利用这个结果值;比如让下载方法返回下载文件的路径,这样上传方法就知道要上传什么了?