使用 Spring Reactive Redis pub/sub 时如何取消订阅(断开与频道的连接)?

问题描述 投票:0回答:1

我目前在春季使用 Reactive Redis pub/sub,我想取消订阅

如果我在使用

ReactiveRedistemplate.listenChannel()
ReactiveReddisMessageListerContainer
时遇到套接字断开等某些情况,您知道如何取消我的订阅吗?

在redis pub/sub环境中(不是反应式redis),我理解你可以通过提供RedisMessageListenerContainer的removeListener来取消订阅(

RedisMessageListenerContainer.removeListener()
)

@Service
class RedisMessageSubscriber(
    private val redisTemplate: ReactiveRedisTemplate<String, Any>,
    private val reactiveRedisMessageListenerContainer: ReactiveRedisMessageListenerContainer
) {
    private val log = logger()

    fun subscribe(channel: String, ctx: ChannelHandlerContext) {
        val subscribe = redisTemplate.listenToChannel(channel)
            .map {
                MapperUtils.readJsonValueOrThrow(it.message.toString(), ChatDto::class)
            }
            .subscribe { it ->
                ctx.channel().writeAndFlush(CustomResponse(ResponseType.RECEIVE_CHAT, it))
                    .addListener {
                        if (it.isSuccess) {
                            log.info("write channel topic: $channel")
                        } else {
                            ctx.channel().close()
                            log.warn("Failed to send the message to the client. roomId:${channel}, message:${it.cause()}, socketOpen: ${ctx.channel().isOpen}")
                            throw IllegalArgumentException("socket close")
                        }
                    }
            }
        log.info("isDispose: ${subscribe.isDisposed}")
    }
}

我以为如果socket通道被关闭,Redis的订阅也会被取消,但它仍然存在。

sockets redis spring-webflux publish-subscribe
1个回答
0
投票

免责声明:我无法在文档中找到有关如何处理此问题的好示例,但当我在本地测试时它对我有用。

由于您的示例显示了

ChannelHandlerContext
的使用(我假设与 Netty 中使用的相同),那么我假设您正在使用 Lettuce 驱动程序。

在这种情况下,您可以使用

RedisConnectionStateListener
配置连接工厂来挂钩断开连接事件,并使用
.destroy()
.activeSubscriptions
ReactiveRedisMessageListenerContainer
方法:


    @Bean
    fun reactiveRedisMessageListenerContainer(
        reactiveRedisConnectionFactory: LettuceConnectionFactory?
    ): ReactiveRedisMessageListenerContainer {
        val container = ReactiveRedisMessageListenerContainer(reactiveRedisConnectionFactory!!)
        val connectionListener = object : RedisConnectionStateAdapter() {
            override fun onRedisDisconnected(connection: RedisChannelHandler<*, *>?) {
                container.activeSubscriptions.forEach {
                    val channels =
                        it.channels.stream().map { channel -> String(channel.array()) }.reduce { acc, s -> "$acc, $s" }
                    LOGGER.info("Canceling subscription for ${channels.get()} ")
                    it.cancel().subscribe()
                }
                //or you can use container.destroy() alternatively
            }
        }

        reactiveRedisConnectionFactory.start()
        reactiveRedisConnectionFactory.nativeClient?.addListener(connectionListener)

        return container
    }

    @Bean
    @Primary
    fun reactiveRedisConnectionFactory(
    ): LettuceConnectionFactory {
        val redisConfiguration = RedisStandaloneConfiguration(host, port)
        val lettuceConnectionFactory = LettuceConnectionFactory(redisConfiguration)
        return lettuceConnectionFactory
    }

注意: spring-data-redis 在这种情况下的工作方式是,它有目的地不取消订阅频道,以便在重新建立连接时能够重新订阅。我认为这不是应用程序在这种情况下必须做的事情,但对于大多数情况来说,这将是正确的方法。

© www.soinside.com 2019 - 2024. All rights reserved.