我有一个rabbitmq服务,它有一个消息中没有架构json的队列。
像这样:
{"customFieldOne": "foo", "another": "bar", "numbers": [3, 5, 8]}
但是带有rabbitmq的nestjs需要一个json schema,如下所示:
{"pattern": "my-pattern", "data": {"fieldOne": 1 ...}}
必须有模式和数据字段。
有没有办法在没有定义模式或架构的情况下使用任何 json 消息?
在我的代码中,我使用模式进行消费,但我需要使用 Nestjs 微服务来消费来自rabbitmq 的任何数据。
@MessagePattern('my-pattern')
// @EventPattern('my-pattern')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(data);
console.log(`Pattern: ${context.getPattern()}`);
}
我该怎么做?
我知道我参加聚会迟到了,但我在 Nest 中使用 MQTT 时遇到了类似的问题。
要删除模式并仅传输数据,我已在
something.module.ts
中添加了此代码。
{
name: 'HYPERVISOR_CLIENT',
transport: Transport.MQTT,
options: {
url: process.env.MQTT_URL ?? 'mqtt://localhost:1883',
protocolVersion: 5,
serializer: {
serialize(value, options?) {
return value.data
},
},
},
}
这会产生干净的输出
NestJS 及其所有消息传递微服务都非常固执己见 - 无论是 RabbitMQ(实际上是 AMQP,不是 MQTT)[https://docs.nestjs.com/microservices/rabbitmq]、MQTT [https://docs.nestjs] .com/microservices/mqtt] 或自定义传输器 [https://docs.nestjs.com/microservices/custom-transport] - 需要使用 @MessagePattern 或 @EventPattern,具体取决于您分别是发布还是发出。
如果您想向 MQTT 发送消息,而不需要自己定义消息模式或事件模式,则需要绕过 NestJS 标准并使用“npm mqtt”根据第一原则编写代码图书馆。
以下是一个示例,基于我自己的经验,只不过是 https://www.cloudamqp.com/docs/nodejs_mqtt.html#cloudamqp-with-mqtt-and-nodejs 中提供的示例的汇编--getting-started 和 https://github.com/mqttjs/MQTT.js#example 和 https://github.com/mqttjs/MQTT.js#aliased-wildcard-import:
import * as mqtt from 'mqtt';
export class AppService {
sendToMqtt(p_oData): void {
const options = {
port: parseInt(process.env.MQTT_PORT),
username: process.env.MQTT_USER,
password: process.env.MQTT_PASS,
/**
* This clientId is recommended by mqtt's own documentation
* and Rattlesnake's example. It is a random value and there is
* no further reference to it in the documentation.
*/
clientId: 'mqttjs_' + Math.random().toString(16).substr(2, 8)
};
const oClient = mqtt.connect(
`mqtt://${process.env.MQTT_HOST}`,
options
);
oClient.on("connect", () => {
console.info("connected");
const json = JSON.stringify(p_oData);
oClient.publish(process.env.MQTT_TOPIC, json, () => {
console.info("Message is published");
oClient.end(); // Close the connection when published
oClient.on("end", () => {
console.info("Client disconnected");
});
});
});
}
}
要使上述工作正常进行,您需要提供具有以下设置的 .env(使用您的设置进行修改):
MQTT_USER=<your_username>
MQTT_PASS=<your_password>
MQTT_HOST=<your_mqtt_host_domain>
MQTT_PORT=1883 // This is the standard MQTT port
MQTT_TOPIC=<the_topic_you_want_to_publish_against>
以防万一不清楚,并且您还没有这样做,您将需要安装 mqtt(尽管如果您已经尝试将 NestJS 微服务用于 RabbitMQ 或 MQTT,那么您可能会这样做:
npm install mqtt --save