我正在尝试使用 Cassandra CqlSession 类连接到我的两个远程联系点。我正在使用 spring boot 3.3.3 和 spring-data-cassandra 4.3.3。以下是使用的 datastax 驱动程序依赖项:
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-query-builder</artifactId>
<version>4.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-mapper-runtime</artifactId>
<version>4.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-mapper-processor</artifactId>
<version>4.18.1</version>
</dependency>
下面是我使用 SSL 证书连接到 Cassandra 集群的 Bean 配置:
@Bean("cluster")
@Primary
public CqlSession sslSession() throws Exception {
// Load the certificate and private key into a KeyStore
KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(null, null);
SSLContext sslContext;
try {
Path tmCertPath = Paths.get(cassandraClient.getAppCertPath());
Path tmPrivateKeyPath = Paths.get(cassandraClient.getAppKeyPath());
X509Certificate certificate = kmiDefinitions.loadCertificate(tmCertPath);
PrivateKey privateKey = kmiDefinitions.loadPrivateKey(tmPrivateKeyPath);
KeyStore.PrivateKeyEntry keyEntry = new KeyStore.PrivateKeyEntry(privateKey, new Certificate[]{certificate});
keyStore.setEntry("test_management_secrets", keyEntry, new KeyStore.PasswordProtection("".toCharArray()));
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, "".toCharArray());
sslContext = SSLContext.getInstance("TLS");
sslContext.init(keyManagerFactory.getKeyManagers(), null, null);
} catch (GeneralSecurityException | IOException e) {
logger.error("Could not create mutual SSL Context!", e);
throw e;
}
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
.addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
.withLocalDatacenter("SEA")
.withSslContext(sslContext)
.withKeyspace(cassandraClient.getKeySpace())
.addRequestTracker(new MultiplexingRequestTracker())
.build();
}
下面是我用于 cassandra 主机名和 SSL 机密的自定义 YAML 配置:
application:
cassandra-client:
app-cert-path: some_relative_path.certificate
app-key-path: some_relative_path.private_key
key-space: akatest_sqa2
primary-host: api-beta.caas.dbattery.com
fallback-host: fallback-beta.caas.dbattery.com
下面是我当前遇到的错误,这不是我的本地环境,我正在使用 staging env。
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=18682432): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)
... 24 more
Suppressed: com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.fail(ProtocolInitHandler.java:358)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.writeListener(ChannelHandlerRequest.java:89)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:557)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:185)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at com.datastax.oss.driver.internal.core.channel.ChannelHandlerRequest.send(ChannelHandlerRequest.java:78)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler$InitRequest.send(ProtocolInitHandler.java:195)
at com.datastax.oss.driver.internal.core.channel.ProtocolInitHandler.onRealConnect(ProtocolInitHandler.java:126)
at com.datastax.oss.driver.internal.core.channel.ConnectInitHandler.lambda$connect$0(ConnectInitHandler.java:59)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492)
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636)
at io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:629)
at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:118)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:326)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:342)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Suppressed: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: localhost/127.0.0.1:9042
Caused by: java.net.ConnectException: Connection refused
at java.base/sun.nio.ch.Net.pollConnect(Native Method)
at java.base/sun.nio.ch.Net.pollConnectNow(Net.java:672)
at java.base/sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:946)
at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:337)
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:339)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:776)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: io.netty.channel.StacklessClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0()(Unknown Source)
最初当我没有明确定义本地数据中心时,我遇到了以下错误:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.com/23.99.129.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.com/23.99.129.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)
... 79 more
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:178)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:644)
... 93 more
Caused by: java.lang.IllegalStateException: Since you provided explicit contact points, the local DC must be explicitly set (see basic.load-balancing-policy.local-datacenter in the config, or set it programmatically with SessionBuilder.withLocalDatacenter). Current contact points are: Node(endPoint=api-beta.caas.dbattery.akamai.com/23.54.119.142:9042, hostId=ddcf16d5-3ca9-46ad-bce1-935c675245a0, hashCode=2c701135)=SEA, Node(endPoint=fallback-beta.caas.dbattery.akamai.com/23.54.119.141:9042, hostId=cbe38c1d-35ab-4f1c-9f4f-002e3ae1dd67, hashCode=1db3dd9b)=SEA. Current DCs in this cluster are: EWR, SEA
at com.datastax.oss.driver.internal.core.loadbalancing.helper.MandatoryLocalDcHelper.discoverLocalDc(MandatoryLocalDcHelper.java:93)
at com.datastax.oss.driver.internal.core.loadbalancing.DefaultLoadBalancingPolicy.discoverLocalDc(DefaultLoadBalancingPolicy.java:122)
at com.datastax.oss.driver.internal.core.loadbalancing.BasicLoadBalancingPolicy.init(BasicLoadBalancingPolicy.java:170)
at com.datastax.oss.driver.internal.core.metadata.LoadBalancingPolicyWrapper.init(LoadBalancingPolicyWrapper.java:123)
at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.initializePools(DefaultSession.java:475)
at com.datastax.oss.driver.internal.core.session.DefaultSession$SingleThreaded.lambda$init$5(DefaultSession.java:393)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
at com.datastax.oss.driver.internal.core.metadata.MetadataManager$SingleThreaded.lambda$startSchemaRequest$1(MetadataManager.java:450)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:614)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:653)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
at io.netty.channel.DefaultEventLoop.run(DefaultEventLoop.java:54)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
现在,我不确定到底是什么问题,当我指定错误中指定的确切数据中心时,它会连接到本地主机,而我们没有在临时环境中运行本地 cassandra。我是 Cassandra 的新手,请帮我解决这个问题。另外,如果需要任何其他信息,请告诉我。
我尝试为每个接触点设置数据中心值,如下所示:
CqlSession.builder()
.addContactPoint(new InetSocketAddress(cassandraClient.getPrimaryHost(), 9042))
.withLocalDatacenter("EWR")
.addContactPoint(new InetSocketAddress(cassandraClient.getFallbackHost(), 9042))
.withLocalDatacenter("SEA")
.withSslContext(sslContext)
.withKeyspace(cassandraClient.getKeySpace())
.addRequestTracker(new MultiplexingRequestTracker())
.build();
使用上述配置,我收到以下错误:
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'cluster' defined in class path resource [com/akamai/commons/configs/CassandraClientConfig.class]: Failed to instantiate [com.datastax.oss.driver.api.core.CqlSession]: Factory method 'sslSession' threw exception with message: Multiple entries with same key: default=EWR and default=SEA
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:648)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:485)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1355)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1185)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:562)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:522)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:337)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:335)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
at org.springframework.beans.factory.config.DependencyDescriptor.resolveCandidate(DependencyDescriptor.java:254)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.doResolveDependency(DefaultListableBeanFactory.java:1443)
at org.springframework.beans.factory.support.DefaultListableBeanFactory.resolveDependency(DefaultListableBeanFactory.java:1353)
at org.springframework.beans.factory.support.ConstructorResolver.resolveAutowiredArgument(ConstructorResolver.java:904)
at org.springframework.beans.factory.support.ConstructorResolver.createArgumentArray(ConstructorResolver.java:782)
由于我在暂存中没有任何本地 cassandra,我还尝试使用here指定的替代负载平衡策略,但即使如此,我在尝试连接到本地主机时仍遇到以下异常:
Caused by: com.datastax.oss.driver.api.core.AllNodesFailedException: Could not reach any contact point, make sure you've provided valid addresses (showing first 1 nodes, use getAllErrors() for more): Node(endPoint=localhost/<unresolved>:9042, hostId=null, hashCode=5ad2d127): [com.datastax.oss.driver.api.core.connection.ConnectionInitException: [s1|control|connecting...] Protocol initialization request, step 1 (OPTIONS): failed to send request (io.netty.channel.StacklessClosedChannelException)]
at com.datastax.oss.driver.api.core.AllNodesFailedException.copy(AllNodesFailedException.java:143)
at com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures.getUninterruptibly(CompletableFutures.java:151)
at com.datastax.oss.driver.api.core.session.SessionBuilder.build(SessionBuilder.java:837)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.buildSystemSession(CqlSessionFactoryBean.java:535)
at org.springframework.data.cassandra.config.CqlSessionFactoryBean.afterPropertiesSet(CqlSessionFactoryBean.java:469)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1853)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1802)
预先感谢您的帮助。
所以当前的问题似乎是这样的:“具有相同密钥的多个条目:default=EWR 和 default=SEA”
您只能在连接上定义一个“本地”数据中心,目前有两个。尝试选择其中一个:
.withLocalDatacenter("EWR")
...或者...
.withLocalDatacenter("SEA")
...但不是两者兼而有之。