RxJavaのリソース管理: イベント放出の際にisUnsubscribed()をチェックすべきだった!

f:id:gfx:20160228114108p:plain

いままでずっと勘違いしてましたが、チェックすべきなんですね。

きっかけはこれ:

これは Observable.create(OnSubscribe)onNext()onComplete() でイベントを放出する際に、Subscriber#isUnsubscribed() でsubscriberの生死をチェックすべきではないかという話です。RxJava official wikiがそうしているから、そうするべきではないかというのが議論の発端ですが、釈然としないので自分でも考えたり検証してみました。

議論の対象となるコードは Selector#executeAsObservable() です。

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            forEach(new Action1<Model>() {
                @Override
                public void call(Model item) {
                    subscriber.onNext(item);
                }
            });
            subscriber.onCompleted();
        }
    });
}

これをsubscribe()してすぐunsubscribe()すると、subscribe()した側のsubscriberのonNext() などは呼ばれません。つまりこのコードは動いているように見えます。

一方で、この Observable.OnSubscribe#call() が中断されるわけではないので、イベントは送り続けます。その結果、subscribe()する側からみると、subscribe()前のmap() などの関数は依然として呼ばれます。ここが釈然としないところで、unsubscribe() によって OnSubscribe#call() が中断されるものと思っていました。

これが実際に問題になるケースは多くないと思われますが、適切にunsubscribe()をハンドルしないと無駄にCPUやメモリを使うことになるし、HTTP requestなどでは unsubscribe() の時にキャンセルしたいでしょうし、 SQLiteDatabaseも API level 16 から CancellationSignalでクエリのキャンセルを行えるので、それなりの対応は必要と思われます。

というわけで、 https://github.com/gfx/Android-Orma/pull/209 でその対応を行いました。

最初の対応は Cursor#close()Cursor#isClosed() でのハンドリングでしたが:

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            final Cursor cursor = execute();
            subscriber.add(Subscriptions.create(new Action0() {
                @Override
                public void call() {
                    cursor.close();
                }
            }));
            try {
                for (int pos = 0; cursor.moveToPosition(pos); pos++) {
                    if (cursor.isClosed()) {
                        return;
                    }
                    subscriber.onNext(newModelFromCursor(cursor));
                }
            } finally {
                if (!cursor.isClosed()) {
                    cursor.close();
                }
            }
            if (cursor.isClosed()) {
                return;
            }
            subscriber.onCompleted();
        }
    });
}

複雑すぎてバグを引き起こしやすいので((実際、コードにはバグがあります。finallyのなかで Cursor#close() するので onComplete() が決して呼ばれないのです。)) isUnsubscribed() を使うコードに置き換えました。

@NonNull
public Observable<Model> executeAsObservable() {
    return Observable.create(new Observable.OnSubscribe<Model>() {
        @Override
        public void call(final Subscriber<? super Model> subscriber) {
            final Cursor cursor = execute();
            try {
                for (int pos = 0; !subscriber.isUnsubscribed() && cursor.moveToPosition(pos); pos++) {
                    subscriber.onNext(newModelFromCursor(cursor));
                }
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onCompleted();
                }
            } finally {
                cursor.close();
            }
        }
    });
}

OnSubscribe#call()のなかでunsubscribe() というイベントを得る方法としては、isUnsubscribed()でイベントをpullするよりも Subscriber#add() にコールバックを設定してイベントをpushで受け取るほうが筋がいいと思いますが、今回はシンプルに書ける isUnsubscribed() を採用することにしました。

Orma v2の新機能と今後の展望

Ormaの v2.1をリリースしました。Ormaはセマンティックバージョニングを採用しているので、"v2"は単に"v1"と互換性のない変更を行った、というだけの意味です。とはいえ機能もいくつか追加しているので紹介します。

なお Orma入門はv2.1の内容にアップデートしています。

マイグレーション

v2.0, v2.1でマイグレーションの起動条件が変わりました。以前はリリースビルドとデバッグビルドでマイグレーション起動条件が違っていてデバッグが難しかったのですが、この変更によってリリースビルドとデバッグビルドで処理を同じにできたので、より安定して開発できるはずです。

これに伴い、内部もかなり変わっています。いままではSQLiteOpenHelperに依存していたのですが、v2.1ではもはや内部的にSQLiteOpenHelperは使っていません。OrmaDatabaseはOrmaConnectionにDBのopenを任せ、OrmaConnectionは単に MigrationEngine#start() を呼び出し、 MigrationEngine の実装が実際の処理を全て行います。

なお、マイグレーション起動のロジック(正確にはManualStepMigrationのロジック)の仕様自体はSQLiteOpenHelperが使う SQLiteDatabase#version (実体はSQLiteの PRAMGA user_version)に依存しているため、v1.xから移行する際、または他のSQLiteOpenHelperを使うORMから移行する際も問題にはならないはずです。

また OrmaDatabase#migrate() というメソッドでマイグレーションを明示的に起動できるようになりました。

現状の課題としては、現状はSchemaDiffMigrationでカバーできないスキーマの変更(具体的にはカラムやテーブルのリネーム)がやはりまだ難しいです。「あるカラムの名前が現状よりも古い場合に最新の名前に変える」という動的に状況を確認しつつマイグレーションを行えるようになれば改善すると思われますので、近いうちに実装するつもりです。

has-one関係の直接的な記述(direct associations)

v2.0の目玉機能です。Ormaのモデルクラスは、他のモデルクラスを直接持つことができるようになりました。

droidkaigi/konifar/Session.java がまさにこの機能を使っています。Speaker, Category, Placeが他のOrmaモデルクラスで、見ての通りJavaコード的には普通のクラスとおなじように持っているだけですね。内部的にはSessionテーブルは Speaker#id などのプライマリキーだけを持っていて、SELECT 時にまとめてとってくるようになっています。

これはまだ制限があり、v2.1.0現在は1つのモデルにつき1つだけしか持てません。2つ以上のモデルを持つためには、JOINのための複雑なテーブルエイリアスの管理が必要だからです。

モデルの関連は非常に機能が多いので、今後については不明ですが、一つ一つ実装するしかないかなーと思っています。

その他

ライブラリの新機能というわけではないですが、最近exampleアプリでOrmaが生成するコードをリポジトリにいれるようにしました。

特に *_Schema.java はモデルとSQLiteDatabaseのアダプタとなるクラスで、 Schema#bindArgs()Schema#newModelFromCursor() をみて「自分で書くのと同じだな」と思えるならOrmaを使うほうがいいということになります。

またexample/MainActivity.java にはマイグレーションのデモがあります。