RxJavaのリソース管理: イベント放出の際にisUnsubscribed()をチェックすべきだった!

f:id:gfx:20160228114108p:plain

いままでずっと勘違いしてましたが、チェックすべきなんですね。

きっかけはこれ:

これは Observable.create(OnSubscribe)onNext()onComplete() でイベントを放出する際に、Subscriber#isUnsubscribed() でsubscriberの生死をチェックすべきではないかという話です。RxJava official wikiがそうしているから、そうするべきではないかというのが議論の発端ですが、釈然としないので自分でも考えたり検証してみました。

議論の対象となるコードは Selector#executeAsObservable() です。

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            forEach(new Action1<Model>() {
                @Override
                public void call(Model item) {
                    subscriber.onNext(item);
                }
            });
            subscriber.onCompleted();
        }
    });
}

これをsubscribe()してすぐunsubscribe()すると、subscribe()した側のsubscriberのonNext() などは呼ばれません。つまりこのコードは動いているように見えます。

一方で、この Observable.OnSubscribe#call() が中断されるわけではないので、イベントは送り続けます。その結果、subscribe()する側からみると、subscribe()前のmap() などの関数は依然として呼ばれます。ここが釈然としないところで、unsubscribe() によって OnSubscribe#call() が中断されるものと思っていました。

これが実際に問題になるケースは多くないと思われますが、適切にunsubscribe()をハンドルしないと無駄にCPUやメモリを使うことになるし、HTTP requestなどでは unsubscribe() の時にキャンセルしたいでしょうし、 SQLiteDatabaseも API level 16 から CancellationSignalでクエリのキャンセルを行えるので、それなりの対応は必要と思われます。

というわけで、 https://github.com/gfx/Android-Orma/pull/209 でその対応を行いました。

最初の対応は Cursor#close()Cursor#isClosed() でのハンドリングでしたが:

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            final Cursor cursor = execute();
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    cursor.close();
                }
            }));
            try {
                for (int pos = 0; cursor.moveToPosition(pos); pos++) {
                    if (cursor.isClosed()) {
                        return;
                    }
                    subscriber.onNext(newModelFromCursor(cursor));
                }
            } finally {
                if (!cursor.isClosed()) {
                    cursor.close();
                }
            }
            if (cursor.isClosed()) {
                return;
            }
            subscriber.onCompleted();
        }
    });
}

複雑すぎてバグを引き起こしやすいので((実際、コードにはバグがあります。finallyのなかで Cursor#close() するので onComplete() が決して呼ばれないのです。)) isUnsubscribed() を使うコードに置き換えました。

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            final Cursor cursor = execute();
            try {
                for (int pos = 0; !subscriber.isUnsubscribed() && cursor.moveToPosition(pos); pos++) {
                    subscriber.onNext(newModelFromCursor(cursor));
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } finally {
                cursor.close();
            }
        }
    });
}

OnSubscribe#call()のなかでunsubscribe() というイベントを得る方法としては、isUnsubscribed()でイベントをpullするよりも Subscriber#add() にコールバックを設定してイベントをpushで受け取るほうが筋がいいと思いますが、今回はシンプルに書ける isUnsubscribed() を採用することにしました。