注:本文章代码基于 RxJava:3.0.0-RC4
一、概述
官方描述: RxJava是反应式编程扩展的JVM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。
即:RxJava 是一个用反应式编程特性来扩展观察者模式的事件处理库。
二、RxJava的观察者模式
观察者模式即某物件(对象)能够被其它物件所订阅,在被观察者自身状态发生改变时通知所有订阅的观察者,这通常是通过调用观察者的回调方法实现的。 而RxJava 将一组事件抽象为更直观的事件流,被观察者发出的一系列事件都会被各个操作符自上而下一一处理后发送到观察者中,这便是反应式编程特性。
1. 反应式编程其可组合性
RxJava 提供了一系列运算符,用于过滤、选择、转换和合成被观察者,这使得一系列逻辑可以在时间和空间上重新组合。
在 RxJava 之前,我们通常使用 Java Features 之类的异步API用于单个异步操作的执行,但是如果将其嵌套起来使用,就会极大地增加复杂度。而 RxJava 的观察者模式,目的就是组合异步数据的流或序列。我们可以任意组合操作符对被观察者发出的数据流进行修改。
2. 事件源的灵活性
RxJava 不仅支持单个流发射,还支持值序列甚至无限流的发射,对发射的事件进行一系列修改,也表现出了它的可迭代特性。
三、基本实现及原理
1. 创建 Observable
Observable
即被观察者,它决定什么时候触发事件以及触发怎样的事件。 当 Observable
被订阅的时候就会调用 ObservableOnSubscribe#subscribe
方法。在这个方法中,我们通过 ObservableEmitter
发射2次 onNext()
和一次 onComplete()
,当 Observable
被订阅后会依次调用 Observer
的对应回调方法。
ObservableEmitter
: 事件发射器,定义需要发送的事件, 用于向观察者发送事件。
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> emitter) throws Throwable { emitter.onNext("do next 1"); emitter.onNext("do next 2"); emitter.onComplete(); } });
2. 创建 Observer
Observer
即观察者,它决定事件触发的时候将有怎样的行为。
Observer<String> observer = new Observer<String>() { @Override public void onSubscribe(Disposable d) { System.out.println("on subscribe."); } @Override public void onNext(String s) { System.out.println(s); } @Override public void onError(Throwable e) { e.printStackTrace(); } @Override public void onComplete() { System.out.println("on complete."); } };
3. Subscribe
在创建了 Observable
和 Observer
后,就可以将它们绑定,一个最简单的数据流就开始工作了。这里必须要注意的是,绑定后可以在 Observer#onSubscribe()
中获得或是直接返回 Disposable
,必须在特定生命周期通过 Disposable 取消订阅,防止内存泄漏。
observable.subscribe(observer);
4. Lambda
从 RxJava2 开始, RxJava 对 Java 8 lambda API 是非常友好的。结合链式调用,上面的示例代码可以这样实现:
Observable.<String>create(emitter -> { emitter.onNext("do next 1"); emitter.onNext("do next 2"); emitter.onComplete(); }) .doOnSubscribe(d -> System.out.println("on subscribe.")) .doOnNext(System.out::println) .doOnComplete(() -> System.out.println("on complete.")) .doOnError(Throwable::printStackTrace) .subscribe();
注意这里并没有为其绑定 Observer
。而例如 doOnNext()
操作符,是在事件被发射的途中对其进行监听。
5. 基本原理
RxJava 的反应式编程本身是以装饰器模式实现的,每使用一个操作符都会将当前的 Observable
用另一个 Observable
实现类进行包装,而在绑定 Observer
时又会从下到上包装 Observer
,最终在创建操作符中包装为发射器 Emitter
以及用于解绑的 Disposable
,达到增加新功能的目的。
四、线程切换
线程切换的本质也是使用 subscribeOn()/observeOn()
操作符对事件源和观察者进行包装。
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
@Override
public void run() {
source.subscribe(parent);
}
subscribeOn
指定 Observable
在哪种线程进行绑定以及发射事件。ObservableSubscribeOn
在 subscribeActual()
中调用线程调度器 Scheduler
的 scheduleDirect()
方法进行线程切换。实质是将 Observer
包装为 SubscribeTask
,然后将其放入对应线程池执行, SubscribeTask
实现了 Runnable
接口,在 run()
方法中调用了 Observable
的 subscribe()
,所以最终事件源会在对应线程执行。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
@Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = downstream; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); disposed = true; upstream.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } }
observeOn
同样是给 Observer
包装上 Runnable
接口,但它是在 Observer
中持有线程调度器 Scheduler
以及一个默认长度为128位的队列。当它从上游接收到事件时会将事件存入队列,并使事件计数器+1,然后使用线程调度器执行自身的 run()
方法,run()
方法中使用了两个死循环,外部循环用来判断观察者是否与被观察者解绑或是 onError/onComplete
,如有则跳出循环,内部循环尝试从队列中取出事件,每次提取都使计数器-1,当计数器为0时表示队列中所有事件处理完毕,跳出内部循环。其实就是ObserveOnObserver
接收到上游消息后进行线程调度,然后再到对应线程向下发送消息。
五、背压
RxJava 既然能够在线程间切换,自然就会出现被观察者发出消息的速度和观察者消费事件的速度不一致的问题,因此 observeOn()
中会为观察者创建一个队列。但是默认的队列可以无限扩容,这可能导致OOM或者达不到我们想要的效果。因此 RxJava 提供了 Flowable
被观察者,我们能为其设置缓冲区长度和背压策略。
/**
* 背压
*/
private void backPressure() {
// 存放观察者接收到的事件
mBackPressureReceive = Collections.synchronizedList(new ArrayList<>());
Disposable intervalDisposable = Observable.interval(40, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
// 当 Flowable 绑定后开始每隔 40ms 请求一次值
if (mBackPressureSubscription != null) {
mBackPressureSubscription.request(1);
}
});
Flowable.create((FlowableOnSubscribe<String>) emitter -> {
// 发射256个事件
for (int i = 0; i < 256; i++) {
emitter.onNext(String.valueOf(i));
Thread.sleep(10);
}
emitter.onComplete();
// 抛弃超出缓冲区的事件
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.computation())
// 为了使现象更明显,设置较小的缓冲区
.observeOn(Schedulers.computation(), false, 16)
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
mBackPressureSubscription = s;
}
@Override
public void onNext(String s) {
mBackPressureReceive.add(s);
}
@Override
public void onError(Throwable t) {
Single.create(emitter -> log("BackPressure " + mBackPressureReceive.size()))
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe();
intervalDisposable.dispose();
mBackPressureSubscription.cancel();
}
@Override
public void onComplete() {
Single.create(emitter -> log("BackPressure " + mBackPressureReceive.size()))
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe();
intervalDisposable.dispose();
mBackPressureSubscription.cancel();
}
});
}
Subscriber
有别于一般的 Observer
,在使用 Flowable
的情况下,Flowable
不会主动发出事件,但是我们可以主动调用 Subscriber#request()
来向 Flowable
的缓冲区请求事件。 但如果我们绑定的是 Consumer
则会自动调用 request(Long.MAX_VALUE)
来请求所有缓冲区内容。同样的,请求多少事件也是通过计数器加死循环实现的。
六、创建操作符
1. 功能
用于创建 Observable
对象及发射事件。
2. 目录
Create
, Defer
, Empty
/Never
/Throw
, From
, Interval
, Just
, Range
, Repeat
,
, StartTimer
3. 介绍
注:仅选择部分方法 Class#Method()
作为操作符的例子
Create
Observable#create()
用于创建完整的 Observable
。
Defer
Observable#defer()
在 Observer
订阅之前不会创建 Observable
,当 Observer
订阅后才会为每一个订阅的 Observer
创建一个新的 Observable
该方法接收一个 Supplier
对象,每次绑定产生的新 Observable
都是由 Supplier
提供的。
注:RxJava2 中 defer()
接收 java.util.concurrent.Callable,两者功能上没有任何差别
示例代码:
private int createTimes = 0; @Test public void deferExample() { Observable<String> create = Observable.just(createTimes++ + " "); Observable<String> defer = Observable.defer(() -> Observable.just(createTimes++ + " ")); create.subscribe(System.out::print); create.subscribe(System.out::print); create.subscribe(System.out::print); System.out.println(); defer.subscribe(System.out::print); defer.subscribe(System.out::print); defer.subscribe(System.out::print); } 输出: 0 0 0 1 2 3
通过运行这段代码可以体现 defer()
和其它创建方式的差别。defer 对象的每次订阅都会使 createTimes 自增一次,即每次订阅都调用了 Observable.just()
,defer 绑定的 Consumer
所接收的 Observable
是在订阅时才创建的,而上面三个 Consumer
所绑定的是同一个 Observable
。
Empty
Observable#empty()
直接发送 onComplete()
事件。
Never
Observable#never()
订阅后不会发送任何事件。
Throw
Observable#error()
当接收 Throwable
时仅能发射一次 onError()
, 接收 Supplier
时能够发射多个。
From
将其它对象和数据类型转换为 Observable
。
Observable#fromFuture()
接收一个 java.util.concurrent.Future
,等待 Future
完成后返回结果。
Observable#fromPublisher()
将 Publisher
流转为 Observable
, 这会导致背压失效,源不再受任何限制,有关背压会在后面讲解。
Observable#fromArray() / fromIterable()
遍历数组、可迭代对象,通过 onNext()
发射每一个对象后 onComplete()
。
Interval
Observable#interval()
发出以给定时间间隔的整数序列。
Just
Observable#just()
按顺序发射10个以内对象,传递的参数将直接在 onNext()
方法中接收到。 其中2个以上参数方法的内部直接调用了 fromArray()
。
Range
Observable#range()
发出给定范围内的连续整数。
Repeat
Observable#repeat()
复读 重复调用 subscribe
,直到停止时发出 onComplete()
。
示例代码:
Observable.create(emitter -> { emitter.onNext("Next1 "); emitter.onNext("Next2 "); emitter.onNext("Next3 "); emitter.onComplete(); }) .repeat(2) .doOnNext(System.out::print) .doOnComplete(() -> System.out.println("\nonComplete")) .subscribe(); 输出: Next1 Next2 Next3 Next1 Next2 Next3 onComplete
Start
Observable#fromCallable()
发送函数的返回值。
Timer
Observable#timer()
计时器,在给定延迟后发出特定值。
七、变换操作符
1. 功能
转换 Observable
发出的 items。
2. 目录
Buffer
, FlatMap
, GroupBy
, Map
, Scan
, Window
3. 介绍
Buffer
Observable#range
将一定数量 Observable
发出的对象打包为 List
,然后发射。
示例代码:
Observable.range(0, 10) //每次取3个,从第二次开始每次跳过2个 .buffer(3, 2) .subscribe(list -> System.out.println("缓冲区剩余数量:" + list.size() + " " + list.toString())); 输出: 缓冲区剩余数量:3 [0, 1, 2] 缓冲区剩余数量:3 [2, 3, 4] 缓冲区剩余数量:3 [4, 5, 6] 缓冲区剩余数量:3 [6, 7, 8] 缓冲区剩余数量:2 [8, 9]
FlatMap
Observable#flatMap()
将 Observable
发出的对象转换为 Observable
,然后再组合成一个新的 Observable
。
示例代码:
Observable.<Integer>create(emitter -> { emitter.onNext(0); emitter.onNext(1); emitter.onNext(2); }) .flatMap(i -> Observable.just(" repeat" + i, " repeat" + i)) .subscribe(System.out::print); 输出: repeat0 repeat0 repeat1 repeat1 repeat2 repeat2
GroupBy
Observable#groupBy()
将一个 Observable
划分为一组 Observable
,它们分别发射不同的原始 Observable
的子集。
Observable.range(0, 20) .groupBy(num -> { if (num < 10) { return "第一组"; } else if (num < 15) { return "第二组"; } else { return "第三组"; } }) .subscribe(groupedObservable -> { List<Integer> list = new ArrayList<>(); groupedObservable .doOnComplete(() -> { String elements = list.toString(); System.out.println(groupedObservable.getKey() + elements); }) .subscribe(list::add); }); 输出: 第一组[0, 1, 2, 3, 4, 5, 6, 7, 8, 9] 第二组[10, 11, 12, 13, 14] 第三组[15, 16, 17, 18, 19]
Map
Observable#map()
将每一个 Observable
发射的事件通过函数转换为新的事件
示例代码:
Observable.range(1, 9) .map(item -> item * 10) .subscribe(mapped -> System.out.print(mapped + " ")); 输出: 10 20 30 40 50 60 70 80 90
Scan
Observable#scan()
按照顺序为每一个发射的事件调用一次方法并返回结果。第一个接收到的原始发射不参与方法直接发射。
Observable.range(1, 9) //第一个参数为上一次的结果,第二个参数为该次发射的值 .scan((lastResult, item) -> lastResult * item) .subscribe(System.out::println); 输出: 1 2 6 24 120 720 5040 40320 362880
Window
将 Observable
发射的对象细分为多个 ObservableWindow
,而不是将它们一次全部发射出来。
Window
类似于 Buffer
,但是它不是从源Observable发出事件包(List
),而是发出 Observable
,每个 Observable
都从源 Observable
发出事件的子集,然后以onCompleted()
通知终止。
示例代码:
Consumer<Integer> consumer1 = integer -> System.out.println("Consumer1: " + integer); Consumer<Integer> consumer2 = integer -> System.out.println("Consumer2: " + integer); Consumer<Integer> consumer3 = integer -> System.out.println("Consumer3: " + integer); AtomicInteger aInteger = new AtomicInteger(0); Observable.range(1, 8) .window(3) .subscribe(integerObservable -> { int get = aInteger.getAndIncrement(); if (get == 0) { integerObservable.subscribe(consumer1); } else if (get == 1) { integerObservable.subscribe(consumer2); } else { integerObservable.subscribe(consumer3); } }); 输出: Consumer1: 1 Consumer1: 2 Consumer1: 3 Consumer2: 4 Consumer2: 5 Consumer2: 6 Consumer3: 7 Consumer3: 8
八、过滤操作符
1. 功能
过滤 Observable
源发送的事件。
2. 目录
Debounce
, Distinct
, ElementAt
, Filter
, First
/ Last
, IgnoreElements
, Sample
, Skip
, SkipLast
, Take
/TakeLast
3. 介绍
Debounce
去抖动,如果两次事件时间间隔小于指定时间,丢弃前一次数据。
示例代码:
Observable.<Integer>create(emitter -> { emitter.onNext(1); Thread.sleep(900); emitter.onNext(2); Thread.sleep(2000); emitter.onNext(3); Thread.sleep(200); emitter.onNext(4); }) .debounce(1000, TimeUnit.MILLISECONDS, Schedulers.computation()) .subscribe(System.out::println); 输出: 2
Distinct
丢弃事件流中的重复项。
ElementAt
指定发射某个索引值的元素。
Filter
过滤特定条件的事件。
示例代码:
Observable.just(1, 2, 1, 3, 4) .filter(integer -> integer < 3) .subscribe(System.out::println); 输出: 1 2 1
First / Last
仅发射 第一个 / 最后一个 事件。
IgnoreElements
忽略所有元素,直到 onComplete() / onError()
。
Sample
发射该段时间内最新的事件,在两次发射之间多次取样也只会取到一个。
示例代码:
Observable.create(emitter -> { for(int i = 0; i < 5; i++) { emitter.onNext(i); Thread.sleep(500); } }) .sample(800, TimeUnit.MILLISECONDS) .subscribe(System.out::println); 输出: 1 3 4
Skip / SkipLast
跳过 前面 / 后面 发出的 n 个事件
Take / TakeLast
只发射 前 / 后 n 个事件。
九、组合操作符
1. 功能
将多个源 Observable
组合创建单个 Observable
。
2. 目录
CombineLatest
, Join
, Merge
, StartWith
, Switch
, Zip
3. 介绍
CombineLatest
当两个 Observable
中的任何一个发射数据时,使用一个函数结合每个 Observable
发射的最近数据项,并且基于这个函数的结果发射数据。
Observable<Integer> first = Observable.<Integer>create(emitter -> { for (int i = 0; i < 5; i++) { emitter.onNext(i); Thread.sleep(500); } }).subscribeOn(Schedulers.computation()); Observable<String> second = Observable.<String>create(emitter -> { for (int i = 0; i < 7; i++) { emitter.onNext(String.valueOf(i)); Thread.sleep(800); } }).subscribeOn(Schedulers.computation()); Observable.combineLatest(first, second, (o1, o2) -> o1 + o2) .subscribe(System.out::println); 输出: 00 10 11 21 31 32 42 43 44 45 46
//TODO ……