RabbitMQ Pika和Django Channels websocket

问题描述 投票:1回答:1

我第一次使用Django Channels和RabbitMQ pika。我正在尝试从RabbitMQ队列中消费。我正在使用Django Channels AsyncConsumer分组发送给网络套接字中连接的每个人。

User type 1:可以创建任务

User type 2:可以接受任务。

用例:当user type 1创建任务时,它将发布在Rabbitmq中。当从队列中使用它时,它必须是group-sent到前端。当user type 2接受任务时,user type 2的其他实例无法接受该任务,因此我们再次从队列中消费,并将队列中的下一个任务发送给所有人。

我使用sync_to_async在不同的线程中创建了连接,我将其从回调函数附加到内存中列表中。每当有人接受时,我都会将其从列表中弹出并确认队列。

class AcceptTaskConsumer(AsyncConsumer):
    body = [] #IN MEMORY LIST 
    delivery = {} #To store ack delivery_tag 


    async def websocket_connect(self, event):
        print("AcceptTaskConsumer connected", event)
        AcceptTaskConsumer.get_task() #STARTS Queue listener in new thread
        self.room_group_name = "user_type_2"
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.send({
            "type": "websocket.accept"
        })

    async def websocket_receive(self, event):
        if event["text"] == "Hi": #If connecting first time
            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )
            else:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": "No New Tasks"
                    }
                )

        else: #When someone accepts a task-> ack and send next task in queue
            print(json.loads(event["text"])["id"])
            AcceptTaskConsumer.channel.basic_ack(delivery_tag=AcceptTaskConsumer.delivery[json.loads(event["text"])["id"]])
            AcceptTaskConsumer.delivery.pop(json.loads(event["text"])["id"])
            AcceptTaskConsumer.body.pop(0)
            await self.channel_layer.group_send(
                self.room_group_name,
                {
                    "type": "message",
                    "text": "No New Tasks"
                }
            )

            if AcceptTaskConsumer.body:
                await self.channel_layer.group_send(
                    self.room_group_name,
                    {
                        "type": "message",
                        "text": AcceptTaskConsumer.body[0]["body"]
                    }
                )

    async def message(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"]
        })

    @classmethod
    @sync_to_async
    def get_task(cls): #pika consumer
        cls.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost'))
        cls.channel = cls.connection.channel()

        cls.channel.queue_declare(queue='task_', arguments={"x-max-priority": 3})

        cls.channel.basic_consume(
            queue='task_', on_message_callback=AcceptTaskConsumer.callback, auto_ack=False)
        cls.channel.start_consuming()

    @classmethod
    def callback(cls, ch, method, properties, body):
        task_obj = {"body": json.dumps(body.decode("utf-8")),
                    "delivery_tag": method.delivery_tag}
        AcceptTaskConsumer.body.append(task_obj)
        AcceptTaskConsumer.delivery[json.loads(json.loads(task_obj["body"]))["id"]] = method.delivery_tag
        cls.channel.stop_consuming()

    async def websocket_disconnect(self, event):
        print(event)
        await self.send({
            "type": "websocket.close"
        })

        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

我很确定这不是正确的方法,因为它没有按预期工作

我经常遇到类似的错误。

  • 169个频道中的39个群组交付量过大
  • pika.exceptions.StreamLostError:流连接丢失:BrokenPipeError(32,'Broken pipe')

我也尝试像this answer一样运行队列侦听器。没事。有经验的人对此有什么想法吗?有没有更好的方法来解决此问题。?

django rabbitmq django-channels pika
1个回答
2
投票

您应该将rabitMQ消费逻辑移出Websocket使用者。

只需运行Django使用者的django command,该使用者可以从RabbitMQ接收消息,然后使用send_group将它们通过组发送到通道。

如果使用django命令,您将需要调用send_group参见https://channels.readthedocs.io/en/latest/topics/channel_layers.html#using-outside-of-consumers

from channels.layers import get_channel_layer

channel_layer = get_channel_layer()

async_to_sync(
    channel_layer.group_send
)(
    "user_type_2",
    {"type": "message", "msg": 123}
)

然后,在websocket使用者中,您应该订阅用户想要/允许获得的组。

© www.soinside.com 2019 - 2024. All rights reserved.