将SQS代理作为ECS任务的芹菜

2024-04-27 00:52:32 发布

您现在位置:Python中文网/ 问答频道 /正文

我们的架构如下

烧瓶API-->;SQS-->;拉姆达。我们想离开lambda,使用ECS任务。我发现芹菜可以使用SQS作为经纪人。我写了一个启动工人的基本代码

#celery_sqs_test.py
import boto3
from celery import Celery
from kombu.utils.url import quote

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

AWS_ACCESS_KEY = quote("AWS_ACCES_KEY")
AWS_SECRET_KEY = quote("AWS_SECRT_KEY")


broker_url = "sqs://{access}:{secret}@".format(access=AWS_ACCESS_KEY,
                                                     secret=AWS_SECRET_KEY,
                                                     )

app = Celery('tasks', broker=broker_url, backend=None)
app.config_from_object('celeryconfig')


@app.task(bind=True)
def consume(self, msg):
    # DO SOMETHING WITH THE RECEIVED MESSAGE

    
    logger.info("inside consume")
   
    client = boto3.client('sqs', 'us-east-1')
    client.delete_message(
        QueueUrl=msg.delivery_info['sqs_queue'],
        ReceiptHandle=msg.delivery_info['sqs_message']['ReceiptHandle']
    )
    msg._state = 'ACK'
    msg.channel.qos.ack(msg.delivery_tag)
    return True

我开始工作如下

celery -A celery_sqs_test worker -l INFO -Q sqs-queue-test-2

下面是配置

from kombu import (
    Exchange,
    Queue
)


broker_transport_options = {'region': "us-east-1"}
broker_transport = 'sqs'

accept_content = ['application/json']
result_serializer = 'json'
content_encoding = 'utf-8'
task_serializer = 'json'

worker_enable_remote_control = False
worker_send_task_events = False
result_backend = None

task_queues = (
    Queue('sqs-queue-test-2', exchange=Exchange('')),
)

工人启动,我看到下面的消息

--- ***** ----- 
-- ******* ---- Darwin-18.7.0-x86_64-i386-64bit 2020-12-04 11:55:42
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x10c44b710
- ** ---------- .> transport:   sqs://TEST:**@localhost//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> sqs-queue-test-2 exchange=celery(direct) key=celery
                

[tasks]
  . celery_sqs-test.consume

[2020-12-04 11:55:43,335: INFO/MainProcess] Connected to sqs://AKIA2L2YTXTNNNWMAMMF:**@localhost//
[2020-12-04 11:55:44,023: INFO/MainProcess] celery@APY4JGH698D330 ready.

我看到同一个sqs消息被多次接收,我可以在日志中看到消息的内容,但不能看到我的日志语句或执行删除

我从here读到我需要调用该任务。我不太清楚该怎么做。我想我可以将worker作为一个ECS任务运行,消息将被consume方法使用和处理。我是否需要运行另一个ECS任务并从工作者处调用该任务


Tags: keyfromtestimportawsapptaskmsg