Rx是微软的.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。
简单点说,Rx就是一种响应式编程,来创建基于事件的异步程序。
Rx其实是一种编程思想,用很多语言都可以实现,比如RxJava、RxJS、RxPHP等等。
RxJava就是Java对Rx的实现,一个在JVM上使用可观测的序列来组成异步的、基于事件的程序的库。
RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
基本使用
RxJava2的基本实现主要是三点:
Observer(观察者)
Observer即观察者,它决定事件触发的时候将有怎样的行为。创建方式如下:
1 | Observer<String> observer = new Observer<String>() { |
- onSubscribe:它会在事件还未发送之前被调用,可以用来做一些准备操作。里面的Disposable则是用来切断上下游关系的。
- onNext:普通的事件
- onError:事件队列异常,在事件处理过程中出现异常情况时,此方法会被调用。同时队列将会终止,也就是不允许再有事件发出。
- onComplete:事件队列完成。
Observable(被观察者)
Observable即被观察者,它决定什么时候触发事件以及触发怎样的事件。创建方式如下:
1 | Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { |
ObservableEmitter是被观察者用来发送事件的。它可以发出三种类型的事件,通过调用Emitter的onNext(T value)、onError(Throwable error)、onComplete()就可以分别发出next事件、error事件、complete事件。
onNext:普通事件。onCompleted:事件队列完结。RxJava不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的onNext发出时,需要触发onCompleted方法作为标志。onError:事件队列异常。在事件处理过程中出异常时,onError会被触发,同时队列自动终止,不允许再有事件发出。- 在一个正确运行的事件序列中,
onCompleted和onError有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted和onError二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
Subscribe(订阅)
创建了Observable和Observer之后,再用subscribe()方法将它们联结起来,整条链子就可以工作了。只需要一行代码:
1 | observable.subscribe(observer); |
看看运行效果:

和之前介绍的一样,先调用onSubscribe,然后调用onNext,最后以onComplete结尾。
操作符
创建操作符
just
just是一个简单的发射器,将传入的参数依次发射出来。
1 | Observable<String> observable = Observable.just("Hello", "Rxjava2", "My name is Silence", "What's your name"); |
fromarray
将传入的数组通过坐标依次发送出去。
1 | String[] words = {"Hello", "Rxjava2", "My name is Silence", "What's your name"}; |
timer
timer相当于一个定时任务,延时一定时间然后开始执行任务。间隔执行的功能属于interval操作符。
1 | logger.info("start"); |
执行结果:

interval
返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。
1 | Observable.interval(1, TimeUnit.SECONDS).subscribe( |
执行结果:

每隔1s打印一次日志
变换操作符
map
map操作符通过指定一个Function对象,将Observable转换为一个新的Observable对象并发射,观察者将收到新的Observable。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:

map方法中,我们把一个integer对象转换成了一个String对象。然后当map调用结束时,事件的参数类型也从integer转换成了String。这就是最常见的变换操作。
flatMap
flatMap的操作符是将Observable发射的数据集合变成一个Observable集合。也就是说它可以将一个观察对象变换成多个观察对象,但是并不能保证事件的顺序。想要保证事件的顺序需要使用下面讲到的concatMap。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |

我们看到,对于一个integer,我们将它映射成了3个String类型的Observable对象。通过一个100ms的延迟,能看到发射的时间不能保证顺序。
concatMap
除了保证顺序,concatMap和flatMap一模一样。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:

compose
过滤操作符
filter
filter操作符是对Observable产生的结果进行有规则的过滤。只有满足规则的结果才会提交到观察者手中。
1 | Observable.just(1, 2, 3).filter(new Predicate<Integer>() { |
执行结果:

Observable发送1, 2, 3。当我们加上了一个filter操作符,让它只返回小于3的内容。那么观察者只能收到的两个数据。
distinct
distinct的作用就是去重。它只允许还没有发射的数据项通过,发射过的数据项直接pass。
1 | Observable.just(1, 2, 3, 4, 2, 3, 5, 6, 1, 3) |
执行结果:

buffer
buffer的作用主要是缓存,把Observable转换成一个新的Observable。这个新的Observable每次发射的是一组List,而不是单独的一个个的发送数据
buffer可以接受一个参数buffer(count):作用是将Observable中的数据分成最大不超过count的buffer,然后生成一个Observable。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:

buffer也可以接受两个参数buffer(count, skip):作用是将Observable中的数据按skip(步长)分成最大不超过count的buffer,然后生成一个Observable。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:

skip、skipLast
skip将Observable发射的数据过滤掉前n项。skipLast则是从后往前进行过滤。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:

take、takeLast
take只取Observable发射数据的前n项。takeLast则是从后往前取。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:

doOnNext
doOnNext的作用是让订阅者在接收到数据之前干点有意思的事情。
1 | Observable.just(1, 2, 3, 4) |
执行结果:

doOnEach
doOnEach可以给Observable加上这样的一个回调:Observable每发射一个数据的时候就会触发这个回调,不仅包括onNext还包括onError和onCompleted。
debounce
debounce的作用是发送频率过快的项。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:

debounce(500, TimeUnit.MILLISECONDS)的作用是去除发送间隔小于500毫秒的发射事件,所以1和3被过滤掉了
defer
defer操作符每次订阅都会创建一个新的Observable,如果没有被订阅,就不会产生新的Observable。
1 | Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() { |
执行结果:

last
last操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
1 | Observable.just(1, 2, 3) |
执行结果:

组合操作符
merge
merge将多个操作符合并到一个Observable中进行发射。merge可能让合并到Observable的数据发射错了。(并行无序)
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:

reduce
reduce操作符每次用一个方法处理一个值,可以有一个seed作为初始值
1 | Observable.just(1, 2, 3) |
执行结果:

scan
scan操作符作用和上面的reduce一致,唯一的区别是reduce是输出结果,而scan会把每个步骤都输出。
1 | Observable.just(1, 2, 3) |
执行结果:

window
window安装时间划分窗口,将数据发送给不同的Observable。
1 | Observable.interval(1, TimeUnit.SECONDS) |
执行结果:

startWith
在数据序列的开头插入一条指定的项
concat
concat将多个操作符合并到一个Observable中进行发射,和merge不同的是,merge是无序的,而concat是有序的。(串行有序)没有发射完前一个它一定不会发射后一个。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:

zip
zip合并多个Observable发送的数据项,根据它们的类型进行重新变换,并发射一个新的值。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:

concatEager
前面说到concat是串行有序,而concatEager是并行有序的。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:

https://github.com/mcxiaoke/RxDocs
http://gank.io/post/560e15be2dca930e00da1083
https://juejin.im/post/5a43a842f265da432d2863ab#heading-26
https://www.daidingkang.cc/2017/05/19/Rxjava/
https://www.jianshu.com/p/b39afa92807e
https://github.com/IamXiaRui/Android_5.0_ViewDemo/tree/master/FirstRxJavaDemo
http://rxmarbles.com/
https://blog.csdn.net/qq_20198405/article/details/51307198