Subscribed unsubscribe Subscribe Subscribe

Islands in the byte stream

Technical notes by a software engineer

RxJavaのsubscribeOn()とobserveOn()を使いこなしたい

RxJava Advent Calendar 2015 の 12月12日分です。

RxJavaのSchedulersは、RxJavaのコールバックの実行スレッドを制御するためのコンポーネントです。

恥ずかしながら、最近まで subscribeOn()observeOn() の使い方を理解していませんでした。よって本稿では、 subscribeOn()observeOn() の現状の私の理解したところを書きます。

SchedulersとsubscribeOn() / observeOn()

RxJavaのSchedulersまわり一番難しいのは subscribeOn()observeOn() がどう違うのか、という点だと思います。

これは実用的には以下のように考えるとよいかと思います。

  • subscribeOn()Observable.OnSubscribe#call() のスケジューラを決める
  • observeOn() は指定した箇所以降のコールバックのスケジューラを決める

つまり Observable.create(OnSubscribe ) でobservableをつくるとき、以下のようにすると重い処理をバックグラウンドスレッドで実行しつつ計算結果をメインスレッドで受け取ることができるというわけです。

Observable<T> observable = Observable.create(subscriber -> {
    // ここは Schedulers.computation()
    // なにか重い処理を行う
    subscriber.onNext(計算結果);
    subscriber.onComplete();
})
.subscribeOn(Schedulers.computation())
.observeOn(AndroidSchedulers.mainThread());

observable.subscribe(result -> {
    // ここは AndroidSchedulers.mainThread()
    // 計算結果を使う
});

実際に使っている例は Android-Orma/example/BenchmarkActivity.java など 。このクラスのL148からは、重い処理を行いそれをメインスレッドでArrayAdapterに反映させる、というのを繰り返し行う複雑なオペレータチェインを行っており、うまく subscribeOn()observeOn() を組み合わせて実現しています。

ここまで踏まえてオフィシャルのsubscribeOn()の図をみるとよくわかります。

図の中の▶はどのScheduler(スレッド)で実行するかを示す色分けです。

subscribeOn(▶) はオペレータチェインのどの位置で行っても常にオペレータチェインの根本に影響し、observeOn(▶) はオペレータチェインで指定した次のオペレータに影響する、と読めますね。

Kobito.A20RvB.png

以上が私の現状の理解です。しかしここまで分かってから公式ドキュメントを読みなおしてもやっぱりよく分からないのでした。