いままでずっと勘違いしてましたが、チェックすべきなんですね。
きっかけはこれ:
これは Observable.create(OnSubscribe)
で onNext()
や onComplete()
でイベントを放出する際に、Subscriber#isUnsubscribed()
でsubscriberの生死をチェックすべきではないかという話です。RxJava official wikiがそうしているから、そうするべきではないかというのが議論の発端ですが、釈然としないので自分でも考えたり検証してみました。
議論の対象となるコードは Selector#executeAsObservable()
です。
@NonNull
public Observable<Model> executeAsObservable() {
return Observable.create(new Observable.OnSubscribe<Model>() {
@Override
public void call(final Subscriber<? super Model> subscriber) {
forEach(new Action1<Model>() {
@Override
public void call(Model item) {
subscriber.onNext(item);
}
});
subscriber.onCompleted();
}
});
}
これをsubscribe()
してすぐunsubscribe()
すると、subscribe()
した側のsubscriberのonNext()
などは呼ばれません。つまりこのコードは動いているように見えます。
一方で、この Observable.OnSubscribe#call()
が中断されるわけではないので、イベントは送り続けます。その結果、subscribe()
する側からみると、subscribe()
前のmap()
などの関数は依然として呼ばれます。ここが釈然としないところで、unsubscribe()
によって OnSubscribe#call()
が中断されるものと思っていました。
これが実際に問題になるケースは多くないと思われますが、適切にunsubscribe()
をハンドルしないと無駄にCPUやメモリを使うことになるし、HTTP requestなどでは unsubscribe()
の時にキャンセルしたいでしょうし、 SQLiteDatabaseも API level 16 から CancellationSignal
でクエリのキャンセルを行えるので、それなりの対応は必要と思われます。
というわけで、 https://github.com/gfx/Android-Orma/pull/209 でその対応を行いました。
最初の対応は Cursor#close()
と Cursor#isClosed()
でのハンドリングでしたが:
@NonNull
public Observable<Model> executeAsObservable() {
return Observable.create(new Observable.OnSubscribe<Model>() {
@Override
public void call(final Subscriber<? super Model> subscriber) {
final Cursor cursor = execute();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
cursor.close();
}
}));
try {
for (int pos = 0; cursor.moveToPosition(pos); pos++) {
if (cursor.isClosed()) {
return;
}
subscriber.onNext(newModelFromCursor(cursor));
}
} finally {
if (!cursor.isClosed()) {
cursor.close();
}
}
if (cursor.isClosed()) {
return;
}
subscriber.onCompleted();
}
});
}
複雑すぎてバグを引き起こしやすいので((実際、コードにはバグがあります。finallyのなかで Cursor#close()
するので onComplete()
が決して呼ばれないのです。)) isUnsubscribed()
を使うコードに置き換えました。
@NonNull
public Observable<Model> executeAsObservable() {
return Observable.create(new Observable.OnSubscribe<Model>() {
@Override
public void call(final Subscriber<? super Model> subscriber) {
final Cursor cursor = execute();
try {
for (int pos = 0; !subscriber.isUnsubscribed() && cursor.moveToPosition(pos); pos++) {
subscriber.onNext(newModelFromCursor(cursor));
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
} finally {
cursor.close();
}
}
});
}
OnSubscribe#call()
のなかでunsubscribe()
というイベントを得る方法としては、isUnsubscribed()
でイベントをpullするよりも Subscriber#add()
にコールバックを設定してイベントをpushで受け取るほうが筋がいいと思いますが、今回はシンプルに書ける isUnsubscribed()
を採用することにしました。