MYSQL R2DBC 的 Spring Data 多主机设置

问题描述 投票:0回答:3

我正在尝试从 Spring R2DBC 访问只读副本数据库。我的连接字符串如下所示

spring:
  r2dbc:
    url: r2dbc:mysql://db-master-dev-pvt.xyz***.com:3306,db-replica-dev-pvt.xyz**.com:3306/employee?autoReconnect=true&useUnicode=yes&characterEncoding=UTF-8
    username: 
    password: 

但是我得到了一个未知的主机。我正在关注以下文档 https://r2dbc.io/spec/0.8.2.RELEASE/spec/html/#overview.connection.discovery 根据文档,我们可以使用逗号(,)分隔多个主机配置,但是当我尝试进行查询或进行任何运行状况检查时,它会抛出未知主机异常。相同的配置与 Spring Data JPA 配合良好。

 {
                "database": "MySQL",
                "validationQuery": "validate(REMOTE)",
                "error": "java.net.UnknownHostException: failed to resolve 'db-master-dev-pvt.xyz**.com:3306,db-replica-dev-pvt.xyz**.com:3306'"
            }  

堆栈跟踪

{"@timestamp":"2021-02-12T11:34:18.438Z","@version":"1","message":"Operator called default onErrorDropped","logger_name":"reactor.core.publisher.Operators","thread_name":"reactor-tcp-epoll-1","level":"ERROR","level_value":40000,"stack_trace":"java.net.UnknownHostException: failed to resolve 'myDB-master-dev-pvt.xyz**.com:3306,myDB-replica-dev-pvt.myAPI.com:3306' after 2 queries \n\tat io.netty.resolver.dns.DnsResolveContext.finishResolve(DnsResolveContext.java:1013)\n\t... 35 common frames omitted\nWrapped by: org.springframework.transaction.CannotCreateTransactionException: Could not open R2DBC Connection for transaction; nested exception is java.net.UnknownHostException: failed to resolve 'myDB-master-dev-pvt.myAPI.com:3306,MyDB-replica-dev-pvt.myAPI.com:3306' after 2 queries \n\tat org.springframework.r2dbc.connection.R2dbcTransactionManager.lambda$null$5(R2dbcTransactionManager.java:226)\n\tat reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onError(FluxOnErrorResume.java:94)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxRetry$RetrySubscriber.onError(FluxRetry.java:94)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onError(FluxPeekFuseable.java:234)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:221)\n\tat reactor.pool.AbstractPool$Borrower.fail(AbstractPool.java:427)\n\tat reactor.pool.SimpleDequePool.lambda$drainLoop$5(SimpleDequePool.java:309)\n\tat reactor.core.publisher.FluxDoOnEach$DoOnEachSubscriber.onError(FluxDoOnEach.java:186)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.onError(MonoFlatMap.java:172)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:132)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoCreate$DefaultMonoSink.error(MonoCreate.java:189)\n\tat reactor.netty.resources.NewConnectionProvider$DisposableConnect.onError(NewConnectionProvider.java:139)\n\tat org.springframework.cloud.sleuth.instrument.reactor.ScopePassingSpanSubscriber.onError(ScopePassingSpanSubscriber.java:95)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapMain.secondError(MonoFlatMap.java:192)\n\tat reactor.core.publisher.MonoFlatMap$FlatMapInner.onError(MonoFlatMap.java:259)\n\tat reactor.netty.transport.TransportConnector$MonoChannelPromise.tryFailure(TransportConnector.java:464)\n\tat reactor.netty.transport.TransportConnector.lambda$doResolveAndConnect$6(TransportConnector.java:271)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)\n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)\n\tat io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)\n\tat io.netty.util.concurrent.DefaultPromise.setFailure(DefaultPromise.java:109)\n\tat io.netty.resolver.InetSocketAddressResolver$1.operationComplete(InetSocketAddressResolver.java:62)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)\n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)\n\tat io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)\n\tat io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)\n\tat io.netty.resolver.dns.DnsNameResolver.tryFailure(DnsNameResolver.java:936)\n\tat io.netty.resolver.dns.DnsNameResolver.access$500(DnsNameResolver.java:90)\n\tat io.netty.resolver.dns.DnsNameResolver$5.operationComplete(DnsNameResolver.java:956)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:552)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)\n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)\n\tat io.netty.util.concurrent.DefaultPromise.setFailure0(DefaultPromise.java:609)\n\tat io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:117)\n\tat io.netty.resolver.dns.DnsResolveContext.finishResolve(DnsResolveContext.java:1021)\n\tat io.netty.resolver.dns.DnsResolveContext.tryToFinishResolve(DnsResolveContext.java:966)\n\tat io.netty.resolver.dns.DnsResolveContext.query(DnsResolveContext.java:414)\n\tat io.netty.resolver.dns.DnsResolveContext.tryToFinishResolve(DnsResolveContext.java:938)\n\tat io.netty.resolver.dns.DnsResolveContext.access$700(DnsResolveContext.java:63)\n\tat io.netty.resolver.dns.DnsResolveContext$2.operationComplete(DnsResolveContext.java:467)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:578)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:571)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:550)\n\tat io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:491)\n\tat io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:616)\n\tat io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:605)\n\tat io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)\n\tat io.netty.resolver.dns.DnsQueryContext.trySuccess(DnsQueryContext.java:201)\n\tat io.netty.resolver.dns.DnsQueryContext.finish(DnsQueryContext.java:193)\n\tat io.netty.resolver.dns.DnsNameResolver$DnsResponseHandler.channelRead(DnsNameResolver.java:1230)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.epoll.EpollDatagramChannel.read(EpollDatagramChannel.java:681)\n\tat io.netty.channel.epoll.EpollDatagramChannel.access$100(EpollDatagramChannel.java:58)\n\tat io.netty.channel.epoll.EpollDatagramChannel$EpollDatagramChannelUnsafe.epollInReady(EpollDatagramChannel.java:499)\n\tat io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:480)\n\tat io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:378)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.lang.Thread.run(Thread.java:748)\nWrapped by: reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.transaction.CannotCreateTransactionException: Could not open R2DBC Connection for transaction; nested exception is java.net.UnknownHostException: failed to resolve 'myDB-master-dev-pvt.xyz**.com:3306,myDB-replica-dev-pvt.xyz**.com:3306' after 2 queries \n","caller_class_name":"reactor.util.Loggers$Slf4JLogger","caller_method_name":"error","caller_file_name":"Loggers.java","caller_line_number":314,"traceId":"","instance_activeProfiles":"dev","instance_port":"8080","instance_ip":"instance_ip_IS_UNDEFINED","instance_application_name":"employee-adjustment-service"}

谢谢

spring-boot spring-webflux project-reactor spring-data-r2dbc r2dbc
3个回答
0
投票

我已经完成了分析,我在这里提供,以便将来如果有人想要检查 mysql,他们必须在计划 R2DBC 之前看到这一点。我去了 R2DBC 团队,之后,我发现我正在使用的驱动程序没有多主机。他们官方支持 postgres、MSSQL 和 H2,

我正在使用下面的驱动程序

<!-- https://mvnrepository.com/artifact/dev.miku/r2dbc-mysql -->
<dependency>
    <groupId>dev.miku</groupId>
    <artifactId>r2dbc-mysql</artifactId>
    <version>0.8.2.RELEASE</version>
</dependency>

所以这需要在这个驱动程序中解决,我必须在那里提出一个问题 https://github.com/mirromutth/r2dbc-mysql/issues/169以便修复它。 感谢来自 Spring R2DBC 的 Mark @mp911de 的澄清和支持。


0
投票

我能够调用多个主机(逗号分隔的主机)

r2dbc.oracle.config.host1=a.b.com
r2dbc.oracle.config.host2=x.y.com

我的配置类如下

@Value("${r2dbc.oracle.config.host1}")
    private String host1;

    @Value("${r2dbc.oracle.config.host2}")
    private String host2;

@Bean()
    @Qualifier("remoteabcConnectionFactory")
    public ConnectionFactory remoteRemedyConnectionFactory() {
        String hosts = host1 + "," + host2;
        return ConnectionFactories.get(ConnectionFactoryOptions.builder()
                .option(DRIVER, driver)
                .option(HOST, host)
                .option(HOST, hosts)
                .option(PORT, Integer.valueOf(port))
                .option(USER, user)
                .option(PASSWORD, password)
                .option(DATABASE, database)
                .build());
    }

您还可以从一个属性参数中获取逗号分隔的主机 应用程序属性


0
投票

您可以使用此配置来后处理

spring.r2dbc
属性中设置的值。然而,这将使所有列出的主机只使用一个

@Configuration
class R2dbcConfiguration(
    private val r2dbcProperties: R2dbcProperties
) {
    private val log: Logger = LoggerFactory.getLogger(this::class.java)

    @Bean
    fun postProcessMultipleHostsConfiguration(): ConnectionFactoryOptionsBuilderCustomizer {
        return ConnectionFactoryOptionsBuilderCustomizer { builder: ConnectionFactoryOptions.Builder ->
            val hostOption: Any = ConnectionFactoryOptions
                .parse(r2dbcProperties.url)
                .getRequiredValue(Option.valueOf("host"))
            val hosts: List<String> = Optional.of(hostOption) <---will contain all hosts
                .filter { x -> x is String }
                .map { obj -> obj as String }
                .map { host -> host.split(",") }
                .orElseGet { listOf() }
            if (hosts.size > 1) {
                log.info("Failover configuration for r2dbc data source doesn't supported, will use first of provided hosts")
                val hostAndPort = hosts[0].split(":").toTypedArray()
                if (hostAndPort.isNotEmpty()) {
                    builder.option(ConnectionFactoryOptions.HOST, hostAndPort[0]) <--- keep only one host
                }
                if (hostAndPort.size > 1) {
                    builder.option(ConnectionFactoryOptions.PORT, hostAndPort[1].toInt())
                }
            }
        }
    }

}
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.