PagedIterable<FileSystemItem>总是阻塞

问题描述 投票:0回答:1

我对 Spring Webflux 和 Azure 都是新手,我不明白这里发生了什么。我正在尝试检查数据湖容器是否已存在。我不明白为什么这会被阻止。我还尝试了本文档中的许多内容(https://learn.microsoft.com/en-us/azure/developer/java/sdk/pagination),得到了相同的结果。

    public boolean checkIfFileSystemExists(String fileSystem) {
        return azureDataLakeClient.getDataLakeServiceClient().listFileSystems().stream().anyMatch(c -> fileSystem.equals(c.getName()));
    }
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-10
Caused by: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-10
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:86) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.Flux.blockLast(Flux.java:2815) ~[reactor-core-3.6.3.jar:3.6.3]
    at com.azure.core.util.paging.ContinuablePagedByIteratorBase.requestPage(ContinuablePagedByIteratorBase.java:102) ~[azure-core-1.49.0.jar:1.49.0]
    at com.azure.core.util.paging.ContinuablePagedByItemIterable$ContinuablePagedByItemIterator.<init>(ContinuablePagedByItemIterable.java:75) ~[azure-core-1.49.0.jar:1.49.0]
    at com.azure.core.util.paging.ContinuablePagedByItemIterable.iterator(ContinuablePagedByItemIterable.java:55) ~[azure-core-1.49.0.jar:1.49.0]
    at java.base/java.lang.Iterable.spliterator(Iterable.java:101) ~[na:na]
    at com.azure.core.util.paging.ContinuablePagedIterable.stream(ContinuablePagedIterable.java:85) ~[azure-core-1.49.0.jar:1.49.0]
    at com.parsons.datamart.common.datalake.azure.writer.AzureDataLakeWriter.checkIfFileSystemExists(AzureDataLakeWriter.java:33) ~[main/:na]
    at com.parsons.datamart.common.datalake.azure.writer.AzureDataLakeWriter.write(AzureDataLakeWriter.java:44) ~[main/:na]
    at com.parsons.datamart.ingest.web.WebClientIngestor.notifyIngest(WebClientIngestor.java:74) ~[main/:na]
    at com.parsons.datamart.ingest.web.WebClientIngestor.lambda$ingest$1(WebClientIngestor.java:37) ~[main/:na]
    at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.secondComplete(MonoFlatMap.java:245) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:305) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:539) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2097) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144) ~[reactor-core-3.6.3.jar:3.6.3]
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:415) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:446) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:500) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:782) ~[reactor-netty-http-1.1.16.jar:1.1.16]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:114) ~[reactor-netty-core-1.1.16.jar:1.1.16]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1475) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1338) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1387) ~[netty-handler-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[netty-codec-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.107.Final.jar:4.1.107.Final]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
spring-webflux azure-data-lake
1个回答
0
投票

PagedIterable 总是阻塞的 reactor.core.Exceptions$ErrorCallbackNotImplemented:java.lang.IllegalStateException:block()/blockFirst()/blockLast() 正在阻塞,线程reactor-http-nio-10 不支持引起原因:java.lang.IllegalStateException: block()/blockFirst()/blockLast() 是阻塞的,线程reactor-http-nio-10不支持

上面的错误表明,正在使用

block()
方法,这是当前线程不支持的阻塞操作。

我同意 Igor Artamonov 的评论,要解决

Spring WebFlux
等非阻塞环境中的阻塞调用问题,请使用 Azure SDK 的异步、非阻塞方法。

您似乎正在使用 Azure Data Lake Storage 客户端的同步版本。为了避免阻塞,可以切换到异步版本的客户端。

根据您的代码,表明您正在使用同步

Azure Data Lake Storage client
。为了避免阻塞,请使用异步版本的客户端。

这是

Azure Data Lake Storage client
的最新依赖。

Pom.XML

 <dependency>
     <groupId>com.azure</groupId>
     <artifactId>azure-storage-file-datalake</artifactId>
     <version>12.19.0</version>
 </dependency>

您可以使用下面的异步代码来获取以

mono
类型表示结果的文件系统列表。

public Mono<Boolean> checkIfFileSystemExists(String fileSystem) {
    return dataLakeServiceAsyncClient.listFileSystems()
        .filter(c -> fileSystem.equals(c.getName()))
        .hasElements();
}
如果过滤后的 Flux 中有元素,

Mono 返回

true
,如果没有,则返回
false

参考: DataLakeServiceAsyncClient 类 |微软学习

© www.soinside.com 2019 - 2024. All rights reserved.