RxJavaのリソース管理: イベント放出の際にisUnsubscribed()をチェックすべきだった!
いままでずっと勘違いしてましたが、チェックすべきなんですね。
きっかけはこれ:
これは Observable.create(OnSubscribe)
で onNext()
や onComplete()
でイベントを放出する際に、Subscriber#isUnsubscribed()
でsubscriberの生死をチェックすべきではないかという話です。RxJava official wikiがそうしているから、そうするべきではないかというのが議論の発端ですが、釈然としないので自分でも考えたり検証してみました。
議論の対象となるコードは Selector#executeAsObservable()
です。
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { forEach(new Action1<Model>() { @Override public void call(Model item) { subscriber.onNext(item); } }); subscriber.onCompleted(); } }); }
これをsubscribe()
してすぐunsubscribe()
すると、subscribe()
した側のsubscriberのonNext()
などは呼ばれません。つまりこのコードは動いているように見えます。
一方で、この Observable.OnSubscribe#call()
が中断されるわけではないので、イベントは送り続けます。その結果、subscribe()
する側からみると、subscribe()
前のmap()
などの関数は依然として呼ばれます。ここが釈然としないところで、unsubscribe()
によって OnSubscribe#call()
が中断されるものと思っていました。
これが実際に問題になるケースは多くないと思われますが、適切にunsubscribe()
をハンドルしないと無駄にCPUやメモリを使うことになるし、HTTP requestなどでは unsubscribe()
の時にキャンセルしたいでしょうし、 SQLiteDatabaseも API level 16 から CancellationSignal
でクエリのキャンセルを行えるので、それなりの対応は必要と思われます。
というわけで、 https://github.com/gfx/Android-Orma/pull/209 でその対応を行いました。
最初の対応は Cursor#close()
と Cursor#isClosed()
でのハンドリングでしたが:
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { final Cursor cursor = execute(); subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { cursor.close(); } })); try { for (int pos = 0; cursor.moveToPosition(pos); pos++) { if (cursor.isClosed()) { return; } subscriber.onNext(newModelFromCursor(cursor)); } } finally { if (!cursor.isClosed()) { cursor.close(); } } if (cursor.isClosed()) { return; } subscriber.onCompleted(); } }); }
複雑すぎてバグを引き起こしやすいので((実際、コードにはバグがあります。finallyのなかで Cursor#close()
するので onComplete()
が決して呼ばれないのです。)) isUnsubscribed()
を使うコードに置き換えました。
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { final Cursor cursor = execute(); try { for (int pos = 0; !subscriber.isUnsubscribed() && cursor.moveToPosition(pos); pos++) { subscriber.onNext(newModelFromCursor(cursor)); } if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } finally { cursor.close(); } } }); }
OnSubscribe#call()
のなかでunsubscribe()
というイベントを得る方法としては、isUnsubscribed()
でイベントをpullするよりも Subscriber#add()
にコールバックを設定してイベントをpushで受け取るほうが筋がいいと思いますが、今回はシンプルに書ける isUnsubscribed()
を採用することにしました。