NestJS RabbitMQ: using env variables with the @golevelup/nestjs-rabbitmq package

Question (votes: 0, answers: 2)

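I am trying to use @golevelup/nestjs-rabbitmq to connect to RabbitMQ and consume messages. I want to use environment variables in the options passed to the @RabbitSubscribe decorator, but they are always undefined. While looking for a solution I came across this package: @golevelup/profiguration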

I tried to set it all up, but I cannot get it to work:

import { createProfiguration } from '@golevelup/profiguration';
import { Channel } from 'amqp-connection-manager';

interface Config {
 exchange: string;
 routingKey: string;
 queue: string;
 queueOptions: {
   durable: boolean;
 };
 allowNonJsonMessages: boolean;
 createQueueIfNotExists: boolean;
 errorHandler: () => {};
}

export const subscribeConfig = createProfiguration<Config>({
  exchange: { default: 'test', env: process.env.RABBITMQ_EXCHANGE_NAME },
  routingKey: { default: 'test', env: process.env.RABBITMQ_ROUTING_KEY },
  queue: { default: 'test', env: process.env.RABBITMQ_QUEUE_NAME },
  queueOptions: {
    durable: { default: true },
  },
  allowNonJsonMessages: { default: true },
  createQueueIfNotExists: { default: true },
  errorHandler: (channel: Channel, msg: any, error: Error) => {
    console.log(error);
    channel.reject(msg, false);
  },
});


@RabbitSubscribe({
  exchange: subscribeConfig.get('exchange'),
  routingKey: subscribeConfig.get('routingKey'),
  queue: subscribeConfig.get('queue'),
  queueOptions: {
    durable: true,
  },
  allowNonJsonMessages: true,
  createQueueIfNotExists: true,
  errorHandler: (channel: Channel, msg: any, error: Error) => {
    console.log(error);
    channel.reject(msg, false);
  },
})
public async onQueueConsumption(msg: RequestDto, amqpMsg: ConsumeMessage) {
  console.log(msg);
}

When I start the application, I always get this error:

C:\Source\app\node_modules\convict\src\main.js:679
      throw new Error(output)
            ^
Error: errorHandler:  should be of type Function: value was {}
  at Object.validate (C:\Source\app\node_modules\convict\src\main.js:679:17)
  at exports.createProfiguration (C:\Source\app\libs\profiguration\src\lib\profiguration.ts:162:28)
  at Object.<anonymous> (C:\Source\app\src\app.serviceConfig.ts:17:51)
  at Module._compile (node:internal/modules/cjs/loader:1105:14)
  at Object.Module._extensions..js (node:internal/modules/cjs/loader:1159:10)
  at Module.load (node:internal/modules/cjs/loader:981:32)
  at Function.Module._load (node:internal/modules/cjs/loader:827:12)
  at Module.require (node:internal/modules/cjs/loader:1005:19)
  at require (node:internal/modules/cjs/helpers:102:18)
  at Object.<anonymous> (C:\Source\app\src\app.service.ts:12:1)
  at Module._compile (node:internal/modules/cjs/loader:1105:14)
  at Object.Module._extensions..js (node:internal/modules/cjs/loader:1159:10)
  at Module.load (node:internal/modules/cjs/loader:981:32)
  at Function.Module._load (node:internal/modules/cjs/loader:827:12)
  at Module.require (node:internal/modules/cjs/loader:1005:19)
  at require (node:internal/modules/cjs/helpers:102:18)

Tags: rabbitmq, nestjs

2 answers

1 vote

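In the end I did not use the @golevelup/profiguration package.

To use ENV variables in the @RabbitSubscribe decorator, you have to create a factory function that returns the RabbitMQ decorator. Inside it you load the ENV values, store them in constants, pass those constants to the RabbitSubscribe configuration, and return the resulting decorator.

In AppService you can then call the factory function to generate the RabbitMQ decorator for your service.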
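The reason a factory helps is timing: decorator arguments are evaluated when the class definition is loaded, which can happen before any .env file has been read. The following is a minimal sketch of that effect, not library code. `fakeEnv` and `captureOptions` are hypothetical stand-ins for `process.env` and for a decorator factory such as RabbitSubscribe.

```typescript
// Stand-in for process.env so the sketch needs no Node typings.
const fakeEnv: Record<string, string | undefined> = {};

interface SubscribeOptions {
  queue: string | undefined;
}

// Hypothetical stand-in for a decorator factory: it captures the
// options exactly as they are at the moment it is called.
function captureOptions(options: SubscribeOptions): SubscribeOptions {
  return { ...options };
}

// 1. Evaluated at "import time", before the environment is populated.
const tooEarly = captureOptions({ queue: fakeEnv['RABBITMQ_QUEUE_NAME'] });

// 2. Simulates a later step (such as dotenv.config()) filling the environment.
fakeEnv['RABBITMQ_QUEUE_NAME'] = 'orders';

// 3. A factory reads the environment only when it is called.
function makeOptions(): SubscribeOptions {
  return captureOptions({ queue: fakeEnv['RABBITMQ_QUEUE_NAME'] });
}
const afterLoad = makeOptions();

console.log(tooEarly.queue);  // undefined
console.log(afterLoad.queue); // orders
```

The first capture keeps `undefined` even after the variable is set, which matches the symptom described in the question.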

If anyone needs the environment configuration, I use: ENV Config

Here is the factory code:

import { RabbitSubscribe } from '@golevelup/nestjs-rabbitmq';
import { Channel } from 'amqp-connection-manager';
import * as dotenv from 'dotenv';
import { getEnvPath } from './common/helper/env.helper';

const envFilePath: string = getEnvPath(`${__dirname}/common/envs`);

export function getRabbitMQDecorator(): ReturnType<typeof RabbitSubscribe> {
  // Load the .env file before reading process.env: the decorator options
  // are evaluated as soon as the decorated class is defined.
  dotenv.config({ path: envFilePath });

  const exchange = process.env.RABBITMQ_EXCHANGE_NAME;
  const routingKey = process.env.RABBITMQ_ROUTING_KEY;
  const queue = process.env.RABBITMQ_QUEUE_NAME;

  // Debug output to confirm that the values were loaded.
  console.log('Exchange:', exchange);
  console.log('Routing Key:', routingKey);
  console.log('Queue:', queue);

  return RabbitSubscribe({
    exchange,
    routingKey,
    queue,
    queueOptions: {
      durable: true,
    },
    allowNonJsonMessages: true,
    createQueueIfNotExists: true,
    // Log the error and reject the message without requeueing it.
    errorHandler: (channel: Channel, msg: any, error: Error) => {
      console.log(error);
      channel.reject(msg, false);
    },
  });
}

You use it like this:

import { Injectable } from '@nestjs/common';
import { ConsumeMessage } from 'amqplib';
import { Message } from './app.dto';
import { getRabbitMQDecorator } from './rabbitmq.decorator';

// Built once at module load, after the factory has loaded the .env file.
const RabbitMQSubscribe = getRabbitMQDecorator();

@Injectable()
export class AppService {
  @RabbitMQSubscribe
  public async onQueueConsumption(msg: Message, amqpMsg: ConsumeMessage) {
    console.log(msg);
  }
}
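The factory above logs the three values but still passes them on when one is missing. One possible refinement, shown here only as a sketch, is to fail fast with a clear message. `requireEnv` is a hypothetical helper that is not part of any package mentioned here, and `sampleEnv` is a plain object standing in for `process.env`.

```typescript
// Hypothetical helper: returns the variable's value, or throws a
// descriptive error when it is missing or empty.
function requireEnv(
  key: string,
  env: Record<string, string | undefined>,
): string {
  const value = env[key];
  if (value === undefined || value === '') {
    throw new Error(`Missing required environment variable: ${key}`);
  }
  return value;
}

// Plain object standing in for process.env (values are made up).
const sampleEnv: Record<string, string | undefined> = {
  RABBITMQ_EXCHANGE_NAME: 'events',
  RABBITMQ_ROUTING_KEY: 'user.created',
};

const exchangeName = requireEnv('RABBITMQ_EXCHANGE_NAME', sampleEnv);

// RABBITMQ_QUEUE_NAME is absent from sampleEnv, so this call throws.
let missingMessage = '';
try {
  requireEnv('RABBITMQ_QUEUE_NAME', sampleEnv);
} catch (error) {
  missingMessage = (error as Error).message;
}

console.log(exchangeName);   // events
console.log(missingMessage); // Missing required environment variable: RABBITMQ_QUEUE_NAME
```

Inside the factory, each `process.env.X` read could be replaced by `requireEnv('X', process.env)` so that a misconfigured environment stops the application at startup instead of subscribing with undefined values.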

0 votes

A simpler answer. First, create a configuration file that holds the RabbitMQ options. For example:

import { registerAs } from '@nestjs/config';
import { RabbitMQConfig } from '@golevelup/nestjs-rabbitmq';
// Assumption: `get` comes from the env-var package, whose
// get(...).required().asString() chain matches the calls below.
import { get } from 'env-var';

export default registerAs(
  'sfap-rabbitmq',
  () =>
    ({
      exchanges: [
        {
          name: get('SFAP_EXCH').required().asString(),
          type: 'topic', // Change this if needed
          options: {
            durable: true,
          },
        },
      ],
      // Each key is a handler name that @RabbitSubscribe can refer to.
      handlers: {
        handleMessage: {
          exchange: get('SFAP_EXCH').required().asString(),
          routingKey: get('SFAP_ROUTING_KEY').required().asString(),
          queue: get('SFAP_QUEUE').required().asString(),
        },
        handleMessageLog: {
          exchange: get('SFAP_LOG_EXCH').required().asString(),
          routingKey: get('SFAP_LOG_ROUTING_KEY').required().asString(),
          queue: get('SFAP_LOG_QUEUE').required().asString(),
        },
        handleMessageException: {
          exchange: get('SFAP_EXCEPTION_EXCH').required().asString(),
          routingKey: get('SFAP_EXCEPTION_ROUTING_KEY').required().asString(),
          queue: get('SFAP_EXCEPTION_QUEUE').required().asString(),
        },
      },
      uri: get('SFAP_URI').required().asString(),
    } as RabbitMQConfig),
);

where handleMessage is the name of the method in your class. Next, import your configuration into the module. Example:

@Module({
  imports: [
    HttpModule,
    DatabaseModule,
    LoggerModule,
    RabbitMQModule.forRootAsync(RabbitMQModule, {
      imports: [ConfigModule],
      useFactory: (config: ConfigService) => config.get('sfap-rabbitmq'),
      inject: [ConfigService],
    }),
    ConfigModule.forRoot({
      load: [urlSend], // Load your configuration file
    }),
  ],
  providers: [RabbitSfap],
  controllers: [],
  exports: [RabbitSfap],
})
export class SfapModule {}

The last part is the usage:

  @RabbitSubscribe({
    name: 'handleMessage',
  })
  async handleMessage(data: SfapRabbitDto): Promise<void> {
    // ... functionality
  }
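Conceptually, the `name` passed to the decorator is used to look up the matching entry in the `handlers` map from the configuration file, so the exchange, routing key and queue come from configuration rather than from the decorator. The sketch below only illustrates that lookup under stated assumptions: `resolveHandlerOptions` is a hypothetical function, not the library's implementation, the sample values are made up, and the merge precedence (configuration over decorator options) is assumed.

```typescript
interface HandlerOptions {
  name?: string;
  exchange?: string;
  routingKey?: string;
  queue?: string;
}

// Same shape as the `handlers` section of the configuration (made-up values).
const handlerMap: Record<string, HandlerOptions> = {
  handleMessage: {
    exchange: 'sfap',
    routingKey: 'sfap.message',
    queue: 'sfap-queue',
  },
};

// Hypothetical resolver: merges the decorator options with the
// configuration entry registered under the same name, if there is one.
function resolveHandlerOptions(
  decoratorOptions: HandlerOptions,
  configured: Record<string, HandlerOptions>,
): HandlerOptions {
  const fromConfig: HandlerOptions | undefined =
    decoratorOptions.name !== undefined
      ? configured[decoratorOptions.name]
      : undefined;
  // Assumed precedence: values from configuration override decorator values.
  return { ...decoratorOptions, ...(fromConfig ?? {}) };
}

const resolvedOptions = resolveHandlerOptions({ name: 'handleMessage' }, handlerMap);
console.log(resolvedOptions.queue); // sfap-queue
```

Because the configuration factory runs inside the NestJS dependency injection lifecycle, the environment is already loaded by the time these values are read, which avoids the undefined values from the question.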