在 Android 中使用 RxJava、OkHttp 和 Okio 下载进度

问题描述 投票:0回答:3

在我们的应用程序中,我使用此代码下载图像文件。我需要在用户界面上显示下载进度(下载的字节数百分比)。如何在此代码中获取下载进度?我寻找解决方案,但仍然无法自己完成。

Observable<String> downloadObservable = Observable.create(
                    sub -> {
                        Request request = new Request.Builder()
                                .url(media.getMediaUrl())
                                .build();
                        Response response = null;
                        try {
                            response = http_client.newCall(request).execute();
                            if (response.isSuccessful()) {
                                Log.d(TAG, "response.isSuccessful()");
                                String mimeType = MimeTypeMap.getFileExtensionFromUrl(media.getMediaUrl());
                                File file = new File(helper.getTmpFolder() + "/" + helper.generateUniqueName() + "test." + mimeType);
                                BufferedSink sink = Okio.buffer(Okio.sink(file));
                                sink.writeAll(response.body().source());
                                sink.close();
                                sub.onNext(response.toString());
                                sub.onCompleted();
                            } else {
                                sub.onError(new IOException());
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        }

                    }
            );

            Subscriber<String> mySubscriber = new Subscriber<String>() {
                @Override
                public void onNext(String responseString) {
                    Log.d(TAG, "works: " + responseString);
                }
            };
            downloadObservable
                    .subscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(mySubscriber);
android rx-java okhttp okio
3个回答
24
投票

这就是我会做的来显示进度。

Observable<String> downloadObservable = Observable.create(
  sub -> {
          InputStream input = null;
          OutputStream output = null;
          try {
          Response response = http_client.newCall(request).execute();
           if (response.isSuccessful()) {                 
             input = response.body().byteStream();
             long tlength= response.body().contentLength();

             output = new FileOutputStream("/pathtofile");
             byte data[] = new byte[1024];

             sub.onNext("0%");
             long total = 0;
             int count;
             while ((count = input.read(data)) != -1) {
               total += count;

               sub.onNext(String.valueOf(total*100/tlength) + "%");

               output.write(data, 0, count);
             }
             output.flush();
             output.close();
             input.close();
           }
          } catch(IOException e){
            sub.onError(e);
          } finally {
                if (input != null){
                    try {
                        input.close();
                    }catch(IOException ioe){}
                }
                if (out != null){
                    try{
                        output.close();
                    }catch (IOException e){}                        
                }
          }
        sub.onCompleted();
   }
);

并使用具有完整抽象方法的订阅者。

Subscriber<String> mySubscriber = new Subscriber<String>() {

@Override
public void onCompleted() {
  // hide progress bar
}

@Override
public void onError(Throwable e) {
  // hide progress bar
}

@Override
public void onNext(String percentProgress) {
  // show percentage progress
}
};

0
投票

接受的答案仅显示保存下载文件的进度,而不显示实际的下载进度。

在 Kotlin 上创建了带有进度解决方案的 Rx 下载文件,希望有帮助:

private const val FULL_PROGRESS = 100L

class DownloadWithProgress {

    fun start(url: String, directory: File, fileName: String) =
        Observable.create<Download> { emitter ->
            try {
                val request = Request.Builder()
                    .url(url)
                    .build()

                OkHttpClient.Builder()
                    .addNetworkInterceptor { chain: Interceptor.Chain ->
                        val originalResponse = chain.proceed(chain.request())
                        originalResponse.newBuilder()
                            .body(originalResponse.body()?.let { responseBody ->
                                ProgressResponseBody(responseBody) { emitter.onNext(Download(it)) }
                            })
                            .build()
                    }
                    .build()
                    .newCall(request).execute().use { response ->
                        if (!response.isSuccessful) throw IOException("Unexpected code $response")
                        val bufferedSource = response.body()?.source() ?: throw NullPointerException("Response is null")

                        try {
                            val packageFile = File(directory, fileName).apply {
                                sink().buffer().run {
                                    writeAll(bufferedSource)
                                    close()
                                }
                            }

                            emitter.onNext(Download(FULL_PROGRESS, packageFile))
                        } catch (ioException: Exception) {
                            throw AppUpdateException("Io exception: " + ioException.localizedMessage)
                        }

                        emitter.onComplete()
                    }
            } catch (exception: Exception) {
                emitter.onError(exception)
            }
        }

    private class ProgressResponseBody(
        private val responseBody: ResponseBody,
        private val onProgress: (progress: Long) -> Unit
    ) : ResponseBody() {

        private var bufferedSource: BufferedSource? = null

        override fun contentType() = responseBody.contentType()

        override fun contentLength() = responseBody.contentLength()

        override fun source() = bufferedSource ?: source(responseBody.source()).buffer().also { bufferedSource = it }

        private fun source(source: Source): Source {
            return object : ForwardingSource(source) {
                var totalBytesRead = 0L

                override fun read(sink: Buffer, byteCount: Long): Long {
                    return super.read(sink, byteCount).apply {
                        val bytesRead = this
                        totalBytesRead += if (bytesRead != -1L) bytesRead else 0

                        if (bytesRead != -1L) {
                            val progress = FULL_PROGRESS * totalBytesRead / responseBody.contentLength()
                            onProgress(progress)
                        }
                    }
                }
            }
        }
    }
}

data class Download(
    val progress: Long,
    val file: File? = null
) {
    val isDownloaded = progress == FULL_PROGRESS && file != null
}

0
投票

这就是我和 Okio 一起做的:

new SingleFromCallable<>(() -> {
            Request.Builder requestBuilder = new Request.Builder();
            BufferedSink sink=null; BufferedSource source=null;
            try(Response response = okHttpClient.newCall(requestBuilder.url(downloadUrl).build()).execute()){
                if(response.isSuccessful()){
                    ResponseBody responseBody = response.body(); assert responseBody != null;
                    long totalSize = responseBody.contentLength(); 
                    source = responseBody.source();
                    File downloadFile = new File(filePath);
                    sink = Okio.buffer(Okio.sink(downloadFile));
                    int bufferSize = 8*1024; long downloadedSize=0;
                    for(long bytesRead; (bytesRead= source.read(sink.getBuffer(),bufferSize))!=-1;){
                        sink.emit(); downloadedSize+=bytesRead;
                        publishProgress(downloadedSize*100/totalSize);
                    }
                    Log.d(TAG, "File successfully downloaded and saved to storage.");
                    return CODE_SUCCESS;
                }
            }
            catch (IOException e) {e.printStackTrace();}
            finally {if(sink!=null){sink.flush(); sink.close();} if(source!=null)source.close();}
            return CODE_ERROR;
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new SingleObserver<Long>() {
            @Override public void onSubscribe(@NonNull Disposable d) {Log.d(TAG, "Subscribed");}
            @Override public void onSuccess(@NonNull Long c) {Log.d(TAG, "Execution completed. Code:"+c);}
            @Override public void onError(@NonNull Throwable e) {e.printStackTrace();}
        });

您可以使用以下方法更新主线程上的 UI:

Handler handler = new Handler(Looper.getMainLooper());
handler.post(()-> {
     publishProgress(); //do UI stuff
});
© www.soinside.com 2019 - 2024. All rights reserved.