Spring Boot RabbitMQ - 如何减少许多主题(事件)的样板?

问题描述 投票:0回答:1

我想知道在SpringBoot中初始化许多RabbitMQ队列/绑定时是否有办法减少样板代码的数量?

遵循事件驱动的方法,我的应用程序产生了50种类型的事件(稍后将分成几个较小的应用程序,但仍然)。每个事件都与“主题”类型交换。某些事件正在被其他应用程序消耗,某些事件由发送它们的同一应用程序另外消耗。

让我们考虑一下发布和自我消费的案例。

在Spring Boot中,我需要声明每个事件:

  1. 在配置中路由密钥名称(如“event.item.purchased”)
  2. 队列名称在同一个应用程序中使用该事件(“queue.event.item.purchased”)
  3. 匹配配置属性类字段或变量itemPurchasedRoutingKey或代码中的常量来保存属性名称(如$ {event.item.purchased})
  4. 用于创建队列的bean(名称具有事件名称),例如itemPurchasedQueue
  5. 用于绑定创建的bean(名称具有事件名称)和路由键名称。像itemPurchasedBinding一样,用itemPurchasedQueue.bind(... itemPurchasedRoutingKey)构造
  6. RabbitListener for event,带有包含队列名称的注释(无法在运行时定义)

所以 - 以一种或另一种形式提到“购买商品”的6个地方。样板代码的数量只是杀了我:)如果有50个事件,很容易犯错误 - 添加新事件时,你需要记住将它添加到6个地方。

理想情况下,对于每个我想要的活动:

  1. 在config中指定路由密钥。可以通过附加公共前缀(特定于应用程序)来构建队列名称。
  2. 使用一些注释或替代RabbitListener,它自动声明队列(通过路由键+前缀),绑定到它,并侦听事件。

有没有办法优化它?我考虑过自定义注释,但RabbitListener不喜欢动态队列名称,如果我在一些util方法中声明它们,则spring boot无法找到队列和绑定的bean。也许有办法在代码中声明所有这些东西,但它不是Spring方式,我相信:)

spring spring-boot rabbitmq
1个回答
0
投票

所以我最终使用手动bean声明并为每个bean使用1个bind()方法

@Configuration
@EnableConfigurationProperties(RabbitProperties::class)
class RabbitConfiguration(
    private val properties: RabbitProperties,
    private val connectionFactory: ConnectionFactory
) {

    @Bean
    fun admin() = RabbitAdmin(connectionFactory)

    @Bean
    fun exchange() = TopicExchange(properties.template.exchange)

    @Bean
    fun rabbitMessageConverter() = Jackson2JsonMessageConverter(
        jacksonObjectMapper()
            .registerModule(JavaTimeModule())
            .registerModule(Jdk8Module())
            .disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
            .enable(DeserializationFeature.READ_UNKNOWN_ENUM_VALUES_AS_NULL)
    )

    @Value("\${okko.rabbit.queue-prefix}")
    lateinit var queuePrefix: String

    fun <T> bind(routingKey: String, listener: (T) -> Mono<Void>): SimpleMessageListenerContainer {
        val queueName = "$queuePrefix.$routingKey"
        val queue = Queue(queueName)
        admin().declareQueue(queue)
        admin().declareBinding(BindingBuilder.bind(queue).to(exchange()).with(routingKey)!!)

        val container = SimpleMessageListenerContainer(connectionFactory)
        container.addQueueNames(queueName)
        container.setMessageListener(MessageListenerAdapter(MessageHandler(listener), rabbitMessageConverter()))
        return container
    }

    internal class MessageHandler<T>(private val listener: (T) -> Mono<Void>) {

        // NOTE: don't change name of this method, rabbit needs it
        fun handleMessage(message: T) {
            listener.invoke(message).subscribeOn(Schedulers.elastic()).subscribe()
        }
    }
}


@Service
@Configuration
class EventConsumerRabbit(
    private val config: RabbitConfiguration,
    private val routingKeys: RabbitEventRoutingKeyConfig
) {

    @Bean
    fun event1() = handle(routingKeys.event1)

    @Bean
    fun event2() = handle(routingKeys.event2)

    ...

    private fun<T> handle(routingKey: String): Mono<Void> = config.bind<T>(routingKey) {
        log.debug("consume rabbit event: $it")
        ... // handle event, return Mono<Void>
    }

    companion object {
        private val log by logger()
    }
}

@Configuration
@ConfigurationProperties("my.rabbit.routing-key.event")
class RabbitEventRoutingKeyConfig {
    lateinit var event1: String
    lateinit var event2: String
    ...
}
© www.soinside.com 2019 - 2024. All rights reserved.