使用 Redisson Redis 客户端实现 Flink Async I/O 导致错误“已达到 DatagramSockets 的最大数量”

问题描述 投票:0回答:1

我将 Flink 应用程序部署到 AWS Managed Flink,在尝试使用 Redisson(Redis 客户端)实现异步 I/O 以连接到 AWS MemoryDB(Redis 集群)时,出现错误(见下文)。我不清楚 Redisson 如何创建新套接字以及 AWS Managed Flink 如何配置最大套接字数量。有人可以阐明如何解决这个问题吗?谢谢。

java.lang.IllegalStateException: failed to create a new resolver
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:72)
at org.redisson.client.RedisClient.resolveAddr(RedisClient.java:203)
at org.redisson.client.RedisClient.connectAsync(RedisClient.java:220)
at org.redisson.connection.MasterSlaveConnectionManager.connectToNode(MasterSlaveConnectionManager.java:125)
at org.redisson.connection.MasterSlaveConnectionManager.connectToNode(MasterSlaveConnectionManager.java:111)
at org.redisson.cluster.ClusterConnectionManager.doConnect(ClusterConnectionManager.java:91)
at org.redisson.connection.MasterSlaveConnectionManager.connect(MasterSlaveConnectionManager.java:192)
at org.redisson.config.ConfigSupport.createConnectionManager(ConfigSupport.java:220)
at org.redisson.Redisson.<init>(Redisson.java:69)
at org.redisson.Redisson.create(Redisson.java:114)
at com.amazon.otf.etl.MyAsyncMap.open(MyAsyncMap.java:40)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:100)
at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.open(AsyncWaitOperator.java:163)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:700)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:676)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:643)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: io.netty.channel.ChannelException: Unable to create Channel from class class io.netty.channel.socket.nio.NioDatagramChannel
at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:46)
at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:310)
at io.netty.bootstrap.AbstractBootstrap.register(AbstractBootstrap.java:227)
at io.netty.resolver.dns.DnsNameResolver.<init>(DnsNameResolver.java:517)
at io.netty.resolver.dns.DnsNameResolverBuilder.build(DnsNameResolverBuilder.java:527)
at io.netty.resolver.dns.DnsAddressResolverGroup.newNameResolver(DnsAddressResolverGroup.java:114)
at io.netty.resolver.dns.DnsAddressResolverGroup.newResolver(DnsAddressResolverGroup.java:92)
at io.netty.resolver.dns.DnsAddressResolverGroup.newResolver(DnsAddressResolverGroup.java:77)
at io.netty.resolver.AddressResolverGroup.getResolver(AddressResolverGroup.java:70)
... 23 more
Caused by: java.lang.reflect.InvocationTargetException
at jdk.internal.reflect.GeneratedConstructorAccessor247.newInstance(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at io.netty.channel.ReflectiveChannelFactory.newChannel(ReflectiveChannelFactory.java:44)
... 31 more
Caused by: io.netty.channel.ChannelException: Failed to open a socket.
at io.netty.channel.socket.nio.NioDatagramChannel.newSocket(NioDatagramChannel.java:91)
at io.netty.channel.socket.nio.NioDatagramChannel.<init>(NioDatagramChannel.java:120)
... 35 more
Caused by: java.net.SocketException: maximum number of DatagramSockets reached
at java.base/sun.net.ResourceManager.beforeUdpCreate(ResourceManager.java:72)
at java.base/sun.nio.ch.DatagramChannelImpl.<init>(DatagramChannelImpl.java:132)
at java.base/sun.nio.ch.SelectorProviderImpl.openDatagramChannel(SelectorProviderImpl.java:42)
at io.netty.channel.socket.nio.NioDatagramChannel.newSocket(NioDatagramChannel.java:89)
... 36 more
java redis apache-flink redisson amazon-kinesis-analytics
1个回答
0
投票

您成功解决这个问题了吗?

© www.soinside.com 2019 - 2024. All rights reserved.