Islands in the byte stream

Technical notes by a software engineer

CompositeSubscriptionは再利用できない #RxJava

CompositeSubscriptionは複数のSubscriptionをまとめてunsubscribeするためのクラスです。

これは、Grokking RxJava, Part 4: Reactive Android でも触れられているように一度 unsubscribe() すると再利用できません。つまり、Androidコンポーネントのように、一つのコントローラが何度もpause/resumeするクラスとは相性がよくないという問題があります。

たとえば、以下のようなactivityを考えてみます。 Activity#onResume()Observable#subscribe() し、 Activity#onPause()CompositeSubscription#unsubscribe() するのは一見動きそうですが、間違いです。一度activityをsuspend & resumeすると、続くイベントを受け取りません*1。これは、CompositeSubscriptionは一度unsubscribeしたあとは、以降 add(subscription) したsubscriptionに対して即座に subscription.unsubscribe() を呼び出すからです。そしてそのunsubscribeしたというフラグをクリアすることはできません。これが、CompositeSubscriptionを再利用できないという意味です。

あまりにも間違えやすい仕様なのでRxJavaのissuesでここの振る舞いについて聞いたところ*2、observableは非同期で動くので、オブジェクトの状態管理を正しく行うことはできず、CompositeSubscriptionを使用するたびに都度生成するのが正しいとのことでした。

// これは正しく動作しないコードです!使わないように!
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import rx.functions.Action1;
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

public class MainActivity extends AppCompatActivity {

    final PublishSubject<String> eventPublishSubject = PublishSubject.create();

    final CompositeSubscription subscription = new CompositeSubscription();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        Log.d("XXX", "onCreate");
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);

        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                eventPublishSubject.onNext(String.valueOf(System.currentTimeMillis()));
            }
        }, 1, 1, TimeUnit.SECONDS);
    }

    @Override
    protected void onResume() {
        super.onResume();

        Log.d("XXX", "onResume; unsubscribed=" + subscription.isUnsubscribed());
        subscription.add(eventPublishSubject
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String event) {
                        Log.d("XXX", event);
                    }
                }));
    }

    @Override
    protected void onPause() {
        super.onPause();

        subscription.unsubscribe();
        Log.d("XXX", "onPause; unsubscribed=" + subscription.isUnsubscribed());
    }
}

一枚ラッパーをかませば再利用可能なCompositeSubscriptionをつくるのは簡単なので、Androidアプリではそのようにしたほうがいいかもしれません。

import android.support.annotation.NonNull;

import rx.Subscription;
import rx.subscriptions.CompositeSubscription;

public class ReusableCompositeSubscription {

    CompositeSubscription compositeSubscription;

    public void add(@NonNull Subscription subscription) {
        if (compositeSubscription == null) {
            compositeSubscription = new CompositeSubscription();
        }
        compositeSubscription.add(subscription);
    }

    public void unsubscribe() {
        if (compositeSubscription != null) {
            compositeSubscription.unsubscribe();
            compositeSubscription = null;
        }
    }
}

*1:ただし、開発者メニューの「アクティビティを保持しない」が有効になっていると正しく動きます。それは、このオプションによりAndroidの通常のライフサイクルが無効化されるからです。

*2:Why doesn't CompositeSubscription have a way to re-initialize its instance? · Issue #2959 · ReactiveX/RxJava