这让我有些沮丧,并且由于缺乏内部的 Celery 文档而花费了我大量的时间进行研究和逆向工程。这里有一个漂亮的教程:
https://ask.github.io/celery/tutorials/clickcounter.html
就是这样做的,使用 kombu 在 Django 视图和 Celery 任务之间进行通信。但这是针对 Celery 2.6 的,现在是 2020 年了,我们正在使用 4.4,尤其是这个:
from celery.messaging import establish_connection
已被长期弃用。当我认为昆布本地人在这里可以正常工作并且我们不需要依赖该模块提供的胡萝卜兼容性时,它也依赖于
kombu.compat
。但这是一个次要问题。
我的主要问题是如何更新这个示例,我发现没有任何 Celery 文档可以提供帮助,并且在经过数小时的代码阅读和跟踪后仍然无法将我的手指放在 Celery 4.4 中的模拟上。
我在 celery 代码库中找到了提示和线索,只是还没有找到答案。
如果有更有经验的声音来解释这一点,帮助获得像本教程这样使用 Celery 4.4 方法工作的示例,我会感到高兴、感激和兴奋。
为了说明我发现了这个:
COMPAT_MODULES = {
'celery': {
'execute': {
'send_task': 'send_task',
},
'decorators': {
'task': 'task',
'periodic_task': _compat_periodic_task_decorator,
},
'log': {
'get_default_logger': 'log.get_default_logger',
'setup_logger': 'log.setup_logger',
'setup_logging_subsystem': 'log.setup_logging_subsystem',
'redirect_stdouts_to_logger': 'log.redirect_stdouts_to_logger',
},
'messaging': {
'TaskConsumer': 'amqp.TaskConsumer',
'establish_connection': 'connection',
'get_consumer_set': 'amqp.TaskConsumer',
},
'registry': {
'tasks': 'tasks',
},
},
'celery.task': {
'control': {
'broadcast': 'control.broadcast',
'rate_limit': 'control.rate_limit',
'time_limit': 'control.time_limit',
'ping': 'control.ping',
'revoke': 'control.revoke',
'discard_all': 'control.purge',
'inspect': 'control.inspect',
},
'schedules': 'celery.schedules',
'chords': 'celery.canvas',
}
}
这里:https://github.com/celery/celery/blob/master/celery/local.py
这是一个诱人的线索,
celery.messaging.establish_connection
被替换为......connection
......尽管解释起来很困难。
实例化 Celery 时创建的 Celery 应用:
app = Celery('current_module', broker='pyamqp://mybroker', backend='rpc://')
有一个
connection
属性,但在本教程中 app.connection
并不能取代 connection = establish_connection()
(生活没那么简单)。
我将继续研究这个问题并尝试解决它,但我想邀请任何可能为我节省时间和精力的人提供一些指导,并提供一些打开海带连接并在一个函数中发送消息的示例代码(比如 Django 视图)并打开一个 kombu 连接并在另一个函数(比如 Celery 任务)中接收消息,要么从头开始(忽略 Celery),要么更理想地使用 Celery(因为 Celery 已经拥有代理的所有配置)并且可能有辅助函数来做到这一点
。一个候选者可能是 Celery 的
Queue
或 SimpleQueue
类,但它们缺乏我能找到的任何类型的内部文档。
我也遇到了这个问题,几个小时后我找到了答案;
如果你想像AMQP客户端一样使用
Kombu
和Celery
(分布式任务队列),你需要支持Celery支持的消息协议;
(https://docs.celeryq.dev/en/stable/internals/protocol.html)
我这样写sender,我使用
dependency injection
,有关客户端的更多详细信息请参阅下面:
import asyncio
from logging import Logger
from kombu.utils import uuid
from producer.amqp_client import AmqpClient
class SendAlertCommand:
def __init__(self, logger: Logger, amqp: AmqpClient, debug: bool) -> None:
self.debug = debug
self.logger = logger
self.amqp = amqp
self.producer = amqp.producer
def __call__(self):
async def main():
text = {"message": "Hello World"}
body = (
[],
text,
{}
)
task_id = uuid()
self.amqp.producer.publish(
correlation_id=task_id,
# rabbitmq broker setting
expiration=60,
timeout=60,
# declare=True,
retry=True,
retry_policy={
'interval_start': 0, # First retry immediately,
'interval_step': 2, # then increase by 2s for every retry.
'interval_max': 30, # but don't exceed 30s between retries.
'max_retries': 30, # give up after 30 tries.
},
exchange='celery',
routing_key="send_hello",
serializer="msgpack",
# celery message protocol v2
# correlation_id=task_id,
body=body,
headers={
'id': task_id,
'lang': 'py',
'task': 'consumer.tasks.hello',
'root_id': None,
'parent_id': None,
'group': None,
}
)
print("sended!")
asyncio.run(main(), debug=self.debug)
我的 AMQP 客户端
def callback_undeliverable_messages(self, exception: BaseException, exchange: str, routing_key: str, message: Any) -> None:
self.logger.error(f"AMQPClient Error: {exception}; Exchange: {exchange}, Routing Key: {routing_key}; Message: {message}")
def __init__(self, config: dict, rules: dict, logger: Logger) -> None:
self.config = config
self.rules = rules
self.logger = logger
with Connection(f"amqp://{config['user']}:{config['password']}@{config['host']}:{config['port']}") as connection:
self.connection: Connection = connection
self.connection.connect()
with Producer(connection, compression="brotli", on_return=self.callback_undeliverable_messages) as producer:
self.producer: Producer = producer
上层设置完成后,我的celery开始接收消息并处理它。