I am trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own.
It works fine for small files, but for large blobs (around 3 GB) the Azure connection times out after a few minutes.
I have also tried setting all the timeout values to their maximums; that did not help either.
Code snippet:
HttpClient httpClient = new NettyAsyncHttpClientBuilder()
        .readTimeout(Duration.ofDays(365))
        .responseTimeout(Duration.ofDays(365))
        .connectTimeout(Duration.ofDays(365))
        .build();
RequestRetryOptions retryOptions = new RequestRetryOptions(
        RetryPolicyType.EXPONENTIAL,
        3,
        Duration.ofSeconds(30),
        null,
        null,
        null
);
HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));
HttpClientOptions httpClientOptions = new HttpClientOptions()
        .readTimeout(Duration.ofDays(365))
        .responseTimeout(Duration.ofDays(365))
        .setConnectionIdleTimeout(Duration.ofDays(365))
        .setConnectTimeout(Duration.ofDays(365))
        .setWriteTimeout(Duration.ofDays(365));
// Building the Azure Client
BlobServiceClient azureClient = new BlobServiceClientBuilder()
        .connectionString("My_Connection_String_For_Authentication")
        .httpClient(httpClient)
        .addPolicy(timeoutPolicy)
        .clientOptions(httpClientOptions)
        .retryOptions(retryOptions)
        .buildClient();
BlobContainerClient containerClient = azureClient.getBlobContainerClient("{azureContainerName}");
PagedResponse<BlobItem> pagedResponse = containerClient.listBlobs(options, continuationToken, null).iterableByPage().iterator().next();
List<BlobItem> pageItems = pagedResponse.getValue();
for (BlobItem blob : pageItems) {
    // Opening the InputStream of the blob
    try (InputStream blobInputStream = containerClient.getBlobVersionClient(blob.getName(), blob.getVersionId()).openInputStream();
         BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {
        // reading the bufferedStream and piping it to my own cloud storage
    }
}
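The piping itself is a plain buffered copy loop; uploadOutputStream below is a stand-in for the request-body stream of my own storage client (a placeholder, not a real API):

byte[] buffer = new byte[8192];
int bytesRead;
while ((bytesRead = bufferedStream.read(buffer)) != -1) {
    // each read pulls bytes from the Azure blob stream; each write pushes them into the upload request
    uploadOutputStream.write(buffer, 0, bytesRead);
}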
While reading bytes from the InputStream, the following error occurs after a few minutes:
java.net.SocketException: Broken pipe (Write failed)
at java.base/java.net.SocketOutputStream.socketWrite0(Native Method)
at java.base/java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:110)
at java.base/java.net.SocketOutputStream.write(SocketOutputStream.java:150)
at java.base/sun.security.ssl.SSLSocketOutputRecord.deliver(SSLSocketOutputRecord.java:346)
at java.base/sun.security.ssl.SSLSocketImpl$AppOutputStream.write(SSLSocketImpl.java:1213)
at okio.OutputStreamSink.write(JvmOkio.kt:56)
at okio.AsyncTimeout$sink$1.write(AsyncTimeout.kt:102)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.write(RealBufferedSink.kt:147)
at okhttp3.internal.http1.Http1ExchangeCodec$ChunkedSink.write(Http1ExchangeCodec.kt:311)
at okio.ForwardingSink.write(ForwardingSink.kt:29)
at okhttp3.internal.connection.Exchange$RequestBodySink.write(Exchange.kt:223)
at okio.RealBufferedSink.emitCompleteSegments(RealBufferedSink.kt:256)
at okio.RealBufferedSink.writeAll(RealBufferedSink.kt:195)
at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient$2.writeTo(Okhttp3HttpClient.java:284)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:62)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.kt:34)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.kt:95)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.kt:83)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.kt:76)
at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.kt:109)
at okhttp3.internal.connection.RealCall.getResponseWithInterceptorChain$okhttp(RealCall.kt:201)
at okhttp3.internal.connection.RealCall.execute(RealCall.kt:154)
at com.zoho.nebula.requests.okhttp3.Okhttp3HttpClient.execute(Okhttp3HttpClient.java:89)
... 36 more
Suppressed: java.net.SocketException: Operation timed out (Read failed)
at java.base/java.net.SocketInputStream.socketRead0(Native Method)
at java.base/java.net.SocketInputStream.socketRead(SocketInputStream.java:115)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:168)
at java.base/java.net.SocketInputStream.read(SocketInputStream.java:140)
at java.base/sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:478)
at java.base/sun.security.ssl.SSLSocketInputRecord.readHeader(SSLSocketInputRecord.java:472)
at java.base/sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:70)
at java.base/sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1333)
at java.base/sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:976)
at okio.InputStreamSource.read(JvmOkio.kt:93)
at okio.AsyncTimeout$source$1.read(AsyncTimeout.kt:128)
at okio.RealBufferedSource.indexOf(RealBufferedSource.kt:430)
at okio.RealBufferedSource.readUtf8LineStrict(RealBufferedSource.kt:323)
at okhttp3.internal.http1.HeadersReader.readLine(HeadersReader.kt:29)
at okhttp3.internal.http1.Http1ExchangeCodec.readResponseHeaders(Http1ExchangeCodec.kt:180)
at okhttp3.internal.connection.Exchange.readResponseHeaders(Exchange.kt:110)
at okhttp3.internal.http.CallServerInterceptor.intercept(CallServerInterceptor.kt:93)
Please help me resolve this issue and explain what I am doing wrong.
Dependencies used:
azure-storage-blob-12.22.3.jar
azure-storage-common-12.21.2.jar
azure-core-1.40.0.jar
azure-core-http-netty-1.13.4.jar
reactor-netty-http-1.0.31.jar
reactor-netty-core-1.0.31.jar
netty-resolver-dns-4.1.89.Final.jar
azure-json-1.0.1.jar
Trying to read the InputStream of a large blob from an Azure container and store it in another cloud storage of my own.
You can use the following code to read large files from Azure Blob Storage with the Azure Java SDK. Instead of holding one stream open for the entire blob, it downloads the blob in fixed-size ranges, so each HTTP request stays short-lived and never runs long enough to hit a timeout.
Code:
import com.azure.core.http.HttpClient;
import com.azure.core.http.netty.NettyAsyncHttpClientBuilder;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.http.policy.TimeoutPolicy;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.BlobProperties;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;

import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;

public class ChunkedBlobReader {

    private static final String CONNECTION_STRING = "xxxx";
    private static final String CONTAINER_NAME = "xxxx";
    private static final String BLOB_NAME = "xxxx";

    public static void main(String[] args) {
        HttpClient httpClient = new NettyAsyncHttpClientBuilder()
                .readTimeout(Duration.ofDays(365))
                .responseTimeout(Duration.ofDays(365))
                .connectTimeout(Duration.ofDays(365))
                .build();
        RequestRetryOptions retryOptions = new RequestRetryOptions(
                RetryPolicyType.EXPONENTIAL,
                3,
                Duration.ofSeconds(30),
                null,
                null,
                null
        );
        HttpPipelinePolicy timeoutPolicy = new TimeoutPolicy(Duration.ofDays(365));
        BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
                .connectionString(CONNECTION_STRING)
                .httpClient(httpClient)
                .addPolicy(timeoutPolicy)
                .retryOptions(retryOptions)
                .buildClient();
        BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(CONTAINER_NAME);
        BlobClient blobClient = containerClient.getBlobClient(BLOB_NAME);
        readBlobInChunks(blobClient);
    }
    private static void readBlobInChunks(BlobClient blobClient) {
        final long chunkSize = 8 * 1024 * 1024; // 8 MB chunks
        try {
            BlobProperties properties = blobClient.getProperties();
            long blobSize = properties.getBlobSize();
            for (long offset = 0; offset < blobSize; offset += chunkSize) {
                long count = Math.min(chunkSize, blobSize - offset);
                // Each iteration opens a fresh, short-lived ranged stream over [offset, offset + count)
                try (InputStream blobInputStream = blobClient.openInputStream(new BlobRange(offset, count), null);
                     BufferedInputStream bufferedStream = new BufferedInputStream(blobInputStream)) {
                    processChunk(bufferedStream, count);
                }
            }
            System.out.println("Blob read successfully in chunks.");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void processChunk(InputStream inputStream, long count) throws IOException {
        byte[] buffer = new byte[8192];
        int bytesRead;
        long totalBytesRead = 0;
        while (totalBytesRead < count && (bytesRead = inputStream.read(buffer)) != -1) {
            totalBytesRead += bytesRead;
            System.out.write(buffer, 0, bytesRead);
        }
    }
}
This approach reads the large file from Azure Blob Storage in 8 MB chunks and processes each chunk. As written, the processChunk method just prints the data to the console.
You can change processChunk to upload the data to your own cloud storage instead of printing it. You can also modify the chunk-reading loop to upload the chunks in parallel, as sketched below.
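As a minimal sketch of the parallel variant: uploadChunk is a hypothetical method standing in for whatever call your own storage SDK uses to store one range; the ranged-read pattern is the same as above (requires java.util.concurrent and java.io.UncheckedIOException imports):

private static void uploadBlobInParallel(BlobClient blobClient) throws Exception {
    final long chunkSize = 8 * 1024 * 1024; // 8 MB per ranged read
    long blobSize = blobClient.getProperties().getBlobSize();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    List<Future<?>> tasks = new ArrayList<>();
    for (long offset = 0; offset < blobSize; offset += chunkSize) {
        final long start = offset;
        final long count = Math.min(chunkSize, blobSize - start);
        tasks.add(pool.submit(() -> {
            // Each task opens its own short-lived ranged stream, so no request outlives a chunk
            try (InputStream in = blobClient.openInputStream(new BlobRange(start, count), null)) {
                byte[] chunk = in.readAllBytes();
                uploadChunk(chunk, start); // hypothetical: PUT this range to your own storage
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            return null;
        }));
    }
    for (Future<?> task : tasks) {
        task.get(); // propagates the first upload failure, if any
    }
    pool.shutdown();
}

Parallel uploads only pay off if the target storage supports multipart or ranged uploads; if it does not, keep the sequential loop and stream each chunk directly into the upload request body.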
Output:
Blob read successfully in chunks.