Nest.js ClientProxy 重新连接

问题描述 投票:0回答:1

我正在使用 Nest.js 的微服务,但我遇到了一个问题。

我的需求是向 RabbitMQ 发送消息并将消息日志保存到 mongo,如果 ClientProxy 已连接,mongo 中的 sendMessageStatus 为“ok”,如果 ClientProxy 未连接,mongo 中的 sendMessageStatus 为“fail”。

当我测试我的应用程序时,我关闭了 RabbitMQ 服务器并发送消息,mongo 可以使用 sendMessageStatus 获取此日志:“fail”,之后我重新启动 RabbitMQ 服务器并再次执行,mongo 应该获取 sendMessageStatus:“ok”,但是它不会,仍然“失败”。

这意味着即使 RabbitMQ 服务器重新启动,ClientProxy 也不会重新连接,我注意到 Nest.js 官方文档说“ClientProxy 是懒惰的。”,那么我该如何重新连接 ClientProxy?

jobQueue.module.ts

return ClientProxyFactory.create({
            transport: Transport.RMQ,
            options: {
              urls: [`amqp://${account}:${password}@${IP}:${port}`],
              queue: outputQueueName,
              serializer: {
                serialize: value => value.data,
              },
              noAck: false,
              persistent: true,
              queueOptions: {
                durable: true,
              }
            }
          });

jobQueue.service.ts

constructor(
        @Inject(CONNECTION_NAME)
        private readonly client: ClientRMQ,
    ) {
    };
async sendMessage(data: SendMessageDto) {
        try {
            this.logger.serviceDebug(SENDMESSAGE_METHOD);
            data.id = this.messageID++;
            return await this.client.connect()
                .then(() => {
                    return this.client.emit('', data)
                }).catch(err => {
                    return this.client.emit('', data)
                        .pipe(
                            catchError(connectionError => {
                                throw connectionError;
                            })
                        );
                });
        } catch (err) {
            console.log('catch in job', err);
            throw err;
        };
    };

client.connect() 无济于事。

myService.service.ts

const messageObserver = await this.jobQueueService.sendMessage(MQCLI);
                            const createdLog: CreateScheduleExecutionLogDto = {
                                ...data,
                                scheduleID: scheduleID,
                                schedule: item,
                                processDatetime: new Date(),
                            };
                            messageObserver.subscribe({
                                next: x => {
                                    console.log(x);
                                    createdLog.processStatus = OK;
                                    this.scheduleExecutionLogModel.create(createdLog);
                                },
                                error: e => {
                                    console.log(e);
                                    createdLog.processStatus = ERROR;
                                    this.scheduleExecutionLogModel.create(createdLog);
                                },
                            })
node.js rabbitmq nestjs
1个回答
0
投票

this.client.close() 方法允许您删除旧实例。对 this.client 的新请求将创建一个新连接

try{
    const recponse = this.client.emit('').pipe(timeout(5000)).toPromise()
}catch(e){
    if(e.err.code == 'ECONNREFUSED'){
        this.client.close()
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.