
いままでずっと勘違いしてましたが、チェックすべきなんですね。
きっかけはこれ:
これは Observable.create(OnSubscribe) で onNext() や onComplete() でイベントを放出する際に、Subscriber#isUnsubscribed() でsubscriberの生死をチェックすべきではないかという話です。RxJava official wikiがそうしているから、そうするべきではないかというのが議論の発端ですが、釈然としないので自分でも考えたり検証してみました。
議論の対象となるコードは Selector#executeAsObservable() です。
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { forEach(new Action1<Model>() { @Override public void call(Model item) { subscriber.onNext(item); } }); subscriber.onCompleted(); } }); }
これをsubscribe()してすぐunsubscribe()すると、subscribe()した側のsubscriberのonNext() などは呼ばれません。つまりこのコードは動いているように見えます。
一方で、この Observable.OnSubscribe#call() が中断されるわけではないので、イベントは送り続けます。その結果、subscribe()する側からみると、subscribe()前のmap() などの関数は依然として呼ばれます。ここが釈然としないところで、unsubscribe() によって OnSubscribe#call() が中断されるものと思っていました。
これが実際に問題になるケースは多くないと思われますが、適切にunsubscribe()をハンドルしないと無駄にCPUやメモリを使うことになるし、HTTP requestなどでは unsubscribe() の時にキャンセルしたいでしょうし、 SQLiteDatabaseも API level 16 から CancellationSignalでクエリのキャンセルを行えるので、それなりの対応は必要と思われます。
というわけで、 https://github.com/gfx/Android-Orma/pull/209 でその対応を行いました。
最初の対応は Cursor#close() と Cursor#isClosed() でのハンドリングでしたが:
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { final Cursor cursor = execute(); subscriber.add(Subscriptions.create(new Action0() { @Override public void call() { cursor.close(); } })); try { for (int pos = 0; cursor.moveToPosition(pos); pos++) { if (cursor.isClosed()) { return; } subscriber.onNext(newModelFromCursor(cursor)); } } finally { if (!cursor.isClosed()) { cursor.close(); } } if (cursor.isClosed()) { return; } subscriber.onCompleted(); } }); }
複雑すぎてバグを引き起こしやすいので((実際、コードにはバグがあります。finallyのなかで Cursor#close() するので onComplete() が決して呼ばれないのです。)) isUnsubscribed() を使うコードに置き換えました。
@NonNull public Observable<Model> executeAsObservable() { return Observable.create(new Observable.OnSubscribe<Model>() { @Override public void call(final Subscriber<? super Model> subscriber) { final Cursor cursor = execute(); try { for (int pos = 0; !subscriber.isUnsubscribed() && cursor.moveToPosition(pos); pos++) { subscriber.onNext(newModelFromCursor(cursor)); } if (!subscriber.isUnsubscribed()) { subscriber.onCompleted(); } } finally { cursor.close(); } } }); }
OnSubscribe#call()のなかでunsubscribe() というイベントを得る方法としては、isUnsubscribed()でイベントをpullするよりも Subscriber#add() にコールバックを設定してイベントをpushで受け取るほうが筋がいいと思いますが、今回はシンプルに書ける isUnsubscribed() を採用することにしました。