我对 Spring Webflux 和 Azure 都是新手,我不明白这里发生了什么。我正在尝试检查数据湖容器是否已存在。我不明白为什么这会被阻止。我还尝试了本文档中的许多内容(https://learn.microsoft.com/en-us/azure/developer/java/sdk/pagination),得到了相同的结果。
public boolean checkIfFileSystemExists(String fileSystem) {
return azureDataLakeClient.getDataLakeServiceClient().listFileSystems().stream().anyMatch(c -> fileSystem.equals(c.getName()));
}
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-10
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-10
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.Flux.blockLast(Flux.java:2815) ~[reactor-core-3.6.3.jar:3.6.3]
at com.azure.core.util.paging.ContinuablePagedByIteratorBase.requestPage(ContinuablePagedByIteratorBase.java:102) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.util.paging.ContinuablePagedByItemIterable$ContinuablePagedByItemIterator.<init>(ContinuablePagedByItemIterable.java:75) ~[azure-core-1.49.0.jar:1.49.0]
at com.azure.core.util.paging.ContinuablePagedByItemIterable.iterator(ContinuablePagedByItemIterable.java:55) ~[azure-core-1.49.0.jar:1.49.0]
at java.base/java.lang.Iterable.spliterator(Iterable.java:101) ~[na:na]
at com.azure.core.util.paging.ContinuablePagedIterable.stream(ContinuablePagedIterable.java:85) ~[azure-core-1.49.0.jar:1.49.0]
at com.parsons.datamart.common.datalake.azure.writer.AzureDataLakeWriter.checkIfFileSystemExists(AzureDataLakeWriter.java:33) ~[main/:na]
at com.parsons.datamart.common.datalake.azure.writer.AzureDataLakeWriter.write(AzureDataLakeWriter.java:44) ~[main/:na]
at com.parsons.datamart.ingest.web.WebClientIngestor.notifyIngest(WebClientIngestor.java:74) ~[main/:na]
at com.parsons.datamart.ingest.web.WebClientIngestor.lambda$ingest$1(WebClientIngestor.java:37) ~[main/:na]
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.3.jar:3.6.3]
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.16.jar:1.1.16]
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.16.jar:1.1.16]
at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.16.jar:1.1.16]
at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:782) ~[reactor-netty-http-1.1.16.jar:1.1.16]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.16.jar:1.1.16]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
PagedIterable 总是阻塞的 reactor.core.Exceptions$ErrorCallbackNotImplemented:java.lang.IllegalStateException:block()/blockFirst()/blockLast() 正在阻塞,线程reactor-http-nio-10 不支持引起原因:java.lang.IllegalStateException: block()/blockFirst()/blockLast() 是阻塞的,线程reactor-http-nio-10不支持
上面的错误表明,正在使用
block()
方法,这是当前线程不支持的阻塞操作。
我同意 Igor Artamonov 的评论,要解决
Spring WebFlux
等非阻塞环境中的阻塞调用问题,请使用 Azure SDK 的异步、非阻塞方法。
您似乎正在使用 Azure Data Lake Storage 客户端的同步版本。为了避免阻塞,可以切换到异步版本的客户端。
根据您的代码,表明您正在使用同步
Azure Data Lake Storage client
。为了避免阻塞,请使用异步版本的客户端。
这是
Azure Data Lake Storage client
的最新依赖。
Pom.XML
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-file-datalake</artifactId>
<version>12.19.0</version>
</dependency>
您可以使用下面的异步代码来获取以
mono
类型表示结果的文件系统列表。
public Mono<Boolean> checkIfFileSystemExists(String fileSystem) {
return dataLakeServiceAsyncClient.listFileSystems()
.filter(c -> fileSystem.equals(c.getName()))
.hasElements();
}
如果过滤后的 Flux 中有元素,Mono 返回
true
,如果没有,则返回 false
。