我正在使用 Nest.js 的微服务,但我遇到了一个问题。
我的需求是向 RabbitMQ 发送消息并将消息日志保存到 mongo,如果 ClientProxy 已连接,mongo 中的 sendMessageStatus 为“ok”,如果 ClientProxy 未连接,mongo 中的 sendMessageStatus 为“fail”。
当我测试我的应用程序时,我关闭了 RabbitMQ 服务器并发送消息,mongo 可以使用 sendMessageStatus 获取此日志:“fail”,之后我重新启动 RabbitMQ 服务器并再次执行,mongo 应该获取 sendMessageStatus:“ok”,但是它不会,仍然“失败”。
这意味着即使 RabbitMQ 服务器重新启动,ClientProxy 也不会重新连接,我注意到 Nest.js 官方文档说“ClientProxy 是懒惰的。”,那么我该如何重新连接 ClientProxy?
jobQueue.module.ts
return ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://${account}:${password}@${IP}:${port}`],
queue: outputQueueName,
serializer: {
serialize: value => value.data,
},
noAck: false,
persistent: true,
queueOptions: {
durable: true,
}
}
});
jobQueue.service.ts
constructor(
@Inject(CONNECTION_NAME)
private readonly client: ClientRMQ,
) {
};
async sendMessage(data: SendMessageDto) {
try {
this.logger.serviceDebug(SENDMESSAGE_METHOD);
data.id = this.messageID++;
return await this.client.connect()
.then(() => {
return this.client.emit('', data)
}).catch(err => {
return this.client.emit('', data)
.pipe(
catchError(connectionError => {
throw connectionError;
})
);
});
} catch (err) {
console.log('catch in job', err);
throw err;
};
};
client.connect() 无济于事。
myService.service.ts
const messageObserver = await this.jobQueueService.sendMessage(MQCLI);
const createdLog: CreateScheduleExecutionLogDto = {
...data,
scheduleID: scheduleID,
schedule: item,
processDatetime: new Date(),
};
messageObserver.subscribe({
next: x => {
console.log(x);
createdLog.processStatus = OK;
this.scheduleExecutionLogModel.create(createdLog);
},
error: e => {
console.log(e);
createdLog.processStatus = ERROR;
this.scheduleExecutionLogModel.create(createdLog);
},
})
this.client.close() 方法允许您删除旧实例。对 this.client 的新请求将创建一个新连接
try{
const recponse = this.client.emit('').pipe(timeout(5000)).toPromise()
}catch(e){
if(e.err.code == 'ECONNREFUSED'){
this.client.close()
}
}