RxJavaのミニマム実装でScheduler周りを書いてみるゾ!

RxJava Advent Calendar 2015 の23日分です。

はい、さっそくですがRxInTheBoxにSchedulerを実装しました。 observeOn()subscribeOn() が本家RxJava同様の振る舞いをします。

add subscribeOn() and observeOn() to RxInTheBox by gfx · Pull Request #2 · gfx/RxInTheBox · GitHub

これは何

半年ほど前に社内勉強会のネタとしてRxJavaのミニマムな実装をしてみました。それがRxInTheBoxで、主な実装が約20行です。

RxJava的なものを最小限に実装してコンポーネントの関係を理解する - Qiita

そして時は流れ、subscribeOn()とobserveOn()の挙動にハマったりしながらなんとかRxJavaをそれなりに使えるかなと思えてきたので、RxInTheBoxにもsubscribeOn()observeOn()を実装してみることにしました。

軽い気持ちで始めたら差分が200行を突破してちょっと驚きましたが、これはsubscribeOn()の実装のために lift()OperatorSchedulerなどなど多くの実装が必要だったからですね。

実装を読む前に

まずこれらのエントリを読むといいと思います。後者は長いので、とりあえず observeOn() に言及しているところだけでもいいかも。

RxInTheBoxのおさらい

まずおさらいですが、 Observble<T>Observable<T>.OnSubscribe を1つ保持しているだけのオブジェクトでした。 そして、 Observable<T>#subscribe(Subscriber<T> subscriber) の中身はただ1行、 onSubscribe.call(subscriber) でしたね。

class Observable<T> {
    // ...

    final OnSubscribe<T> onSubscribe;

    Observable(OnSubscribe<T> onSubscribe) {
        this.onSubscribe = onSubscribe;
    }

    interface OnSubscribe<T> {
        void call(Subscriber<T> subscriber);
    }

    void subscribe(Subscriber<T> subscriber) {
        onSubscribe.call(subscriber);
    }

   // ...
}

subscribeOn()observeOn() の実装

これから subscriber が度々出てきますが、これは様々なラッパーをかましつつ、最終的にはユーザーが #subscribe() に与えたオブジェクトで、つまりは Observable<T> に与えるイベントリスナです。

さて、今回実装したコア部分は Observable#lift(), #subscribeOn(), #observeOn() です。これらの実装は非常に簡単です。

class Observable<T> {
   // ...

    public interface Operator<R, T> {

        Subscriber<T> call(Subscriber<R> value);
    }

    public <R> Observable<R> lift(final Operator<R, T> operator) {
        return new Observable<>(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber<R> subscriber) {
                onSubscribe.call(operator.call(subscriber));
            }
        });
    }

    public Observable<T> subscribeOn(Scheduler scheduler) {
        return Observable.just(this).lift(new OperatorSubscribeOn<T>(scheduler));
    }

    public Observable<T> observeOn(Scheduler scheduler) {
        return this.lift(new OperatorObserveOn<T>(scheduler));
    }
}

class OperatorSubscribeOn<T> implements Observable.Operator<T, Observable<T>> { /* ... */ }
class OperatorObserveOn<T> implements Observable.Operator<T, T> { /* ... */ }

lift() については前述のエントリなどで語られているから省略するとして、どうやら subscribeOn()observeOn() の実体は Operator なのですね。

OperatorSubscribeOn<T>

ここで Operator<引数の型, 戻り値の型> なので、まず OperatorSubscribeOn<T>SubscriberをうけとりSubscriber<Observable>` を返すOperator と考えます。

実装は次のとおり。 worker.schedule(Runnable) は特定のスレッドのもとでRunnableを実行するとしましょう。 observable.subscribe(subscriber) の実体は observable.onSubscribe.call(subscriber) でしたね。

つまり、OperatorSubscribeOn の仕事は、 Observable#subscribe() を特定のスレッドのもとで実行するというものです。

class OperatorSubscribeOn<T> implements Observable.Operator<T, Observable<T>> {

    final Scheduler scheduler;

    public OperatorSubscribeOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<Observable<T>> call(final Subscriber<T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();
        return new Subscriber<Observable<T>>() {
            @Override
            public void onNext(final Observable<T> observable) {
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        observable.subscribe(subscriber);
                    }
                });
            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
}

OperatorObserveOn

これに対して OperatorObserveOn<T> は次のとおりです。

OperatorSubscribeOn<T> とは対照的に、subscriber の各種リスナである onNext(), onComplete(), onError() のために Worker#schedule() でスレッドを指定しています。つまり、ユーザーが指定したイベントリスナがどのスレッドで実行するかを制御しているというわけです。

class OperatorObserveOn<T> implements Observable.Operator<T, T> {

    final Scheduler scheduler;

    public OperatorObserveOn(Scheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Subscriber<T> call(final Subscriber<T> subscriber) {
        final Scheduler.Worker worker = scheduler.createWorker();

        return new Subscriber<T>() {
            @Override
            public void onNext(final T value) {
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onNext(value);
                    }
                });
            }

            @Override
            public void onComplete() {
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onComplete();
                    }
                });
            }

            @Override
            public void onError(final Throwable e) {
                worker.schedule(new Runnable() {
                    @Override
                    public void run() {
                        subscriber.onError(e);
                    }
                });
            }
        };
    }
}

Scheduler

Scheduler は特定のスレッドでタスクを実行するためのインターフェイスです。RxInTheBoxでは、JavaExecutorService ベースのものと、AndroidHandler ベースのものを実装しておきました。長くなるのでコードは紹介しませんが、これらはミニマム実装だと非常に簡単なのでソースコードを参照してみてください。

おわりに

RxInTheBoxはRxJavaの実装を眺めて理解の妨げになる部分を削ったコードなので、RxJava自体の実装にかなり近いものになっています。

しかし実際のRxJavaのコードはこれよりもずっと複雑で、特に OperatorObserveOn はback pressureに対応するために一見して何をしているかわからないコードになってしまっています。

lift(), subscribeOn(), observeOn() はRxJavaの中でも特に難しい部分なので、まずは単純化されたコードをじっくり眺めると理解の一助となるかもしれません。

簡単ですが以上です。