如何用芹菜链等待任务完成?

2024-04-25 05:55:42 发布

您现在位置:Python中文网/ 问答频道 /正文

我试图在这里创建一个芹菜连锁店:

chain(getAllProducts.s(shopname, hdrs),
    editOgTags.s(title, description, whichImage, readableShopname, currentThemeId),
    notifyBulkEditFinish.si(email, name, readableShopname, totalProducts),
    updateBulkEditTask.si(taskID))()

在editOgTags中,有3个子任务:

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId):
    for product in products:
        editOgTitle.delay(product, title, readableShopname)
        editOgDescription.delay(product, description, readableShopname)
        editOgImage.delay(product, int(whichImage), currentThemeId)

在每个editOgXXX函数中,都有一个要使用速率限制调用的函数:

@shared_task(rate_limit='1/s')
def updateMetafield(index, loop_var, target_id, type_value):
    resource = type_value + 's'
    # print(f"loop_var key = {loop_var[index]['key']}")
    if type_value == 'product' or type_value == 'collection' or type_value == 'article' or type_value == 'page':
        meta = shopify.Metafield.find(resource=resource, resource_id=target_id, namespace='global', key=loop_var[index]['key'])
        checkAndWaitShopifyAPICallLimit()
    else:
        print("Not available metafield type! Cannot update.")
        return

    if meta:
        # meta[0].destroy()
        meta[0].value = loop_var[index]['value']
        meta[0].save()
    else:
        metafield = shopify.Metafield.create({
            'value_type': 'string',
            'namespace': 'global',
            'value': loop_var[index]['value'],
            'value-type': 'string',
            'key': loop_var[index]['key'],
            'resource': resource,
            'resource_id': target_id,
            })
        metafield.save()

在漏桶算法下,它一次提供40个api调用,2个reqs/s补充。因为shopify功能的速率限制为2次请求/秒。我将速率限制设置为1/s。当它用完api配额时,我将调用time.sleep(20)以等待CheckAndWaitShopIficationApicallLimit()中的补充

问题是在所有任务完成之前调用电子邮件通知功能(notifyBulkEditFinish)。如何确保在所有任务完成后调用电子邮件功能

我怀疑sleep函数会使任务落后于队列中的email函数


Tags: key函数loopidindextitlevaluevar
2条回答

你的问题在于“所有任务完成后”的定义

editOgTags启动len(products) * 3子任务——显然每个子任务都会启动另一个异步子任务。如果您想在发送电子邮件之前等待所有这些任务执行完毕,则需要一些同步机制。芹菜的内置解决方案是chord对象。ATM,您的代码等待editOgTags完成,但此任务所做的唯一事情是启动其他子任务-然后返回,无论这些子任务本身是否完成

A chord is just like a group but with a callback. The chain primitive lets us link together signatures so that one is called after the other, essentially forming a chain of callbacks. What is the difference to change the chain to chord?

请注意,我并不是说您必须用chord替换整个chain。提示:链、组和和弦是要执行的任务,因此您可以通过组合任务、链、组和和弦来创建复杂的工作流

正如上面提到的,区别在于chord将等待它的头中的所有任务完成后再执行回调。这允许N个异步任务并行执行,但在运行回调之前仍要等待所有任务完成。当然,这需要对代码进行一些思考和重新组织(因此,如果需要,子任务会被考虑在内),但这确实回答了您的问题:“如何确保在所有任务完成后调用电子邮件功能?”

要扩展@bruno的注释:使用chord并修改editOgTags函数来创建与通知相关的组:

from celery import chord

@shared_task(ignore_result=True)
def editOgTags(products, title, description, whichImage, readableShopname, currentThemeId, name, email, totalProducts):
    tasks = []
    for product in products:
        tasks.append(editOgTitle.si(product, title, readableShopname))
        tasks.append(editOgDescription.si(product, description, readableShopname))
        tasks.append(editOgImage.si(product, int(whichImage), currentThemeId))
    # kick off the chord, notifyBulk... will be called after all of these 
    # edit... tasks complete.
    chord(tasks)(notifyBulkEditFinish.si(email, name, readableShopname, totalProducts))

相关问题 更多 >