Spring Redis Stream consumer stops consuming messages (Address already in use)

Question (votes: 0, answers: 2)

I am trying out Redis streams with spring-data-redis. I have implemented two applications: the first adds records to a stream, and the second consumes messages from that stream.

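It works, but after a while (usually after more than 80,000 messages have been processed) the consumer application throws an exception: "org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379".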

I installed Redis 6.2.1 on Ubuntu (the Windows 10 Linux subsystem) and run it with the default configuration (port 6379). The same thing happens when I run Redis in Docker.

I am testing Redis performance, so messages are added in a loop with no delay. I add messages to the stream like this:

ObjectRecord<String, String> record = StreamRecords.newRecord()
                .ofObject(message)
                .withStreamKey("my-stream");

redisTemplate.opsForStream()
                .add(record);

I consume messages like this:

StreamReceiver.StreamReceiverOptions<String, MapRecord<String, String, String>> options =
                StreamReceiver.StreamReceiverOptions.builder()
                        .pollTimeout(Duration.ofSeconds(2))
                        .build();

StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(connectionFactory, options);
Flux<MapRecord<String, String, String>> messages = receiver.receive(StreamOffset.fromStart("my-stream"));

messages.subscribe(new StreamSubscriber());

where StreamSubscriber is just my implementation of org.reactivestreams.Subscriber. I also tried the StreamMessageListenerContainer approach, but the result was the same.

org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.translateException(LettuceConnectionFactory.java:1553)
    at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.lambda$getConnectionAsync$0(LettuceConnectionFactory.java:1491)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.DefaultConnectionFuture.lambda$null$0(DefaultConnectionFuture.java:257)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
    at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:254)
    at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
    at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088)
    at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync0$4(AbstractRedisClient.java:405)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)
    at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)
    at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)
    at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:321)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:337)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 30 more
Caused by: io.lettuce.core.RedisConnectionException: Unable to connect to localhost:6379
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:78)
    at io.lettuce.core.RedisConnectionException.create(RedisConnectionException.java:56)
    at io.lettuce.core.RedisClient.lambda$transformAsyncConnectionException$20(RedisClient.java:767)
    at io.lettuce.core.DefaultConnectionFuture.lambda$thenCompose$1(DefaultConnectionFuture.java:253)
    ... 22 more
Caused by: java.util.concurrent.CompletionException: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:331)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:346)
    at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:632)
    ... 20 more
Caused by: io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:779)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:707)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:834)

RedisTemplate and LettuceConnectionFactory are created and auto-configured by Spring Boot (I also tried some custom configuration, without success).

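I have also observed that the Redis connection is closed after a message is sent to the stream, and that a new connection is created when the next message is sent.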

2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] sk.kedros.learn.camel.redis.Publisher    : sending message: {"obuSn":"eeee00a5-616e-46ad-80e4-50780fab1336","lon":0.0,"lat":0.0,"data":"99761 - provider1"}
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Fetching Redis Connection from RedisConnectionFactory
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] io.lettuce.core.RedisChannelHandler      : dispatching command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() writeAndFlush command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, epid=0x1] write() done
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] write(ctx, AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command], promise)
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandEncoder  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379] writing command AsyncCommand [type=XADD, output=StatusOutput [output=null, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Received: 22 bytes, 1 commands in the stack
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Stack contains: 1 commands
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:12:11.455 DEBUG 20952 --- [ioEventLoop-4-1] io.lettuce.core.protocol.CommandHandler  : [channel=0xbb31e231, /127.0.0.1:51322 -> localhost/127.0.0.1:6379, chid=0x1] Completing command AsyncCommand [type=XADD, output=StatusOutput [output=1618125131455-2, error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:12:11.455 DEBUG 20952 --- [input/provider1] o.s.d.redis.core.RedisConnectionUtils    : Closing Redis Connection.

The same applies to the Redis connection of the consumer application: connections are opened and closed all the time.

2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onStreamMessage(MapBackedRecord{recordId=1618125112425-0, kvMap={payload={"obuSn":"d066bdd7-21b5-46f0-81ca-09afe4fd6596","lon":0.0,"lat":1.0,"data":"68541 - provider1"}}}): Emitting item, slow-path
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.RedisStateMachine      : Decode done, empty stack: true
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] Completing command SubscriptionCommand [type=XREAD, output=StreamReadOutput [output=[], error='null'], commandType=io.lettuce.core.protocol.Command]
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisChannelHandler      : closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] closeAsync()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] onComplete()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired()
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] o.s.d.r.stream.DefaultStreamReceiver     : [stream: loc] scheduleIfRequired(): Activating subscription, offset ReadOffset(offset=1618125112425-0)
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Trying to get a Redis connection for: redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.RedisClient              : Resolved SocketAddress localhost:6379 using redis://localhost
2021-04-11 09:11:52.426 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.protocol.CommandHandler  : [channel=0x87e269ec, [id: 0x6e14c57c] (inactive), chid=0x3be0] channelRegistered()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.lettuce.core.protocol.DefaultEndpoint  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, epid=0x3bdf] deactivating endpoint handler
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelInactive() done
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] channelInactive()
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] i.l.core.protocol.ConnectionWatchdog     : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, last known addr=localhost/127.0.0.1:6379] Reconnect scheduling disabled
2021-04-11 09:11:52.427 DEBUG 21636 --- [ioEventLoop-4-7] io.lettuce.core.protocol.CommandHandler  : [channel=0x896e94aa, /127.0.0.1:65535 -> localhost/127.0.0.1:6379, chid=0x3bdf] channelUnregistered()
2021-04-11 09:11:52.432 DEBUG 21636 --- [ioEventLoop-4-8] io.lettuce.core.AbstractRedisClient      : Connecting to Redis at localhost:6379: localhost:6379

io.netty.channel.AbstractChannel$AnnotatedSocketException: Address already in use: no further information: localhost/127.0.0.1:6379
Caused by: java.net.BindException: Address already in use: no further information
    at java.base/sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:na]

As I mentioned, it works fine for a while, but it eventually ends with "Address already in use: no further information".

Am I missing something? Is it a Redis installation or configuration issue? A problem with the Lettuce client? Or something else?

java spring-boot redis spring-data-redis lettuce
2 Answers

0 votes

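Your Redis server may be crashing because it runs out of memory. Allocate more memory to the server, or apply a MAXLEN limit to the Redis stream so that the server's memory usage stays bounded.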


0 votes

In my case, the error stopped occurring once I enabled connection pooling for the underlying Lettuce Redis client.

In the application properties:

# Explicitly select our preferred driver instead of relying on automatic selection.
spring.data.redis.client-type=lettuce
# Spring seems to auto-detect commons-pool2 in the classpath and activate pooling,
# but we prefer to clearly state we are using pooling here.
spring.data.redis.lettuce.pool.enabled=true

and add the commons-pool2 dependency that connection pooling requires:

implementation("org.apache.commons:commons-pool2")
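
The thread does not say why pooling makes the error go away. One plausible explanation, which is an assumption and not confirmed by the poster: every new TCP connection to localhost:6379 needs its own local port, a closed connection typically keeps that port reserved for a while (TIME_WAIT), and the consumer log in the question shows the local port 65535 shortly before the failure, so the client may simply have run out of local ports. The JDK-only sketch below illustrates just the first part of that reasoning. `PortUsageDemo` and `distinctLocalPorts` are hypothetical names, the loopback `ServerSocket` merely stands in for Redis, and the sockets are held open to mimic ports that are still reserved; no Spring or Lettuce API is involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical demo class: counts how many local ports a client needs
// when it opens a connection per operation versus reusing one connection.
class PortUsageDemo {

    /**
     * Runs {@code operations} simulated commands against a loopback server
     * (a stand-in for Redis) and returns the number of distinct local ports used.
     * Keep {@code operations} small (below the accept backlog of 50).
     */
    static int distinctLocalPorts(int operations, boolean reuseConnection) {
        Set<Integer> localPorts = new HashSet<>();
        List<Socket> open = new ArrayList<>();
        InetAddress loopback = InetAddress.getLoopbackAddress();
        // Port 0 lets the OS pick a free server port; the handshake completes
        // in the kernel, so no accept() loop is needed for this small demo.
        try (ServerSocket server = new ServerSocket(0, 50, loopback)) {
            Socket shared = null;
            for (int i = 0; i < operations; i++) {
                Socket socket;
                if (reuseConnection) {
                    // Pool-like behaviour: connect once, then reuse the socket.
                    if (shared == null) {
                        shared = new Socket(loopback, server.getLocalPort());
                        open.add(shared);
                    }
                    socket = shared;
                } else {
                    // Connection per operation. The socket is deliberately left
                    // open to mimic a port that is still reserved after close.
                    socket = new Socket(loopback, server.getLocalPort());
                    open.add(socket);
                }
                localPorts.add(socket.getLocalPort());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            for (Socket s : open) {
                try {
                    s.close();
                } catch (IOException ignored) {
                    // Best-effort cleanup only.
                }
            }
        }
        return localPorts.size();
    }

    public static void main(String[] args) {
        System.out.println("connection per operation: " + distinctLocalPorts(5, false) + " local ports");
        System.out.println("one reused connection:    " + distinctLocalPorts(5, true) + " local port(s)");
    }
}
```

With five operations, the per-operation variant should need five local ports and the reused connection only one. If the assumption above holds, a pool keeps the number of ports in use small and constant no matter how many messages are processed, which would match the behaviour reported in this answer.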