我们的架构如下
烧瓶API-->;SQS-->;拉姆达。我们想离开lambda,使用ECS任务。我发现芹菜可以使用SQS作为经纪人。我写了一个启动工人的基本代码
#celery_sqs_test.py
import boto3
from celery import Celery
from kombu.utils.url import quote
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
AWS_ACCESS_KEY = quote("AWS_ACCES_KEY")
AWS_SECRET_KEY = quote("AWS_SECRT_KEY")
broker_url = "sqs://{access}:{secret}@".format(access=AWS_ACCESS_KEY,
secret=AWS_SECRET_KEY,
)
app = Celery('tasks', broker=broker_url, backend=None)
app.config_from_object('celeryconfig')
@app.task(bind=True)
def consume(self, msg):
# DO SOMETHING WITH THE RECEIVED MESSAGE
logger.info("inside consume")
client = boto3.client('sqs', 'us-east-1')
client.delete_message(
QueueUrl=msg.delivery_info['sqs_queue'],
ReceiptHandle=msg.delivery_info['sqs_message']['ReceiptHandle']
)
msg._state = 'ACK'
msg.channel.qos.ack(msg.delivery_tag)
return True
我开始工作如下
celery -A celery_sqs_test worker -l INFO -Q sqs-queue-test-2
下面是配置
from kombu import (
Exchange,
Queue
)
broker_transport_options = {'region': "us-east-1"}
broker_transport = 'sqs'
accept_content = ['application/json']
result_serializer = 'json'
content_encoding = 'utf-8'
task_serializer = 'json'
worker_enable_remote_control = False
worker_send_task_events = False
result_backend = None
task_queues = (
Queue('sqs-queue-test-2', exchange=Exchange('')),
)
工人启动,我看到下面的消息
--- ***** -----
-- ******* ---- Darwin-18.7.0-x86_64-i386-64bit 2020-12-04 11:55:42
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tasks:0x10c44b710
- ** ---------- .> transport: sqs://TEST:**@localhost//
- ** ---------- .> results: disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> sqs-queue-test-2 exchange=celery(direct) key=celery
[tasks]
. celery_sqs-test.consume
[2020-12-04 11:55:43,335: INFO/MainProcess] Connected to sqs://AKIA2L2YTXTNNNWMAMMF:**@localhost//
[2020-12-04 11:55:44,023: INFO/MainProcess] celery@APY4JGH698D330 ready.
我看到同一个sqs消息被多次接收,我可以在日志中看到消息的内容,但不能看到我的日志语句或执行删除
我从here读到我需要调用该任务。我不太清楚该怎么做。我想我可以将worker作为一个ECS任务运行,消息将被consume
方法使用和处理。我是否需要运行另一个ECS任务并从工作者处调用该任务
目前没有回答
相关问题 更多 >
编程相关推荐