Django app sends messages to SQS, but the worker never executes them


I set up an AWS SQS queue named celery-celery. When I call my API, I send a task to that SQS queue through Celery.

I can see that messages are reaching the SQS queue (https://sqs.us-east-1.amazonaws.com/718854804674/celery-celery) because the queue shows available messages, and a new message is added every time I call the API.


However, these messages never leave the queue and are never executed. They just sit there as available messages.

I'm running the Django app on Beanstalk.

I'm calling the API:

/api/print-hello-world/

which routes to this URL pattern:

path('print-hello-world/', print_hello_world_task, name='print_hello_world')

and then to this view:

import logging

from django.http import JsonResponse

from .tasks import print_hello_world

logger = logging.getLogger(__name__)

def print_hello_world_task(request):
    # return JsonResponse({'status': 'API is working'})
    print("Received request to start timer task")
    logger.info("Received request to start timer task")
    task = print_hello_world.delay()  # This triggers the Celery task asynchronously
    print(f"Task started with ID: {task.id}")
    return JsonResponse({'status': 'Task started', 'task_id': task.id})

The view then calls this task in tasks.py:

from celery import shared_task


@shared_task(queue='celery-celery')
def print_hello_world():
    return "Executed"

Here is my Procfile:

web: gunicorn --workers 1 rvm.wsgi:application
worker: celery -A rvm worker -l debug --loglevel=DEBUG --queues=celery-celery --logfile=/var/log/celery/celery.log -E
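The worker only consumes the queues passed with --queues, so one quick sanity check is that this list contains CELERY_TASK_DEFAULT_QUEUE. A minimal sketch, using hypothetical helpers procfile_processes and worker_queues, that parses the Procfile text above with the stdlib shlex module:

```python
from __future__ import annotations

import shlex

# The Procfile from the question, verbatim
PROCFILE = """\
web: gunicorn --workers 1 rvm.wsgi:application
worker: celery -A rvm worker -l debug --loglevel=DEBUG --queues=celery-celery --logfile=/var/log/celery/celery.log -E
"""


def procfile_processes(text: str) -> dict[str, list[str]]:
    """Map each Procfile process type (e.g. "web", "worker") to its argv list."""
    procs: dict[str, list[str]] = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first colon only; commands may contain colons (rvm.wsgi:application)
        name, _, command = line.partition(":")
        procs[name.strip()] = shlex.split(command)
    return procs


def worker_queues(argv: list[str]) -> list[str]:
    """Collect queue names given via -Q/--queues (comma-separated) in a celery worker argv."""
    queues: list[str] = []
    for i, arg in enumerate(argv):
        if arg.startswith("--queues="):
            queues.extend(arg.split("=", 1)[1].split(","))
        elif arg in ("-Q", "--queues") and i + 1 < len(argv):
            queues.extend(argv[i + 1].split(","))
    return queues


if __name__ == "__main__":
    queues = worker_queues(procfile_processes(PROCFILE)["worker"])
    default_queue = "celery-celery"  # CELERY_TASK_DEFAULT_QUEUE in settings.py
    print(queues, default_queue in queues)
```

This only inspects the Procfile text; it cannot tell whether Beanstalk actually started the worker process.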

Here are the CELERY_-related entries in my settings.py:

AWS_ACCESS_KEY = os.getenv('AWS_ACCESS_KEY')
AWS_SECRET_KEY = os.getenv('AWS_SECRET_KEY')

CELERY_RESULT_BACKEND = 'django-db'
CELERY_CACHE_BACKEND = 'django-cache'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_TASK_DEFAULT_QUEUE = 'celery-celery'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_LOG_FILE = '/var/log/celery/celery.log'

CELERY_BROKER_URL = 'sqs://{0}:{1}@'.format(AWS_ACCESS_KEY, AWS_SECRET_KEY)

logger.info(f'CELERY_BROKER_URL: {CELERY_BROKER_URL}')

CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-east-1',  # AWS region
    'polling_interval': 1,  # Polling interval for messages
    'visibility_timeout': 3600,  # Visibility timeout for messages
    'broker_connection_retry_on_startup': True,  # Retry connecting on startup
    'use_ssl': True,  # Ensure SSL is used
    'sqs': {
        'signature_version': 'v4',  # Use SigV4 for SQS
    }
}
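Two details from the Celery SQS documentation relate to this configuration, though neither is confirmed as the cause here. First, secret access keys can contain characters such as "/" or "+" that must be URL-encoded inside the broker URL (the docs use kombu.utils.url.safequote; this sketch uses the stdlib urllib.parse.quote with safe="" to stay dependency-free), and the docs describe a bare sqs:// URL as making kombu fall back to credentials such as an IAM role on the instance. Second, the predefined_queues transport option points Celery at an existing queue URL so it never tries to list or create queues. The helper name sqs_broker_url is hypothetical, and the environment variable names are copied from the question:

```python
import os
from urllib.parse import quote

# Same (non-standard) env var names as the question's settings.py
AWS_ACCESS_KEY = os.getenv("AWS_ACCESS_KEY", "")
AWS_SECRET_KEY = os.getenv("AWS_SECRET_KEY", "")


def sqs_broker_url(access_key: str, secret_key: str) -> str:
    """Build an SQS broker URL, percent-encoding credentials that may contain "/", "+" or "="."""
    if not (access_key and secret_key):
        # No explicit keys: a bare "sqs://" leaves credential lookup to the
        # underlying AWS client (e.g. the instance's IAM role)
        return "sqs://"
    return "sqs://{0}:{1}@".format(quote(access_key, safe=""), quote(secret_key, safe=""))


CELERY_BROKER_URL = sqs_broker_url(AWS_ACCESS_KEY, AWS_SECRET_KEY)

CELERY_BROKER_TRANSPORT_OPTIONS = {
    "region": "us-east-1",
    "polling_interval": 1,
    "visibility_timeout": 3600,
    # Use the existing queue by URL instead of having the transport list/create queues
    "predefined_queues": {
        "celery-celery": {
            "url": "https://sqs.us-east-1.amazonaws.com/718854804674/celery-celery",
        },
    },
}
```

The documentation's predefined_queues example also sets per-queue access_key_id and secret_access_key entries; they are omitted here.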

Here is my celery.py:

import os
from celery import Celery
import warnings
from celery.utils.log import get_logger
from kombu import Queue, Exchange

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'rvm.settings')

app = Celery('rvm')

app.conf.task_default_queue = 'celery-celery'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')


# Load task modules from all registered Django apps.
app.autodiscover_tasks()

app.conf.update(
    broker_connection_retry_on_startup=True,  # As discussed before
)

# This line ensures the worker picks up the tasks from the correct queue
app.conf.task_queues = (
    Queue('celery-celery', Exchange('celery-celery'), routing_key='celery-celery'),
)
# app.control.purge()


# Set up logging
logger = get_logger(__name__)

# Ignore the PendingDeprecationWarning
warnings.filterwarnings('ignore', category=PendingDeprecationWarning)

I know the messages are reaching SQS because of this excerpt from celery.log. Note:

<Name>ApproximateNumberOfMessages</Name><Value>3</Value>

[2024-08-29 04:19:43,559: DEBUG/MainProcess] Response body:
b'<?xml version="1.0"?><GetQueueAttributesResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/"><GetQueueAttributesResult><Attribute><Name>ApproximateNumberOfMessages</Name><Value>3</Value></Attribute></GetQueueAttributesResult><ResponseMetadata><RequestId>7c6389e1-ea52-593a-a589-d6a8134d2a9c</RequestId></ResponseMetadata></GetQueueAttributesResponse>'
[2024-08-29 04:19:43,560: DEBUG/MainProcess] Event needs-retry.sqs.GetQueueAttributes: calling handler <botocore.retryhandler.RetryHandler object at 0x7fb8ff92ebd0>
[2024-08-29 04:19:43,560: DEBUG/MainProcess] No retry needed.
[2024-08-29 04:19:43,563: DEBUG/MainProcess] basic.qos: prefetch_count->8
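The "Response body" line above is the raw XML of an SQS GetQueueAttributes response. A small stdlib sketch (the function name queue_attributes is hypothetical) that extracts the attribute name/value pairs from such a body while ignoring the XML namespace:

```python
from __future__ import annotations

import xml.etree.ElementTree as ET


def queue_attributes(response_body: bytes) -> dict[str, str]:
    """Return {Name: Value} for every <Attribute> in a GetQueueAttributes XML body."""
    root = ET.fromstring(response_body)
    attrs: dict[str, str] = {}
    for elem in root.iter():
        # Tags carry the SQS namespace, e.g. "{http://queue.amazonaws.com/doc/2012-11-05/}Attribute"
        if elem.tag.rsplit("}", 1)[-1] != "Attribute":
            continue
        name = value = None
        for child in elem:
            local = child.tag.rsplit("}", 1)[-1]
            if local == "Name":
                name = child.text
            elif local == "Value":
                value = child.text
        if name is not None:
            attrs[name] = value
    return attrs


# Response body copied from the celery.log excerpt
BODY = (
    b'<?xml version="1.0"?><GetQueueAttributesResponse xmlns="http://queue.amazonaws.com/doc/2012-11-05/">'
    b"<GetQueueAttributesResult><Attribute><Name>ApproximateNumberOfMessages</Name><Value>3</Value>"
    b"</Attribute></GetQueueAttributesResult><ResponseMetadata>"
    b"<RequestId>7c6389e1-ea52-593a-a589-d6a8134d2a9c</RequestId></ResponseMetadata>"
    b"</GetQueueAttributesResponse>"
)

if __name__ == "__main__":
    print(queue_attributes(BODY))
```

SQS counts messages that a consumer has received but not yet deleted under ApproximateNumberOfMessagesNotVisible. If that attribute stays at 0 while ApproximateNumberOfMessages keeps growing, nothing is receiving the messages at all.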

My IAM policy is: [image]

My SQS access policy is:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "__owner_statement",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::718854674:root"
      },
      "Action": "SQS:*",
      "Resource": "arn:aws:sqs:us-east-1:718854674:*"
    },
    {
      "Sid": "AllowRvmJenkinsServiceRoleAccess_Part1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::718854674:role/service-role/rvm-jenkins-service-roll"
      },
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:us-east-1:718854674:celery-celery"
    },
    {
      "Sid": "AllowRvmJenkinsServiceRoleAccess_Part2",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::718854674:role/service-role/rvm-jenkins-service-roll"
      },
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:ListQueues"
      ],
      "Resource": "arn:aws:sqs:us-east-1:718854674:celery-celery"
    },
    {
      "Sid": "AllowRvmJenkinsUserAccess_Part1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::718854674:user/rvm-jenkins-user"
      },
      "Action": [
        "sqs:GetQueueAttributes",
        "sqs:SendMessage",
        "sqs:ReceiveMessage",
        "sqs:DeleteMessage"
      ],
      "Resource": "arn:aws:sqs:us-east-1:718854674:celery-celery"
    },
    {
      "Sid": "AllowRvmJenkinsUserAccess_Part2",
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::718854674:user/rvm-jenkins-user"
      },
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:GetQueueUrl",
        "sqs:ListQueues"
      ],
      "Resource": "arn:aws:sqs:us-east-1:718854674:celery-celery"
    }
  ]
}
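To see which actions this queue policy grants a given principal, a rough checker can match Allow statements against a list of actions. This is a simplified sketch under stated assumptions: it ignores Deny, Condition, NotAction and NotResource; the action list in the test and the Beanstalk role ARN (aws-elasticbeanstalk-ec2-role is Beanstalk's default instance profile name, which may not be what this environment uses) are illustrative. Within a single account a principal can also be granted access by its own IAM policy, so actions reported missing from the queue policy alone are not proof that access is denied:

```python
from __future__ import annotations

from fnmatch import fnmatchcase


def _as_list(value) -> list:
    """IAM policy fields may hold a single string or a list of strings."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def _principal_matches(principal, arn: str) -> bool:
    if principal == "*":
        return True
    if isinstance(principal, dict):
        return any(p in ("*", arn) for p in _as_list(principal.get("AWS")))
    return False


def missing_actions(policy: dict, principal_arn: str, queue_arn: str, required: list[str]) -> set[str]:
    """Return the actions in `required` that no Allow statement grants principal_arn on queue_arn."""
    granted: set[str] = set()
    for stmt in _as_list(policy.get("Statement")):
        if stmt.get("Effect") != "Allow" or not _principal_matches(stmt.get("Principal"), principal_arn):
            continue
        # Resource ARNs may use "*" wildcards, e.g. "arn:aws:sqs:us-east-1:718854674:*"
        if not any(fnmatchcase(queue_arn, res) for res in _as_list(stmt.get("Resource"))):
            continue
        patterns = [a.lower() for a in _as_list(stmt.get("Action"))]
        for action in required:
            # IAM action names are case-insensitive, so "SQS:*" covers "sqs:ReceiveMessage"
            if any(fnmatchcase(action.lower(), pat) for pat in patterns):
                granted.add(action)
    return set(required) - granted
```

Loading the policy above with json.load and passing the ARN of the role the Beanstalk worker actually runs as shows whether the queue policy itself names that role.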

My question is: why aren't my Celery tasks being executed by the worker?

I've tried almost everything, but I can't figure out why the worker doesn't execute the tasks.

Thanks for your help!

python django celery amazon-sqs
1 Answer

Well, my configuration may have been correct, but I decided to switch to a Redis cache instead.

The main problem I ran into was that I needed to create a custom cache with encryption turned off and cluster mode disabled.

You currently can't set these options in the UI, so I ran this command on my Beanstalk EC2 instance:

aws elasticache create-replication-group \
    --replication-group-id prod-redis-group1 \
    --replication-group-description "Production Redis cluster mode disabled group without encryption" \
    --engine redis \
    --cache-node-type cache.m5.large \
    --num-node-groups 1 \
    --replicas-per-node-group 1 \
    --automatic-failover-enabled \
    --cache-subnet-group-name my-prod-subnet-group \
    --security-group-ids sg-0a228f8f1343d23ae \
    --preferred-maintenance-window sun:05:00-sun:09:00 \
    --tags Key=Name,Value=prod-redis-group1

That worked well.
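The answer doesn't show the Celery side of the switch. A minimal settings sketch, assuming the replication group's primary endpoint hostname is supplied through a hypothetical REDIS_PRIMARY_ENDPOINT environment variable and the default Redis port 6379; because in-transit encryption is disabled, the plain redis:// scheme applies (TLS endpoints use rediss:// instead). Celery's Redis transport also needs the redis Python package installed:

```python
import os


def redis_broker_url(host: str, port: int = 6379, db: int = 0) -> str:
    """Broker URL for a Redis endpoint without in-transit encryption (plain redis:// scheme)."""
    return f"redis://{host}:{port}/{db}"


# Hypothetical env var holding the primary endpoint of prod-redis-group1
REDIS_HOST = os.getenv("REDIS_PRIMARY_ENDPOINT", "localhost")

CELERY_BROKER_URL = redis_broker_url(REDIS_HOST)
```

With cluster mode disabled the group exposes a single primary endpoint, so one host:port pair is enough for the broker URL.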

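After that, I had to add a file to set the .env variables, because for some reason they weren't being passed through from the Beanstalk configuration (I'll try to figure that out later). But basically I have: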

sudo vi /etc/celery_worker.env, which contains the database settings:

DB_NAME=blah
DB_HOST=blahblahblah.rds.amazonaws.com
DB_USERNAME=blah
DB_PASSWORD=blah
DEBUG=False
ENVIRONMENT=PRODUCTION
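One way to make the worker pick up such a file is to load it at the top of celery.py, before config_from_object reads the Django settings. A minimal sketch with a hypothetical helper load_env_file; the python-dotenv package offers the same idea ready-made, and if the worker runs under systemd, its EnvironmentFile= directive is another option:

```python
from __future__ import annotations

import os


def load_env_file(path: str, override: bool = False) -> dict[str, str]:
    """Parse simple KEY=VALUE lines from `path` and copy them into os.environ.

    Blank lines and lines starting with "#" are skipped, and one pair of matching
    surrounding quotes is removed from values. Variables already present in the
    environment are kept unless override=True. Returns every parsed pair.
    """
    parsed: dict[str, str] = {}
    with open(path, encoding="utf-8") as fh:
        for raw in fh:
            line = raw.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            key, value = key.strip(), value.strip()
            if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
                value = value[1:-1]
            parsed[key] = value
            if override or key not in os.environ:
                os.environ[key] = value
    return parsed
```

For example, load_env_file("/etc/celery_worker.env") near the top of celery.py would make DB_NAME, DB_HOST and the other values visible to settings.py when the worker imports it.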

I'll keep posting notes here. Once I'm done, I'll reorganize and finalize this answer.
