也许我在开始使用 Nestjs 时错过了一些基本的事情,但不知何故,我无法使用以下方式让 Nest.js 以 Pub/Sub 模式接收消息:https://github.com/golevelup/nestjs 。我像这样创建消息服务:
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{
name: 'tx-nextor',
type: 'topic',
},
],
uri: RABBIT_URI,
channels: {
'channel-1': {
prefetchCount: 15,
default: true,
},
},
}),
MessagingModule,
],
providers: [],
controllers: [],
})
export class MessagingModule {}
一切都连接正确,我可以在 RabbitMQ 管理中看到连接和通道。创建订阅者后:
@Injectable()
export class ResourceService {
@RabbitSubscribe({
exchange: 'tx-nextor',
routingKey: 'resource.*',
queue: 'resource-history-resource.*.updated',
})
public async updatedHandler(msg: {}, amqpMsg: ConsumeMessage) {
console.log('Subscribe handler ran');
console.log(JSON.stringify(msg));
console.log(`Correlation id: ${amqpMsg.properties.correlationId}`);
return 'test';
}
}
这也连接:
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RoutesResolver] AppController {/}: +8ms
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RouterExplorer] Mapped {/, GET} route +3ms
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RoutesResolver] HistoryController {/history}: +1ms
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RouterExplorer] Mapped {/history/appointment, GET} route +1ms
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RouterExplorer] Mapped {/history/test, POST} route +1ms
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RabbitMQModule] Initializing RabbitMQ Handlers
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RabbitMQModule] Registering rabbitmq handlers from ResourceService
[resource-history] [Nest] 220 - 07/09/2022, 4:30:47 PM LOG [RabbitMQModule] ResourceService.updatedHandler {subscribe} -> tx-nextor::resource.*::resource-history-resource.*.updated
[resource-history] [Nest] 220 - 07/09/2022, 4:30:48 PM LOG [NestApplication] Nest application successfully started +11ms
将 RabbitMQModule 添加到模块定义中的导出中解决了我的问题。
参考:RabbitMQ 和 NestJS 的问题。我无法使用 Nestjs-rabbitmq 和 NestJS 发布消息
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{
name: 'tx-nextor',
type: 'topic',
},
],
uri: RABBIT_URI,
channels: {
'channel-1': {
prefetchCount: 15,
default: true,
},
},
}),
MessagingModule,
],
providers: [],
controllers: [],
exports: [RabbitMQModule],
})
export class MessagingModule {}
此外,在 RabbitMQ 管理中,确保有效负载的值中有“”,因此它将是“测试”与测试(发送 JSON 也应该有效)
希望这有帮助!