CompositeSubscriptionは複数のSubscriptionをまとめてunsubscribeするためのクラスです。
これは、Grokking RxJava, Part 4: Reactive Android でも触れられているように一度 unsubscribe()
すると再利用できません。つまり、Androidのコンポーネントのように、一つのコントローラが何度もpause/resumeするクラスとは相性がよくないという問題があります。
たとえば、以下のようなactivityを考えてみます。 Activity#onResume()
で Observable#subscribe()
し、 Activity#onPause()
で CompositeSubscription#unsubscribe()
するのは一見動きそうですが、間違いです。一度activityをsuspend & resumeすると、続くイベントを受け取りません*1。これは、CompositeSubscriptionは一度unsubscribeしたあとは、以降 add(subscription)
したsubscriptionに対して即座に subscription.unsubscribe()
を呼び出すからです。そしてそのunsubscribeしたというフラグをクリアすることはできません。これが、CompositeSubscriptionを再利用できないという意味です。
あまりにも間違えやすい仕様なのでRxJavaのissuesでここの振る舞いについて聞いたところ*2、observableは非同期で動くので、オブジェクトの状態管理を正しく行うことはできず、CompositeSubscriptionを使用するたびに都度生成するのが正しいとのことでした。
// これは正しく動作しないコードです!使わないように! import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import rx.functions.Action1; import rx.subjects.PublishSubject; import rx.subscriptions.CompositeSubscription; public class MainActivity extends AppCompatActivity { final PublishSubject<String> eventPublishSubject = PublishSubject.create(); final CompositeSubscription subscription = new CompositeSubscription(); @Override protected void onCreate(Bundle savedInstanceState) { Log.d("XXX", "onCreate"); super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); scheduler.scheduleAtFixedRate(new Runnable() { @Override public void run() { eventPublishSubject.onNext(String.valueOf(System.currentTimeMillis())); } }, 1, 1, TimeUnit.SECONDS); } @Override protected void onResume() { super.onResume(); Log.d("XXX", "onResume; unsubscribed=" + subscription.isUnsubscribed()); subscription.add(eventPublishSubject .subscribe(new Action1<String>() { @Override public void call(String event) { Log.d("XXX", event); } })); } @Override protected void onPause() { super.onPause(); subscription.unsubscribe(); Log.d("XXX", "onPause; unsubscribed=" + subscription.isUnsubscribed()); } }
一枚ラッパーをかませば再利用可能なCompositeSubscriptionをつくるのは簡単なので、Androidアプリではそのようにしたほうがいいかもしれません。
import android.support.annotation.NonNull; import rx.Subscription; import rx.subscriptions.CompositeSubscription; public class ReusableCompositeSubscription { CompositeSubscription compositeSubscription; public void add(@NonNull Subscription subscription) { if (compositeSubscription == null) { compositeSubscription = new CompositeSubscription(); } compositeSubscription.add(subscription); } public void unsubscribe() { if (compositeSubscription != null) { compositeSubscription.unsubscribe(); compositeSubscription = null; } } }
*1:ただし、開発者メニューの「アクティビティを保持しない」が有効になっていると正しく動きます。それは、このオプションによりAndroidの通常のライフサイクルが無効化されるからです。
*2:Why doesn't CompositeSubscription have a way to re-initialize its instance? · Issue #2959 · ReactiveX/RxJava