RxJava OutOfMemory运行时崩溃无法创建线程

问题描述 投票:0回答:3

我正在使用Retrofit2 + RxJava + RxAndroid进行网络呼叫。

我尝试过不同的Schedulers同样io()newThread()。我知道使用newThread()的后果,但仍然应用程序正在关闭OOM的原因。

场景:

  • 过滤列表(过滤后65项)
  • 从URL下载gzip
  • 写字节并存储在域中

以上网络通话发生65次。

码:

/*form version download method*/
public void startFormDownload(List<Form> form, ApiClient apiClient) {
    Observable.fromIterable(form)
            .concatMapIterable(Form::getFormVersions) // get form version list from single form object
            .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
            .filter(this::checkFormVersionToDownloadOrNot) // only get those versions who needed to downloaded
            .doOnNext(formVersion -> { // get formVersion object

                AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                AppLogger.i(tag, formVersion.getFormUrl());
                String constructURL = formVersion.getFormUrl();

                /* download form gzip process starts from here */
                apiClient.getJsonByFormURL(constructURL)
                        .subscribeOn(Schedulers.newThread())
                        .observeOn(AndroidSchedulers.mainThread())
                        .subscribe(new SingleObserver<Response<ResponseBody>>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                AppLogger.i(tag, "gzip download ->" + "subscribed");
                            }

                            @Override
                            public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                AppLogger.i(tag, "gzip downloaded " + " success");
                                AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());

                                if (responseBodyResponse.code() == 401) {
                                    //not authorized
                                    return;
                                }

                                ByteArrayInputStream bais = null;
                                if (responseBodyResponse.body() != null)
                                    try {
                                        bais = new ByteArrayInputStream(responseBodyResponse.body().bytes());

                                        GZIPInputStream gzis = new GZIPInputStream(bais);
                                        InputStreamReader reader = new InputStreamReader(gzis);
                                        BufferedReader in = new BufferedReader(reader);

                                        String readed;
                                        StringBuilder stringBuilder = new StringBuilder();
                                        while ((readed = in.readLine()) != null) {
                                            stringBuilder.append(readed);
                                        }

                                        baseRealm = Realm.getDefaultInstance();
                                        formVersion.setJsonString(stringBuilder.toString());
                                        baseRealm.executeTransaction(realm ->
                                                baseRealm.copyToRealmOrUpdate(formVersion));



                                    } catch (IOException e) {
                                        AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                    }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                            }
                        });


            })
            .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
            .doOnError(Throwable::printStackTrace)
            .subscribe(
                    formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                    throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
            );
}

可能有InputStream导致OOM的情况,但我不知道在这里可以做些什么。

PS。 OOM有时只发生,而不是总是发生。

崩溃报告

java.lang.OutOfMemoryError: Could not allocate JNI Env
at java.lang.Thread.nativeCreate(Native Method)
at java.lang.Thread.start(Thread.java:730)
at java.util.concurrent.ThreadPoolExecutor.addWorker(ThreadPoolExecutor.java:941)
at java.util.concurrent.ThreadPoolExecutor.ensurePrestart(ThreadPoolExecutor.java:1582)
at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:313)
at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:550)
at java.util.concurrent.ScheduledThreadPoolExecutor.submit(ScheduledThreadPoolExecutor.java:651)
at io.reactivex.internal.schedulers.NewThreadWorker.scheduleActual(NewThreadWorker.java:146)
at io.reactivex.internal.schedulers.NewThreadWorker.schedule(NewThreadWorker.java:51)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:135)
at io.reactivex.Scheduler.scheduleDirect(Scheduler.java:111)
at io.reactivex.internal.operators.single.SingleSubscribeOn.subscribeActual(SingleSubscribeOn.java:37)
at io.reactivex.Single.subscribe(Single.java:2779)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:2779)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$startFormDownload$10$VisualogyxBaseFragment(VisualogyxBaseFragment.java:623)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$9.accept(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
at io.reactivex.internal.observers.DisposableLambdaObserver.onNext(DisposableLambdaObserver.java:58)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable$FlattenIterableObserver.onNext(ObservableFlattenIterable.java:111)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableFlattenIterable.subscribeActual(ObservableFlattenIterable.java:44)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnLifecycle.subscribeActual(ObservableDoOnLifecycle.java:33)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableFilter.subscribeActual(ObservableFilter.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableDoOnEach.subscribeActual(ObservableDoOnEach.java:42)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.Observable.subscribe(Observable.java:10896)
at io.reactivex.Observable.subscribe(Observable.java:10825)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.startFormDownload(VisualogyxBaseFragment.java:683)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.updateJobAndDownloadCheckListForm(VisualogyxBaseFragment.java:239)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$null$1$VisualogyxBaseFragment(VisualogyxBaseFragment.java:221)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$15.test(Unknown Source)
at io.reactivex.internal.operators.observable.ObservableAllSingle$AllObserver.onNext(ObservableAllSingle.java:69)
at io.reactivex.internal.operators.observable.ObservableFromIterable$FromIterableDisposable.run(ObservableFromIterable.java:98)
at io.reactivex.internal.operators.observable.ObservableFromIterable.subscribeActual(ObservableFromIterable.java:58)
at io.reactivex.Observable.subscribe(Observable.java:10910)
at io.reactivex.internal.operators.observable.ObservableAllSingle.subscribeActual(ObservableAllSingle.java:34)
at io.reactivex.Single.subscribe(Single.java:2779)
at io.reactivex.Single.subscribe(Single.java:2765)
at io.reactivex.Single.subscribe(Single.java:2686)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.lambda$storeAndRetrieveListFromDB$2$VisualogyxBaseFragment(VisualogyxBaseFragment.java:226)
at com.visualogyx.app.fragments.VisualogyxBaseFragment$$Lambda$0.execute(Unknown Source)
at io.realm.Realm.executeTransaction(Realm.java:1394)
at com.visualogyx.app.fragments.VisualogyxBaseFragment.storeAndRetrieveListFromDB(VisualogyxBaseFragment.java:200)
at com.visualogyx.app.fragments.HomeFragment$1.onSuccess(HomeFragment.java:59)
at com.visualogyx.app.fragments.HomeFragment$1.onSuccess(HomeFragment.java:54)
at com.visualogyx.app.controller.ApiCallback.onResponse(ApiCallback.java:36)
at com.visualogyx.app.controller.ApiClient$11.onResponse(ApiClient.java:264)
at retrofit2.ExecutorCallAdapterFactory$ExecutorCallbackCall$1$1.run(ExecutorCallAdapterFactory.java:70)
at android.os.Handler.handleCallback(Handler.java:836)
at android.os.Handler.dispatchMessage(Handler.java:103)
at android.os.Looper.loop(Looper.java:203)
at android.app.ActivityThread.main(ActivityThread.java:6293)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.ZygoteInit$MethodAndArgsCaller.run(ZygoteInit.java:1065)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:926)
android rx-java retrofit2 rx-android
3个回答
2
投票

你可以使用bytestream而不是byte []

String body = null;
String charset = "UTF-8"; // You should determine it based on response header.

try (
    InputStream gzippedResponse = responseBodyResponse.body().byteStream();
    InputStream ungzippedResponse = new GZIPInputStream(gzippedResponse);
    Reader reader = new InputStreamReader(ungzippedResponse, charset);
    Writer writer = new StringWriter();
) {
    char[] buffer = new char[10240];
    for (int length = 0; (length = reader.read(buffer)) > 0;) {
        writer.write(buffer, 0, length);
    }
    body = writer.toString();
}

0
投票

也许您应该将日志添加到代码的不同部分并找出内存占用内存,哪一部分占用内存。或者你可以使用AndroidStuido内存分配工具来监控内存分配。


0
投票

我不得不在doFinally()块中移动与事务相关的操作。到现在为止还挺好。

Observable.fromIterable(form)
            .concatMapIterable(Form::getFormVersions) // get form version list from single form object
            .doOnSubscribe(disposable -> AppLogger.i(tag, "Form Versions download is subscribed")) // subscribe process to rx
            .filter(formVersion -> formVersion.getJsonString() == null) // only get those versions who is not downloaded yet
            .doOnNext(formVersion -> { // get formVersion object

                AppLogger.i(tag, "download this form ---------> " + formVersion.getFormUrl());
                AppLogger.i(tag, formVersion.getFormUrl());
                String constructURL = formVersion.getFormUrl();

                /* download form gzip process starts from here */
                apiClient.getJsonByFormURL(constructURL)
                        .subscribeOn(Schedulers.io())
                        .subscribe(new SingleObserver<Response<ResponseBody>>() {
                            @Override
                            public void onSubscribe(Disposable disposable) {
                                AppLogger.i(tag, "gzip download ->" + "subscribed");

                            }

                            @Override
                            public void onSuccess(Response<ResponseBody> responseBodyResponse) {
                                AppLogger.i(tag, "gzip downloaded " + " success");
                                AppLogger.i(tag, "gzip downloaded ->" + responseBodyResponse.toString());

                                if (responseBodyResponse.code() == 401) {
                                    //not authorized
                                    return;
                                }
                                // You should determine it based on response header.

                                if (responseBodyResponse.body() != null)
                                    try {
                                        gzipBody = null;

                                        gzis = responseBodyResponse.body().byteStream();
                                        ungzippedResponse = new GZIPInputStream(gzis);
                                        reader = new InputStreamReader(ungzippedResponse, charset);
                                        writer = new java.io.StringWriter();

                                        buffer = new char[10240];
                                        for (int length = 0; (length = reader.read(buffer)) > 0; ) {
                                            writer.write(buffer, 0, length);
                                        }
                                        gzipBody = writer.toString();

                                        formVersion.setJsonString(gzipBody);
                                        AppLogger.i(tag, "set json string ->" + gzipBody);

                                    } catch (IOException e) {
                                        AppLogger.e(tag, "gzip extract exception->" + e.getLocalizedMessage(), e);
                                    }
                            }

                            @Override
                            public void onError(Throwable throwable) {
                                AppLogger.e(tag, "gzip failed->" + throwable.getMessage(), throwable);
                            }
                        });


            })
            .doOnTerminate(() -> AppLogger.i(tag, "Form Versions terminated"))
            .doOnError(Throwable::printStackTrace)
            .doFinally(() -> {
                baseRealm = Realm.getDefaultInstance();
                baseRealm.executeTransaction(realm ->
                        realm.copyToRealmOrUpdate(form));
            })
            .subscribe(
                    formVersion -> AppLogger.i(tag, "Form Versions download completed"),
                    throwable -> AppLogger.e(tag, throwable.getMessage(), throwable)
            );
© www.soinside.com 2019 - 2024. All rights reserved.