NestJS 微服务,带有无模式的rabbitmq

问题描述 投票:0回答:2

我有一个rabbitmq服务,它有一个消息中没有架构json的队列。

像这样:

{"customFieldOne": "foo", "another": "bar", "numbers": [3, 5, 8]}

但是带有rabbitmq的nestjs需要一个json schema,如下所示:

{"pattern": "my-pattern", "data": {"fieldOne": 1 ...}}

必须有模式和数据字段。

有没有办法在没有定义模式或架构的情况下使用任何 json 消息?

在我的代码中,我使用模式进行消费,但我需要使用 Nestjs 微服务来消费来自rabbitmq 的任何数据。

@MessagePattern('my-pattern')
// @EventPattern('my-pattern')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(data);
  console.log(`Pattern: ${context.getPattern()}`);
}

我该怎么做?

rabbitmq nestjs
2个回答
1
投票

我知道我参加聚会迟到了,但我在 Nest 中使用 MQTT 时遇到了类似的问题。

要删除模式并仅传输数据,我已在

something.module.ts
中添加了此代码。

{
  name: 'HYPERVISOR_CLIENT',
  transport: Transport.MQTT,
  options: {
    url: process.env.MQTT_URL ?? 'mqtt://localhost:1883',
    protocolVersion: 5,
    serializer: {
      serialize(value, options?) {
        return value.data
      },
    },
  },
}

这会产生干净的输出


0
投票

NestJS 及其所有消息传递微服务都非常固执己见 - 无论是 RabbitMQ(实际上是 AMQP,不是 MQTT)[https://docs.nestjs.com/microservices/rabbitmq]、MQTT [https://docs.nestjs] .com/microservices/mqtt] 或自定义传输器 [https://docs.nestjs.com/microservices/custom-transport] - 需要使用 @MessagePattern 或 @EventPattern,具体取决于您分别是发布还是发出。

如果您想向 MQTT 发送消息,而不需要自己定义消息模式或事件模式,则需要绕过 NestJS 标准并使用“npm mqtt”根据第一原则编写代码图书馆。

以下是一个示例,基于我自己的经验,只不过是 https://www.cloudamqp.com/docs/nodejs_mqtt.html#cloudamqp-with-mqtt-and-nodejs 中提供的示例的汇编--getting-startedhttps://github.com/mqttjs/MQTT.js#examplehttps://github.com/mqttjs/MQTT.js#aliased-wildcard-import:

import * as mqtt from 'mqtt';

export class AppService {

  sendToMqtt(p_oData): void {

    const options = {
      port: parseInt(process.env.MQTT_PORT),
      username: process.env.MQTT_USER,
      password: process.env.MQTT_PASS,
      /** 
       * This clientId is recommended by mqtt's own documentation 
       * and Rattlesnake's example.  It is a random value and there is
       * no further reference to it in the documentation.
       */
      clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8)
    };
    
    const oClient = mqtt.connect(
      `mqtt://${process.env.MQTT_HOST}`,
      options
    );
    
    oClient.on("connect", () => {

      console.info("connected");

      const json = JSON.stringify(p_oData);

      oClient.publish(process.env.MQTT_TOPIC, json, () => {
        console.info("Message is published");
        oClient.end(); // Close the connection when published

        oClient.on("end", () => {
          console.info("Client disconnected");
        });

      });

    });


  }

}

要使上述工作正常进行,您需要提供具有以下设置的 .env(使用您的设置进行修改):

MQTT_USER=<your_username>
MQTT_PASS=<your_password>
MQTT_HOST=<your_mqtt_host_domain>
MQTT_PORT=1883 // This is the standard MQTT port
MQTT_TOPIC=<the_topic_you_want_to_publish_against>

以防万一不清楚,并且您还没有这样做,您将需要安装 mqtt(尽管如果您已经尝试将 NestJS 微服务用于 RabbitMQ 或 MQTT,那么您可能会这样做:

npm install mqtt --save
© www.soinside.com 2019 - 2024. All rights reserved.