我正在使用rabbitmq和nestjs。我需要将消息从一个队列复制到另一个队列。我在rabbitmq上建立了一个交换以使其正常工作。但是如何改变nestjs内部rabbitmq的交换呢?
我的API网关
我当前在nestjs中的rabbitmq配置:
constructor( ) {
this.rabbitmq = ClientProxyFactory.create({
transport: Transport.RMQ,
options: {
urls: [`amqp://${this.configService.get<string>('RABBITMQ_USER')}:${this.configService.get<string>('RABBITMQ_PASSWORD')}@${this.configService.get<string>('RABBITMQ_URL')}`],
queue: 'students'
}
})
}
createStudent(@Body() body: CreateStudentDto): Observable<any> {
return this.rabbitmq.send('createStudent', body)
}
我的客户
@MessagePattern('createStudent')
async createStudent(@Payload() student: Student, @Ctx() context: RmqContext) {
const channel = context.getChannelRef()
const originalMsg = context.getMessage()
try {
let response = await this.studentService.createStudent(student)
await channel.ack(originalMsg)
return response;
} catch(error) {
this.logger.log(`error: ${JSON.stringify(error.message)}`)
const filterAckError = ackErrors.filter(ackError => error.message.includes(ackError))
if (filterAckError.length > 0) {
await channel.ack(originalMsg)
}
}
}
我需要将消息发送到两个队列。
这很有趣,但是 amqplib 不具备 amqp 协议用于交易所的完整功能。
或者,使用第三方库。
例如:https://www.npmjs.com/package/nestjs-rmq