我目前在春季使用 Reactive Redis pub/sub,我想取消订阅
如果我在使用
ReactiveRedistemplate.listenChannel()
或 ReactiveReddisMessageListerContainer
时遇到套接字断开等某些情况,您知道如何取消我的订阅吗?
在redis pub/sub环境中(不是反应式redis),我理解你可以通过提供RedisMessageListenerContainer的removeListener来取消订阅(
RedisMessageListenerContainer.removeListener()
)
@Service
class RedisMessageSubscriber(
private val redisTemplate: ReactiveRedisTemplate<String, Any>,
private val reactiveRedisMessageListenerContainer: ReactiveRedisMessageListenerContainer
) {
private val log = logger()
fun subscribe(channel: String, ctx: ChannelHandlerContext) {
val subscribe = redisTemplate.listenToChannel(channel)
.map {
MapperUtils.readJsonValueOrThrow(it.message.toString(), ChatDto::class)
}
.subscribe { it ->
ctx.channel().writeAndFlush(CustomResponse(ResponseType.RECEIVE_CHAT, it))
.addListener {
if (it.isSuccess) {
log.info("write channel topic: $channel")
} else {
ctx.channel().close()
log.warn("Failed to send the message to the client. roomId:${channel}, message:${it.cause()}, socketOpen: ${ctx.channel().isOpen}")
throw IllegalArgumentException("socket close")
}
}
}
log.info("isDispose: ${subscribe.isDisposed}")
}
}
我以为如果socket通道被关闭,Redis的订阅也会被取消,但它仍然存在。
免责声明:我无法在文档中找到有关如何处理此问题的好示例,但当我在本地测试时它对我有用。
由于您的示例显示了
ChannelHandlerContext
的使用(我假设与 Netty 中使用的相同),那么我假设您正在使用 Lettuce 驱动程序。
在这种情况下,您可以使用
RedisConnectionStateListener
配置连接工厂来挂钩断开连接事件,并使用 .destroy()
的 .activeSubscriptions
或 ReactiveRedisMessageListenerContainer
方法:
@Bean
fun reactiveRedisMessageListenerContainer(
reactiveRedisConnectionFactory: LettuceConnectionFactory?
): ReactiveRedisMessageListenerContainer {
val container = ReactiveRedisMessageListenerContainer(reactiveRedisConnectionFactory!!)
val connectionListener = object : RedisConnectionStateAdapter() {
override fun onRedisDisconnected(connection: RedisChannelHandler<*, *>?) {
container.activeSubscriptions.forEach {
val channels =
it.channels.stream().map { channel -> String(channel.array()) }.reduce { acc, s -> "$acc, $s" }
LOGGER.info("Canceling subscription for ${channels.get()} ")
it.cancel().subscribe()
}
//or you can use container.destroy() alternatively
}
}
reactiveRedisConnectionFactory.start()
reactiveRedisConnectionFactory.nativeClient?.addListener(connectionListener)
return container
}
@Bean
@Primary
fun reactiveRedisConnectionFactory(
): LettuceConnectionFactory {
val redisConfiguration = RedisStandaloneConfiguration(host, port)
val lettuceConnectionFactory = LettuceConnectionFactory(redisConfiguration)
return lettuceConnectionFactory
}
注意: spring-data-redis 在这种情况下的工作方式是,它有目的地不取消订阅频道,以便在重新建立连接时能够重新订阅。我认为这不是应用程序在这种情况下必须做的事情,但对于大多数情况来说,这将是正确的方法。