我的 Python 应用程序中有一个 Celery 实现。我使用的经纪商是SQS。发送到 SQS 的消息通过 Boto3 的 send_message() API 来自不同的应用程序。现在我的困惑是如何触发 Celery 从 SQS 中选择消息进行处理。 Celery 中将运行一些任务,它应该正确处理来自 SQS 的消息。我的要求类似于 Celery Consumer SQS Messages。
根据我的理解,Celery 会轮询 SQS,直到消息到达那里。有人可以帮我解决这个问题吗?
我每 20 秒调用一次此任务:
@app.task(name='listen_to_sqs_telemetry')
def listen_to_sqs_telemetry():
logger.info('start listen_to_telemetry')
sqs = get_sqs_client()
queue_url = 'https://sqs.us-east-2.amazonaws.com/xxx'
logger.info('Using ' + queue_url)
keep_going = True
num = 0
while keep_going:
keep_going = False
try:
response = sqs.receive_message(
QueueUrl=queue_url,
AttributeNames=[
'SentTimestamp',
],
MaxNumberOfMessages=5,
MessageAttributeNames=[
'All'
],
WaitTimeSeconds=20
)
# logger.info(response)
if 'Messages' in response:
keep_going = True
for rec in response['Messages']:
# Process message
sqs.delete_message(
QueueUrl=queue_url,
ReceiptHandle=rec['ReceiptHandle']
)
num = num + 1
else:
pass
# logger.info(response)
except Exception as e:
logger.error(str(e))
logger.info('done with listen_to_sqs_telemetry')
return "Processed {} message(s)".format(num)
如果我理解你的意思,请尝试将工作线程作为守护进程运行。使用像supervisord这样的工具来做到这一点。
对于我的用途,我选择创建一个单独的 AWS Lambda 函数来接收事件并将它们转发给 Celery 工作线程(通过不同的队列)。
我曾多次使用此模式的变体将数据转发给 celery 工作人员进行处理。
以下示例在新文件上传时从 S3 接收事件:
# - Env vars set:
# - `CELERY_BROKER_TRANSPORT` = 'sqs' (could inline, this is always the same)
# - `CELERY_TASK_DEFAULT_QUEUE` = 'celery-queue' (name of the sqs queue configured to use with celery)
import os
import logging
from celery import Celery, shared_task
logging.basicConfig(level=logging.DEBUG)
logging.info('Loading function')
app = Celery('triggers')
app.config_from_object(os.environ, namespace='CELERY')
@shared_task(name='on_new_file_uploaded')
def on_new_file_uploaded(records):
'''
The signature needs to match the task found in celery worker, but the
implementation exists there.
'''
pass
def lambda_handler(event, context):
try:
logging.info('Posting file uploaded action')
on_new_file_uploaded.delay(event['Records'])
logging.info('Posted file uploaded action')
except Exception as e:
logging.exception('Error handling message for {}.'.format(event['Records']))
raise e