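I'm trying to use @golevelup/nestjs-rabbitmq to connect to RabbitMQ and consume messages. I want to use environment variables in the @RabbitSubscribe method decorator. They are always undefined, and while looking for a solution I came across this package: @golevelup/profiguration.
I tried to set it all up, but I can't get it to work: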
import { createProfiguration } from '@golevelup/profiguration';
import { Channel } from 'amqp-connection-manager';
interface Config {
exchange: string;
routingKey: string;
queue: string;
queueOptions: {
durable: boolean;
};
allowNonJsonMessages: boolean;
createQueueIfNotExists: boolean;
errorHandler: () => {};
}
export const subscribeConfig = createProfiguration<Config>({
exchange: { default: 'test', env: process.env.RABBITMQ_EXCHANGE_NAME },
routingKey: { default: 'test', env: process.env.RABBITMQ_ROUTING_KEY },
queue: { default: 'test', env: process.env.RABBITMQ_QUEUE_NAME },
queueOptions: {
durable: { default: true },
},
allowNonJsonMessages: { default: true },
createQueueIfNotExists: { default: true },
errorHandler: (channel: Channel, msg: any, error: Error) => {
console.log(error);
channel.reject(msg, false);
},
});
@RabbitSubscribe({
exchange: subscribeConfig.get('exchange'),
routingKey: subscribeConfig.get('routingKey'),
queue: subscribeConfig.get('queue'),
queueOptions: {
durable: true,
},
allowNonJsonMessages: true,
createQueueIfNotExists: true,
errorHandler: (channel: Channel, msg: any, error: Error) => {
console.log(error);
channel.reject(msg, false);
},
})
public async onQueueConsumption(msg: RequestDto, amqpMsg: ConsumeMessage) {
console.log(msg);
}
When I start the application, I always get this error:
C:\Source\app\node_modules\convict\src\main.js:679
throw new Error(output)
^
Error: errorHandler: should be of type Function: value was {}
at Object.validate (C:\Source\app\node_modules\convict\src\main.js:679:17)
at exports.createProfiguration (C:\Source\app\libs\profiguration\src\lib\profiguration.ts:162:28)
at Object.<anonymous> (C:\Source\app\src\app.serviceConfig.ts:17:51)
at Module._compile (node:internal/modules/cjs/loader:1105:14)
at Object.Module._extensions..js (node:internal/modules/cjs/loader:1159:10)
at Module.load (node:internal/modules/cjs/loader:981:32)
at Function.Module._load (node:internal/modules/cjs/loader:827:12)
at Module.require (node:internal/modules/cjs/loader:1005:19)
at require (node:internal/modules/cjs/helpers:102:18)
at Object.<anonymous> (C:\Source\app\src\app.service.ts:12:1)
at Module._compile (node:internal/modules/cjs/loader:1105:14)
at Object.Module._extensions..js (node:internal/modules/cjs/loader:1159:10)
at Module.load (node:internal/modules/cjs/loader:981:32)
at Function.Module._load (node:internal/modules/cjs/loader:827:12)
at Module.require (node:internal/modules/cjs/loader:1005:19)
at require (node:internal/modules/cjs/helpers:102:18)
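I ended up not using the @golevelup/profiguration package.
To use ENV variables in the @RabbitSubscribe decorator, you have to create a factory function that returns the RabbitMQ decorator. Inside it you load the ENV values, pass them as constants into the RabbitSubscribe configuration, and return the resulting decorator.
In AppService you can then use the factory function to generate the RabbitMQ decorator for your service!
If anyone needs the environment configuration, I use: ENV Config
Here is the code: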
import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Channel } from 'amqp-connection-manager';
import * as dotenv from 'dotenv';
import { getEnvPath } from './common/helper/env.helper';
const envFilePath: string = getEnvPath(`${__dirname}/common/envs`);
export function getRabbitMQDecorator(): ReturnType<typeof RabbitSubscribe> {
dotenv.config({path:envFilePath});
const exchange = process.env.RABBITMQ_EXCHANGE_NAME;
const routingKey = process.env.RABBITMQ_ROUTING_KEY;
const queue = process.env.RABBITMQ_QUEUE_NAME;
console.log('Exchange:', exchange);
console.log('Routing Key:', routingKey);
console.log('Queue:', queue);
return RabbitSubscribe({
exchange,
routingKey,
queue,
queueOptions: {
durable: true,
},
allowNonJsonMessages: true,
createQueueIfNotExists: true,
errorHandler: (channel: Channel, msg: any, error: Error) => {
console.log(error);
channel.reject(msg, false);
},
});
}
You use it like this:
import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { Message } from './app.dto';
import { getRabbitMQDecorator } from './rabbitmq.decorator';
const RabbitMQSubscribe = getRabbitMQDecorator();
@Injectable()
export class AppService {
@RabbitMQSubscribe
public async onQueueConsumption(msg: Message, amqpMsg: ConsumeMessage) {
console.log(msg);
}
}
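The factory works because the options passed to a decorator are evaluated once, when the class is defined at module load. If the environment has not been populated by then, the decorator captures `undefined`. The sketch below illustrates that timing without any library. It is an illustration under assumptions, not the package's code: `subscribe`, `loadEnv`, `registry`, and the plain `env` object (a stand-in for `process.env`) are all hypothetical, and the decorators are applied by hand instead of with `@` syntax so that no compiler flag is needed.

```typescript
// Stand-in for process.env so the sketch runs anywhere (assumption).
const env: Record<string, string | undefined> = {};

// Records which queue each method was registered with.
const registry = new Map<string, string | undefined>();

interface SubscribeOptions {
  queue: string | undefined;
}

// Hypothetical decorator factory with the same shape as RabbitSubscribe:
// it captures its options at the moment it is called.
function subscribe(options: SubscribeOptions) {
  return (_target: object, methodName: string): void => {
    registry.set(methodName, options.queue);
  };
}

class Consumer {
  handleEarly(): void {}
  handleLate(): void {}
}

// 1) Options are built before the environment is loaded,
//    so the queue name is captured as undefined.
subscribe({ queue: env['RABBITMQ_QUEUE_NAME'] })(Consumer.prototype, 'handleEarly');

// Hypothetical loader, playing the role of dotenv.config() in the answer.
function loadEnv(): void {
  env['RABBITMQ_QUEUE_NAME'] = 'orders';
}

// 2) The factory loads the environment first and only then builds the decorator.
function getSubscribeDecorator() {
  loadEnv();
  return subscribe({ queue: env['RABBITMQ_QUEUE_NAME'] });
}

getSubscribeDecorator()(Consumer.prototype, 'handleLate');

console.log(registry.get('handleEarly')); // undefined
console.log(registry.get('handleLate')); // orders
```

The order of the two steps is the whole point: reading the variable inside the factory, after loading, is what makes the value available to the decorator.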
A simpler answer. As a first step, create a configuration file with the RabbitMQ options. For example:
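import { registerAs } from '@nestjs/config';
import { RabbitMQConfig } from '@golevelup/nestjs-rabbitmq';
// Assumption: `get` is the accessor exported by the env-var package.
import { get } from 'env-var';
export default registerAs(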
'sfap-rabbitmq',
() =>
({
exchanges: [
{
name: get('SFAP_EXCH').required().asString(),
type: 'topic', // Change this if needed
options: {
durable: true,
},
},
],
handlers: {
handleMessage: {
exchange: get('SFAP_EXCH').required().asString(),
routingKey: get('SFAP_ROUTING_KEY').required().asString(),
queue: get('SFAP_QUEUE').required().asString(),
},
handleMessageLog: {
exchange: get('SFAP_LOG_EXCH').required().asString(),
routingKey: get('SFAP_LOG_ROUTING_KEY').required().asString(),
queue: get('SFAP_LOG_QUEUE').required().asString(),
},
handleMessageException: {
exchange: get('SFAP_EXCEPTION_EXCH').required().asString(),
routingKey: get('SFAP_EXCEPTION_ROUTING_KEY').required().asString(),
queue: get('SFAP_EXCEPTION_QUEUE').required().asString(),
},
},
uri: get('SFAP_URI').required().asString(),
} as RabbitMQConfig),
);
Here handleMessage is the name of the method in your class. As the next step, import your configuration in the module. Example:
@Module({
imports: [
HttpModule,
DatabaseModule,
LoggerModule,
RabbitMQModule.forRootAsync(RabbitMQModule, {
imports: [ConfigModule],
useFactory: (config: ConfigService) => config.get('sfap-rabbitmq'),
inject: [ConfigService],
}),
ConfigModule.forRoot({
load: [urlSend], // Load your configuration file
}),
],
providers: [RabbitSfap],
controllers: [],
exports: [RabbitSfap],
})
export class SfapModule {}
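The configuration file above marks every variable as required, so a missing value stops the application at startup instead of showing up later as `undefined`. A minimal sketch of that fail-fast idea without any library follows. `requireEnv` and `buildHandlerConfig` are hypothetical helpers, the `Env` object stands in for `process.env`, the demo values are made up, and treating an empty string as missing is a choice of this sketch.

```typescript
// Plain object standing in for process.env (assumption).
type Env = Record<string, string | undefined>;

// Hypothetical helper: return the variable or throw a descriptive error.
function requireEnv(env: Env, name: string): string {
  const value = env[name];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${name}`);
  }
  return value;
}

interface HandlerConfig {
  exchange: string;
  routingKey: string;
  queue: string;
}

// Builds one handler entry from variables that share a prefix,
// mirroring the SFAP_EXCH / SFAP_ROUTING_KEY / SFAP_QUEUE naming above.
function buildHandlerConfig(env: Env, prefix: string): HandlerConfig {
  return {
    exchange: requireEnv(env, `${prefix}_EXCH`),
    routingKey: requireEnv(env, `${prefix}_ROUTING_KEY`),
    queue: requireEnv(env, `${prefix}_QUEUE`),
  };
}

// Made-up demo values; only the SFAP_* group is defined.
const demoEnv: Env = {
  SFAP_EXCH: 'sfap',
  SFAP_ROUTING_KEY: 'sfap.#',
  SFAP_QUEUE: 'sfap-queue',
};

const handleMessageConfig = buildHandlerConfig(demoEnv, 'SFAP');
console.log(handleMessageConfig.queue);
```

Calling `buildHandlerConfig(demoEnv, 'SFAP_LOG')` with these demo values throws, because `SFAP_LOG_EXCH` is not set.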
The last part is the usage:
@RabbitSubscribe({
name: 'handleMessage',
})
async handleMessage(data: SfapRabbitDto): Promise<void> {
....Functional
}
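The decorator here carries only a `name`; the exchange, routing key, and queue come from the `handlers` entry with the same key in the module configuration. The sketch below shows the idea of that lookup and is not the library's actual code: `resolveHandlerOptions` is a hypothetical helper, the sample values are made up, and letting the decorator's own options win over the named entry is an assumption about precedence.

```typescript
interface HandlerOptions {
  name?: string;
  exchange?: string;
  routingKey?: string;
  queue?: string;
}

// Hypothetical helper: look up the named entry and merge it with
// whatever the decorator itself specified.
function resolveHandlerOptions(
  decoratorOptions: HandlerOptions,
  handlers: Record<string, HandlerOptions>,
): HandlerOptions {
  const named =
    decoratorOptions.name !== undefined ? handlers[decoratorOptions.name] : undefined;
  // Assumption: options set on the decorator override the named entry.
  return { ...named, ...decoratorOptions };
}

// Made-up module-level configuration with one named handler.
const handlers: Record<string, HandlerOptions> = {
  handleMessage: {
    exchange: 'sfap',
    routingKey: 'sfap.#',
    queue: 'sfap-queue',
  },
};

const resolved = resolveHandlerOptions({ name: 'handleMessage' }, handlers);
console.log(resolved);
```

Keeping the routing details in one configuration object means the decorators stay free of environment lookups, which avoids the load-order problem from the question.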