RxJava简析

注:本文章代码基于 RxJava:3.0.0-RC4

一、概述

官方描述: RxJava是反应式编程扩展的JVM实现:该库用于通过使用可观察的序列来组成异步和基于事件的程序。

即:RxJava 是一个用反应式编程特性来扩展观察者模式的事件处理库。

ReactiveX官网的流概念图 通过去抖实现可控的事件触发

二、RxJava的观察者模式

观察者模式即某物件(对象)能够被其它物件所订阅,在被观察者自身状态发生改变时通知所有订阅的观察者,这通常是通过调用观察者的回调方法实现的。 而RxJava 将一组事件抽象为更直观的事件流,被观察者发出的一系列事件都会被各个操作符自上而下一一处理后发送到观察者中,这便是反应式编程特性。

1. 反应式编程其可组合性

RxJava 提供了一系列运算符,用于过滤、选择、转换和合成被观察者,这使得一系列逻辑可以在时间和空间上重新组合。

在 RxJava 之前,我们通常使用 Java Features 之类的异步API用于单个异步操作的执行,但是如果将其嵌套起来使用,就会极大地增加复杂度。而 RxJava 的观察者模式,目的就是组合异步数据的流或序列。我们可以任意组合操作符对被观察者发出的数据流进行修改。

2. 事件源的灵活性

RxJava 不仅支持单个流发射,还支持值序列甚至无限流的发射,对发射的事件进行一系列修改,也表现出了它的可迭代特性。

三、基本实现及原理

1. 创建 Observable

Observable 即被观察者,它决定什么时候触发事件以及触发怎样的事件。 当 Observable 被订阅的时候就会调用 ObservableOnSubscribe#subscribe 方法。在这个方法中,我们通过 ObservableEmitter 发射2次 onNext() 和一次 onComplete() ,当 Observable 被订阅后会依次调用 Observer 的对应回调方法。

ObservableEmitter: 事件发射器,定义需要发送的事件, 用于向观察者发送事件。

Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Throwable {
        emitter.onNext("do next 1");
        emitter.onNext("do next 2");
        emitter.onComplete();
    }
});

2. 创建 Observer

Observer 即观察者,它决定事件触发的时候将有怎样的行为。

Observer<String> observer = new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
        System.out.println("on subscribe.");
    }

    @Override
    public void onNext(String s) {
        System.out.println(s);
    }

    @Override
    public void onError(Throwable e) {
        e.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("on complete.");
    }
};

3. Subscribe

在创建了 ObservableObserver 后,就可以将它们绑定,一个最简单的数据流就开始工作了。这里必须要注意的是,绑定后可以在 Observer#onSubscribe() 中获得或是直接返回 Disposable,必须在特定生命周期通过 Disposable 取消订阅,防止内存泄漏。

observable.subscribe(observer);

4. Lambda

从 RxJava2 开始, RxJava 对 Java 8 lambda API 是非常友好的。结合链式调用,上面的示例代码可以这样实现:

Observable.<String>create(emitter -> {
    emitter.onNext("do next 1");
    emitter.onNext("do next 2");
    emitter.onComplete();
})
        .doOnSubscribe(d -> System.out.println("on subscribe."))
        .doOnNext(System.out::println)
        .doOnComplete(() -> System.out.println("on complete."))
        .doOnError(Throwable::printStackTrace)
        .subscribe();

注意这里并没有为其绑定 Observer。而例如 doOnNext() 操作符,是在事件被发射的途中对其进行监听。

5. 基本原理

RxJava 的反应式编程本身是以装饰器模式实现的,每使用一个操作符都会将当前的 Observable 用另一个 Observable 实现类进行包装,而在绑定 Observer 时又会从下到上包装 Observer,最终在创建操作符中包装为发射器 Emitter 以及用于解绑的 Disposable,达到增加新功能的目的。

四、线程切换

线程切换的本质也是使用 subscribeOn()/observeOn() 操作符对事件源和观察者进行包装。

@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

observer.onSubscribe(parent);

parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
@Override
public void run() {
source.subscribe(parent);
}

subscribeOn 指定 Observable 在哪种线程进行绑定以及发射事件。ObservableSubscribeOnsubscribeActual() 中调用线程调度器 SchedulerscheduleDirect() 方法进行线程切换。实质是将 Observer 包装为 SubscribeTask ,然后将其放入对应线程池执行, SubscribeTask 实现了 Runnable 接口,在 run() 方法中调用了 Observablesubscribe(),所以最终事件源会在对应线程执行。

@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();

source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}

void drainNormal() {
    int missed = 1;

    final SimpleQueue<T> q = queue;
    final Observer<? super T> a = downstream;

    for (;;) {
        if (checkTerminated(done, q.isEmpty(), a)) {
            return;
        }

        for (;;) {
            boolean d = done;
            T v;

            try {
                v = q.poll();
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                disposed = true;
                upstream.dispose();
                q.clear();
                a.onError(ex);
                worker.dispose();
                return;
            }
            boolean empty = v == null;

            if (checkTerminated(d, empty, a)) {
                return;
            }

            if (empty) {
                break;
            }

            a.onNext(v);
        }

        missed = addAndGet(-missed);
        if (missed == 0) {
            break;
        }
    }
}

observeOn 同样是给 Observer 包装上 Runnable 接口,但它是在 Observer 中持有线程调度器 Scheduler 以及一个默认长度为128位的队列。当它从上游接收到事件时会将事件存入队列,并使事件计数器+1,然后使用线程调度器执行自身的 run() 方法,run() 方法中使用了两个死循环,外部循环用来判断观察者是否与被观察者解绑或是 onError/onComplete,如有则跳出循环,内部循环尝试从队列中取出事件,每次提取都使计数器-1,当计数器为0时表示队列中所有事件处理完毕,跳出内部循环。其实就是ObserveOnObserver 接收到上游消息后进行线程调度,然后再到对应线程向下发送消息。

五、背压

RxJava 既然能够在线程间切换,自然就会出现被观察者发出消息的速度和观察者消费事件的速度不一致的问题,因此 observeOn() 中会为观察者创建一个队列。但是默认的队列可以无限扩容,这可能导致OOM或者达不到我们想要的效果。因此 RxJava 提供了 Flowable 被观察者,我们能为其设置缓冲区长度和背压策略。

/**
* 背压
*/
private void backPressure() {
// 存放观察者接收到的事件
mBackPressureReceive = Collections.synchronizedList(new ArrayList<>());

Disposable intervalDisposable = Observable.interval(40, TimeUnit.MILLISECONDS)
.subscribe(aLong -> {
// 当 Flowable 绑定后开始每隔 40ms 请求一次值
if (mBackPressureSubscription != null) {
mBackPressureSubscription.request(1);
}
});

Flowable.create((FlowableOnSubscribe<String>) emitter -> {
// 发射256个事件
for (int i = 0; i < 256; i++) {
emitter.onNext(String.valueOf(i));
Thread.sleep(10);
}
emitter.onComplete();

// 抛弃超出缓冲区的事件
}, BackpressureStrategy.DROP)
.subscribeOn(Schedulers.computation())
// 为了使现象更明显,设置较小的缓冲区
.observeOn(Schedulers.computation(), false, 16)
.subscribe(new Subscriber<String>() {
@Override
public void onSubscribe(Subscription s) {
mBackPressureSubscription = s;
}

@Override
public void onNext(String s) {
mBackPressureReceive.add(s);
}

@Override
public void onError(Throwable t) {
Single.create(emitter -> log("BackPressure " + mBackPressureReceive.size()))
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe();
intervalDisposable.dispose();
mBackPressureSubscription.cancel();
}

@Override
public void onComplete() {
Single.create(emitter -> log("BackPressure " + mBackPressureReceive.size()))
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe();
intervalDisposable.dispose();
mBackPressureSubscription.cancel();
}
});
}

Subscriber 有别于一般的 Observer,在使用 Flowable 的情况下,Flowable 不会主动发出事件,但是我们可以主动调用 Subscriber#request() 来向 Flowable 的缓冲区请求事件。 但如果我们绑定的是 Consumer 则会自动调用 request(Long.MAX_VALUE) 来请求所有缓冲区内容。同样的,请求多少事件也是通过计数器加死循环实现的。

六、创建操作符

1. 功能

用于创建 Observable 对象及发射事件。

2. 目录

CreateDeferEmpty/Never/ThrowFromIntervalJustRangeRepeatStart, Timer

3. 介绍

注:仅选择部分方法 Class#Method() 作为操作符的例子

Create

Observable#create() 用于创建完整的 Observable

Defer

Observable#defer()Observer 订阅之前不会创建 Observable ,当 Observer 订阅后才会为每一个订阅的 Observer 创建一个新的 Observable

该方法接收一个 Supplier 对象,每次绑定产生的新 Observable 都是由 Supplier 提供的。

注:RxJava2 中 defer() 接收 java.util.concurrent.Callable,两者功能上没有任何差别

每一个订阅者都是一个新的流,有一个新的且内容相同的 Observable 与其对应

示例代码:

private int createTimes = 0;

@Test
public void deferExample() {
    Observable<String> create = Observable.just(createTimes++ + " ");
    Observable<String> defer = Observable.defer(() -> Observable.just(createTimes++ + " "));
    create.subscribe(System.out::print);
    create.subscribe(System.out::print);
    create.subscribe(System.out::print);
    System.out.println();
    defer.subscribe(System.out::print);
    defer.subscribe(System.out::print);
    defer.subscribe(System.out::print);
}

输出:
0 0 0 
1 2 3 

通过运行这段代码可以体现 defer() 和其它创建方式的差别。defer 对象的每次订阅都会使 createTimes 自增一次,即每次订阅都调用了 Observable.just() ,defer 绑定的 Consumer 所接收的 Observable 是在订阅时才创建的,而上面三个 Consumer 所绑定的是同一个 Observable

Empty

Observable#empty() 直接发送 onComplete() 事件。

Never

Observable#never() 订阅后不会发送任何事件。

Throw

Observable#error() 当接收 Throwable 时仅能发射一次 onError(), 接收 Supplier 时能够发射多个。

From

将其它对象和数据类型转换为 Observable

Observable#fromFuture() 接收一个 java.util.concurrent.Future,等待 Future 完成后返回结果。

Observable#fromPublisher()Publisher 流转为 Observable, 这会导致背压失效,源不再受任何限制,有关背压会在后面讲解。

Observable#fromArray() / fromIterable() 遍历数组、可迭代对象,通过 onNext() 发射每一个对象后 onComplete()

Interval

Observable#interval() 发出以给定时间间隔的整数序列。

Just

Observable#just() 按顺序发射10个以内对象,传递的参数将直接在 onNext() 方法中接收到。 其中2个以上参数方法的内部直接调用了 fromArray()

Range

Observable#range() 发出给定范围内的连续整数。

Repeat

Observable#repeat() 复读 重复调用 subscribe,直到停止时发出 onComplete()

示例代码:

Observable.create(emitter -> {
    emitter.onNext("Next1 ");
    emitter.onNext("Next2 ");
    emitter.onNext("Next3 ");
    emitter.onComplete();
})
        .repeat(2)
        .doOnNext(System.out::print)
        .doOnComplete(() -> System.out.println("\nonComplete"))
        .subscribe();

输出:
Next1 Next2 Next3 Next1 Next2 Next3 
onComplete

Start

Observable#fromCallable() 发送函数的返回值。

Timer

Observable#timer() 计时器,在给定延迟后发出特定值。

七、变换操作符

1. 功能

转换 Observable 发出的 items。

2. 目录

BufferFlatMapGroupByMapScan,  Window

3. 介绍

Buffer

Observable#range 将一定数量 Observable 发出的对象打包为 List ,然后发射。

示例代码:

Observable.range(0, 10)
//每次取3个,从第二次开始每次跳过2个
        .buffer(3, 2)
        .subscribe(list -> System.out.println("缓冲区剩余数量:" + list.size()
                + "  " + list.toString()));

输出:
缓冲区剩余数量:3  [0, 1, 2]
缓冲区剩余数量:3  [2, 3, 4]
缓冲区剩余数量:3  [4, 5, 6]
缓冲区剩余数量:3  [6, 7, 8]
缓冲区剩余数量:2  [8, 9]

FlatMap

Observable#flatMap()Observable 发出的对象转换为 Observable,然后再组合成一个新的 Observable

示例代码:

Observable.<Integer>create(emitter -> {
    emitter.onNext(0);
    emitter.onNext(1);
    emitter.onNext(2);
})
        .flatMap(i -> Observable.just(" repeat" + i, " repeat" + i))
        .subscribe(System.out::print);

输出:
repeat0 repeat0 repeat1 repeat1 repeat2 repeat2

GroupBy

Observable#groupBy() 将一个 Observable 划分为一组 Observable ,它们分别发射不同的原始 Observable 的子集。

Observable.range(0, 20)
        .groupBy(num -> {
            if (num < 10) {
                return "第一组";
            } else if (num < 15) {
                return "第二组";
            } else {
                return "第三组";
            }
        })
        .subscribe(groupedObservable -> {
            List<Integer> list = new ArrayList<>();
            groupedObservable
                    .doOnComplete(() -> {
                        String elements = list.toString();
                        System.out.println(groupedObservable.getKey() + elements);
                    })
                    .subscribe(list::add);
        });

输出:
第一组[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
第二组[10, 11, 12, 13, 14]
第三组[15, 16, 17, 18, 19]

Map

Observable#map() 将每一个 Observable 发射的事件通过函数转换为新的事件

示例代码:

Observable.range(1, 9)
        .map(item -> item * 10)
        .subscribe(mapped -> System.out.print(mapped + " "));

输出:
10 20 30 40 50 60 70 80 90 

Scan

Observable#scan() 按照顺序为每一个发射的事件调用一次方法并返回结果。第一个接收到的原始发射不参与方法直接发射。

Observable.range(1, 9)
        //第一个参数为上一次的结果,第二个参数为该次发射的值
        .scan((lastResult, item) -> lastResult * item)
        .subscribe(System.out::println);

输出:
1
2
6
24
120
720
5040
40320
362880

Window

Observable 发射的对象细分为多个 ObservableWindow,而不是将它们一次全部发射出来。

Window 类似于 Buffer,但是它不是从源Observable发出事件包(List),而是发出 Observable,每个 Observable 都从源 Observable 发出事件的子集,然后以onCompleted() 通知终止。

示例代码:

Consumer<Integer> consumer1 = integer -> System.out.println("Consumer1: " + integer);
Consumer<Integer> consumer2 = integer -> System.out.println("Consumer2: " + integer);
Consumer<Integer> consumer3 = integer -> System.out.println("Consumer3: " + integer);
AtomicInteger aInteger = new AtomicInteger(0);
Observable.range(1, 8)
        .window(3)
        .subscribe(integerObservable -> {
            int get = aInteger.getAndIncrement();
            if (get == 0) {
                integerObservable.subscribe(consumer1);
            } else if (get == 1) {
                integerObservable.subscribe(consumer2);
            } else {
                integerObservable.subscribe(consumer3);
            }
        });

输出:
Consumer1: 1
Consumer1: 2
Consumer1: 3
Consumer2: 4
Consumer2: 5
Consumer2: 6
Consumer3: 7
Consumer3: 8

八、过滤操作符

1. 功能

过滤 Observable 源发送的事件。

2. 目录

DebounceDistinctElementAtFilterFirst/ LastIgnoreElementsSampleSkipSkipLastTake/TakeLast

3. 介绍

Debounce

去抖动,如果两次事件时间间隔小于指定时间,丢弃前一次数据。

示例代码:

Observable.<Integer>create(emitter -> {
    emitter.onNext(1);
    Thread.sleep(900);
    emitter.onNext(2);
    Thread.sleep(2000);
    emitter.onNext(3);
    Thread.sleep(200);
    emitter.onNext(4);
})
        .debounce(1000, TimeUnit.MILLISECONDS, Schedulers.computation())
        .subscribe(System.out::println);

输出:
2

Distinct

丢弃事件流中的重复项。

ElementAt

指定发射某个索引值的元素。

Filter

过滤特定条件的事件。

示例代码:

Observable.just(1, 2, 1, 3, 4)
        .filter(integer -> integer < 3)
        .subscribe(System.out::println);

输出:
1
2
1

First / Last

仅发射 第一个 / 最后一个 事件。

IgnoreElements

忽略所有元素,直到 onComplete() / onError()

Sample

发射该段时间内最新的事件,在两次发射之间多次取样也只会取到一个。

示例代码:

Observable.create(emitter -> {
    for(int i = 0; i < 5; i++) {
        emitter.onNext(i);
        Thread.sleep(500);
    }
})
        .sample(800, TimeUnit.MILLISECONDS)
        .subscribe(System.out::println);

输出:
1
3
4

Skip / SkipLast

跳过 前面 / 后面 发出的 n 个事件

Take / TakeLast

只发射 前 / 后 n 个事件。

九、组合操作符

1. 功能

将多个源 Observable 组合创建单个 Observable

2. 目录

CombineLatestJoinMergeStartWithSwitch, Zip

3. 介绍

CombineLatest

当两个 Observable 中的任何一个发射数据时,使用一个函数结合每个 Observable 发射的最近数据项,并且基于这个函数的结果发射数据。

Observable<Integer> first = Observable.<Integer>create(emitter -> {
    for (int i = 0; i < 5; i++) {
        emitter.onNext(i);
        Thread.sleep(500);
    }
}).subscribeOn(Schedulers.computation());

Observable<String> second = Observable.<String>create(emitter -> {
    for (int i = 0; i < 7; i++) {
        emitter.onNext(String.valueOf(i));
        Thread.sleep(800);
    }
}).subscribeOn(Schedulers.computation());

Observable.combineLatest(first, second, (o1, o2) -> o1 + o2)
        .subscribe(System.out::println);

输出:
00
10
11
21
31
32
42
43
44
45
46

//TODO ……

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注