
在这篇文章中,我们会先分析一下 RxJava2 中的 Subject;然后,我们会使用 Subject 制作一个类似于 EventBus 的全局的通信工具。在了解本篇文章的内容之前,你需要先了解 RxJava2 中的一些基本的用法,比如 Observable 以及背压的概念。
1、Subject
1.1 Subject 的两个特性Subject 可以同时代表 Observer 和 Observable,允许从数据源中多次发送结果给多个观察者。除了 onSubscribe()、onNext()、onError() 和 onComplete() 之外,所有的方法都是线程安全的。此外,你还可以使用 toSerialized() 方法,也就是转换成串行的,将这些方法设置成线程安全的。
如果你已经了解了 Observable 和 Observer ,那么也许直接看 Subject 的源码定义会更容易理解:
public abstract class Subjectextends Observable implements Observer { // ... } 复制代码
从上面看出,Subject 同时继承了 Observable 和 Observer 两个接口,说明它既是被观察的对象,同时又是观察对象,也就是可以生产、可以消费、也可以自己生产自己消费。所以,我们可以项下面这样来使用它。这里我们用到的是该接口的一个实现 PublishSubject :
public static void main(String...args) {
PublishSubject subject = PublishSubject.create();
subject.subscribe(System.out::println);
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i ->
executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
}
复制代码
根据程序的执行结果,程序在第 200, 400, 600, 800, 1000 毫秒依次输出了 1 到 5 的数字。
在这里,我们用 PublishSubject 创建了一个主题并对其监听,然后在线程当中又通知该主题内容变化,整个过程我们都只操作了 PublishSubject 一个对象。显然,使用 Subject 我们可以达到对一个指定类型的值的结果进行监听的目的——我们把值改变之后对应的逻辑写在 subscribe() 方法中,然后每次调用 onNext() 等方法通知结果之后就可以自动调用 subscribe() 方法进行更新操作。
同时,因为 Subject 实现了 Observer 接口,并且在 Observable 等的 subscribe() 方法中存在一个以 Observer 作为参数的方法(如下),所以,Subject 也是可以作为消费者来对事件进行消费的。
public final void subscribe(Observer super T> observer) 复制代码
以上就是 Subject 的两个主要的特性。
1.2 Subject 的实现类在 RxJava2 ,Subject 有几个默认的实现,下面我们对它们之间的区别做简单的说明:
对比 PublishSubject 和 ReplaySubject,它们的区别在于新注册的 Observer 是否能够收到在它注册之前发送的事件。这个类似于 EventBus 中的 StickyEvent 即黏性事件,为了说明这一点,我们准备了下面两段代码:
private static void testPublishSubject() throws InterruptedException {
PublishSubject subject = PublishSubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
private static void testReplaySubject() throws InterruptedException {
ReplaySubject subject = ReplaySubject.create();
subject.subscribe(i -> System.out.print("(1: " + i + ") "));
Executor executor = Executors.newFixedThreadPool(5);
Disposable disposable = Observable.range(1, 5).subscribe(i -> executor.execute(() -> {
try {
Thread.sleep(i * 200);
subject.onNext(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
Thread.sleep(500);
subject.subscribe(i -> System.out.print("(2: " + i + ") "));
Observable.timer(2, TimeUnit.SECONDS).subscribe(i -> ((ExecutorService) executor).shutdown());
}
复制代码
它们的输出结果依次是
从上面的结果对比中,我们可以看出前者与后者的区别在于新注册的 Observer 并没有收到在它注册之前发送的事件。试验的结果与上面的叙述是一致的。
其他的测试代码这不一并给出了,详细的代码可以参考 Github - Java Advanced。
2、用 RxJava 打造 EventBus 2.1 打造 EventBus清楚了 Subject 的概念之后,让我们来做一个实践——用 RxJava 打造 EventBus。
我们先考虑用一个全局的 PublishSubject 来解决这个问题,当然,这意味着我们发送的事件不是黏性事件。不过,没关系,只要这种实现方式搞懂了,用 ReplaySubject 做一个发送黏性事件的 EventBus 也非难事。
考虑一下,如果要实现这个功能我们需要做哪些准备:
好了,首先是全局的 Subject 的问题,我们可以实现一个静态的或者单例的 Subject。这里我们选择使用后者,所以,我们需要一个单例的方式来使用 Subject:
public class RxBus {
private static volatile RxBus rxBus;
private final Subject
这里我们应用了 DCL 的单例模式提供一个单例的 RxBus,对应一个唯一的 Subject. 这里我们用到了 Subject 的toSerialized(),我们上面已经提到过它的作用,就是用来保证 onNext() 等方法的线程安全性。
另外,因为 Observalbe 本身是不支持背压的,所以,我们还需要将该 Observable 转换成 Flowable 来实现背压的效果:
publicFlowable getObservable(Class type){ return subject.toFlowable(BackpressureStrategy.BUFFER).ofType(type); } 复制代码
这里我们用到的背压的策略是 BackpressureStrategy.BUFFER,它会缓存发射结果,直到有消费者订阅了它。而这里的 ofType() 方法的作用是用来过滤发射的事件的类型,只有指定类型的事件会被发布。
然后,我们需要记录订阅者的信息以便在适当的时机取消订阅,这里我们用一个 Map
public void addSubscription(Object o, Disposable disposable) {
String key = o.getClass().getName();
if (disposableMap.get(key) != null) {
disposableMap.get(key).add(disposable);
} else {
CompositeDisposable disposables = new CompositeDisposable();
disposables.add(disposable);
disposableMap.put(key, disposables);
}
}
public void unSubscribe(Object o) {
String key = o.getClass().getName();
if (!disposableMap.containsKey(key)) {
return;
}
if (disposableMap.get(key) != null) {
disposableMap.get(key).dispose();
}
disposableMap.remove(key);
}
复制代码
最后,对外提供一下 Subject 的订阅和发布方法,整个 EventBus 就制作完成了:
public void post(Object o){
subject.onNext(o);
}
public Disposable doSubscribe(Class type, Consumer next, Consumer error){
return getObservable(type)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(next,error);
}
复制代码
2.2 测试效果
我们只需要在最顶层的 Activity 基类中加入如下的代码。这样,我们就不需要在各个 Activity 中取消注册了。然后,就可以使用这些顶层的方法来进行操作了。
protected void postEvent(Object object) {
RxBus.getRxBus().post(object);
}
protected void addSubscription(Class eventType, Consumer action) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, LogUtils::d);
RxBus.getRxBus().addSubscription(this, disposable);
}
protected void addSubscription(Class eventType, Consumer action, Consumer error) {
Disposable disposable = RxBus.getRxBus().doSubscribe(eventType, action, error);
RxBus.getRxBus().addSubscription(this, disposable);
}
@Override
protected void onDestroy() {
super.onDestroy();
RxBus.getRxBus().unSubscribe(this);
}
复制代码
在第一个 Activity 中我们对指定的类型的结果进行监听:
addSubscription(RxMessage.class, rxMessage -> ToastUtils.makeToast(rxMessage.message)); 复制代码
然后,我们在另一个 Activity 中发布事件:
postEvent(new RxMessage("Hello world!"));
复制代码
这样当第二个 Activity 中调用指定的发送事件的方法之后,第一个 Activity 就可以接收到发射的事件了。
总结好了,以上就是 Subject 的使用,如果要用一个词来形容它的话,那么只能是“自给自足”了。就是说,它同时做了 Observable 和 Observer 的工作,既可以发射事件又可以对事件进行消费,可谓身兼数职。它在那种想要对某个值进行监听并处理的情形特别有用。因为它不需要你写多个冗余的类,只要它一个就完成了其他两个类来完成的任务,因而代码更加简洁。
以上,感谢阅读~