使用来自 Celery 的 SQS 消息

问题描述 投票:0回答:3

我的 Python 应用程序中有一个 Celery 实现。我使用的经纪商是SQS。发送到 SQS 的消息通过 Boto3 的 send_message() API 来自不同的应用程序。现在我的困惑是如何触发 Celery 从 SQS 中选择消息进行处理。 Celery 中将运行一些任务,它应该正确处理来自 SQS 的消息。我的要求类似于 Celery Consumer SQS Messages

根据我的理解,Celery 会轮询 SQS,直到消息到达那里。有人可以帮我解决这个问题吗?

python amazon-sqs django-celery
3个回答
1
投票

我每 20 秒调用一次此任务:

@app.task(name='listen_to_sqs_telemetry')
def listen_to_sqs_telemetry():
    logger.info('start listen_to_telemetry')
    sqs = get_sqs_client()
    queue_url = 'https://sqs.us-east-2.amazonaws.com/xxx'
    logger.info('Using ' + queue_url)

    keep_going = True
    num = 0
    while keep_going:
        keep_going = False
        try:
            response = sqs.receive_message(
                QueueUrl=queue_url,
                AttributeNames=[
                    'SentTimestamp',
                ],
                MaxNumberOfMessages=5,
                MessageAttributeNames=[
                    'All'
                ],
                WaitTimeSeconds=20
            )
            # logger.info(response)
            if 'Messages' in response:
                keep_going = True
                for rec in response['Messages']:
                    # Process message
                    sqs.delete_message(
                        QueueUrl=queue_url,
                        ReceiptHandle=rec['ReceiptHandle']
                    )
                    num = num + 1
            else:
                pass
                # logger.info(response)
        except Exception as e:
            logger.error(str(e))
    logger.info('done with listen_to_sqs_telemetry')
    return "Processed {} message(s)".format(num)

0
投票

如果我理解你的意思,请尝试将工作线程作为守护进程运行。使用像supervisord这样的工具来做到这一点。


0
投票

对于我的用途,我选择创建一个单独的 AWS Lambda 函数来接收事件并将它们转发给 Celery 工作线程(通过不同的队列)。

我曾多次使用此模式的变体将数据转发给 celery 工作人员进行处理。

  • 来自第 3 方服务的 Webhook 向 Lambda 函数 URL 发出请求
  • S3 存储桶事件通知
  • Cognito 用户事件挂钩

以下示例在新文件上传时从 S3 接收事件:

  • 包括 celery 库(所以我不必担心自己维护 celery 消息结构)。
  • 与worker中的方法具有相同签名的空方法声明,并使用celery任务注释进行注释。
  • 处理程序解压输入消息,并触发任务。
# - Env vars set:
#   - `CELERY_BROKER_TRANSPORT` = 'sqs' (could inline, this is always the same)
#   - `CELERY_TASK_DEFAULT_QUEUE` = 'celery-queue' (name of the sqs queue configured to use with celery)

import os
import logging
from celery import Celery, shared_task

logging.basicConfig(level=logging.DEBUG)

logging.info('Loading function')

app = Celery('triggers')
app.config_from_object(os.environ, namespace='CELERY')


@shared_task(name='on_new_file_uploaded')
def on_new_file_uploaded(records):
    '''
    The signature needs to match the task found in celery worker, but the
    implementation exists there.
    '''
    pass


def lambda_handler(event, context):
    try:
        logging.info('Posting file uploaded action')
        on_new_file_uploaded.delay(event['Records'])
        logging.info('Posted file uploaded action')
    except Exception as e:
        logging.exception('Error handling message for {}.'.format(event['Records']))
        raise e
© www.soinside.com 2019 - 2024. All rights reserved.