Rx是微软的.NET的一个响应式扩展。Rx借助可观测的序列提供一种简单的方式来创建异步的,基于事件驱动的程序。
简单点说,Rx就是一种响应式编程,来创建基于事件的异步程序。
Rx其实是一种编程思想,用很多语言都可以实现,比如RxJava、RxJS、RxPHP等等。
RxJava就是Java对Rx的实现,一个在JVM上使用可观测的序列来组成异步的、基于事件的程序的库。
RxJava的异步实现,是通过一种扩展的观察者模式来实现的。
基本使用
RxJava2的基本实现主要是三点:
Observer(观察者)
Observer
即观察者,它决定事件触发的时候将有怎样的行为。创建方式如下:
1 | Observer<String> observer = new Observer<String>() { |
- onSubscribe:它会在事件还未发送之前被调用,可以用来做一些准备操作。里面的Disposable则是用来切断上下游关系的。
- onNext:普通的事件
- onError:事件队列异常,在事件处理过程中出现异常情况时,此方法会被调用。同时队列将会终止,也就是不允许再有事件发出。
- onComplete:事件队列完成。
Observable(被观察者)
Observable
即被观察者,它决定什么时候触发事件以及触发怎样的事件。创建方式如下:
1 | Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() { |
ObservableEmitter
是被观察者用来发送事件的。它可以发出三种类型的事件,通过调用Emitter
的onNext(T value)
、onError(Throwable error)
、onComplete()
就可以分别发出next事件、error事件、complete事件。
onNext
:普通事件。onCompleted
:事件队列完结。RxJava不仅把每个事件单独处理,还会把它们看做一个队列。RxJava规定,当不会再有新的onNext
发出时,需要触发onCompleted
方法作为标志。onError
:事件队列异常。在事件处理过程中出异常时,onError
会被触发,同时队列自动终止,不允许再有事件发出。- 在一个正确运行的事件序列中,
onCompleted
和onError
有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted
和onError
二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
Subscribe(订阅)
创建了Observable
和Observer
之后,再用subscribe()
方法将它们联结起来,整条链子就可以工作了。只需要一行代码:
1 | observable.subscribe(observer); |
看看运行效果:
和之前介绍的一样,先调用onSubscribe
,然后调用onNext
,最后以onComplete
结尾。
操作符
创建操作符
just
just
是一个简单的发射器,将传入的参数依次发射出来。
1 | Observable<String> observable = Observable.just("Hello", "Rxjava2", "My name is Silence", "What's your name"); |
fromarray
将传入的数组通过坐标依次发送出去。
1 | String[] words = {"Hello", "Rxjava2", "My name is Silence", "What's your name"}; |
timer
timer
相当于一个定时任务,延时一定时间然后开始执行任务。间隔执行的功能属于interval
操作符。
1 | logger.info("start"); |
执行结果:
interval
返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。
1 | Observable.interval(1, TimeUnit.SECONDS).subscribe( |
执行结果:
每隔1s打印一次日志
变换操作符
map
map
操作符通过指定一个Function
对象,将Observable
转换为一个新的Observable
对象并发射,观察者将收到新的Observable
。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:
map
方法中,我们把一个integer对象转换成了一个String对象。然后当map
调用结束时,事件的参数类型也从integer转换成了String。这就是最常见的变换操作。
flatMap
flatMap
的操作符是将Observable
发射的数据集合变成一个Observable
集合。也就是说它可以将一个观察对象变换成多个观察对象,但是并不能保证事件的顺序。想要保证事件的顺序需要使用下面讲到的concatMap
。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
我们看到,对于一个integer,我们将它映射成了3个String类型的Observable对象。通过一个100ms的延迟,能看到发射的时间不能保证顺序。
concatMap
除了保证顺序,concatMap
和flatMap
一模一样。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:
compose
过滤操作符
filter
filter
操作符是对Observable
产生的结果进行有规则的过滤。只有满足规则的结果才会提交到观察者手中。
1 | Observable.just(1, 2, 3).filter(new Predicate<Integer>() { |
执行结果:
Observable
发送1
, 2
, 3
。当我们加上了一个filter
操作符,让它只返回小于3的内容。那么观察者只能收到的两个数据。
distinct
distinct
的作用就是去重。它只允许还没有发射的数据项通过,发射过的数据项直接pass。
1 | Observable.just(1, 2, 3, 4, 2, 3, 5, 6, 1, 3) |
执行结果:
buffer
buffer
的作用主要是缓存,把Observable
转换成一个新的Observable
。这个新的Observable
每次发射的是一组List,而不是单独的一个个的发送数据
buffer
可以接受一个参数buffer(count)
:作用是将Observable
中的数据分成最大不超过count的buffer,然后生成一个Observable
。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:
buffer
也可以接受两个参数buffer(count, skip)
:作用是将Observable
中的数据按skip
(步长)分成最大不超过count的buffer,然后生成一个Observable
。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:
skip、skipLast
skip
将Observable
发射的数据过滤掉前n项。skipLast
则是从后往前进行过滤。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:
take、takeLast
take
只取Observable
发射数据的前n项。takeLast
则是从后往前取。
1 | Observable.just(1, 2, 3, 4, 5, 6) |
执行结果:
doOnNext
doOnNext
的作用是让订阅者在接收到数据之前干点有意思的事情。
1 | Observable.just(1, 2, 3, 4) |
执行结果:
doOnEach
doOnEach
可以给Observable
加上这样的一个回调:Observable
每发射一个数据的时候就会触发这个回调,不仅包括onNext
还包括onError
和onCompleted
。
debounce
debounce
的作用是发送频率过快的项。
1 | Observable.create(new ObservableOnSubscribe<Integer>() { |
执行结果:
debounce(500, TimeUnit.MILLISECONDS)
的作用是去除发送间隔小于500毫秒的发射事件,所以1和3被过滤掉了
defer
defer
操作符每次订阅都会创建一个新的Observable
,如果没有被订阅,就不会产生新的Observable
。
1 | Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() { |
执行结果:
last
last
操作符仅取出可观察到的最后一个值,或者是满足某些条件的最后一项。
1 | Observable.just(1, 2, 3) |
执行结果:
组合操作符
merge
merge
将多个操作符合并到一个Observable
中进行发射。merge
可能让合并到Observable
的数据发射错了。(并行无序)
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:
reduce
reduce
操作符每次用一个方法处理一个值,可以有一个seed作为初始值
1 | Observable.just(1, 2, 3) |
执行结果:
scan
scan
操作符作用和上面的reduce
一致,唯一的区别是reduce
是输出结果,而scan
会把每个步骤都输出。
1 | Observable.just(1, 2, 3) |
执行结果:
window
window
安装时间划分窗口,将数据发送给不同的Observable
。
1 | Observable.interval(1, TimeUnit.SECONDS) |
执行结果:
startWith
在数据序列的开头插入一条指定的项
concat
concat
将多个操作符合并到一个Observable
中进行发射,和merge
不同的是,merge
是无序的,而concat
是有序的。(串行有序)没有发射完前一个它一定不会发射后一个。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:
zip
zip
合并多个Observable
发送的数据项,根据它们的类型进行重新变换,并发射一个新的值。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:
concatEager
前面说到concat
是串行有序,而concatEager
是并行有序的。
1 | Observable<Integer> observable1 = Observable.just(1, 2, 3); |
执行结果:
https://github.com/mcxiaoke/RxDocs
http://gank.io/post/560e15be2dca930e00da1083
https://juejin.im/post/5a43a842f265da432d2863ab#heading-26
https://www.daidingkang.cc/2017/05/19/Rxjava/
https://www.jianshu.com/p/b39afa92807e
https://github.com/IamXiaRui/Android_5.0_ViewDemo/tree/master/FirstRxJavaDemo
http://rxmarbles.com/
https://blog.csdn.net/qq_20198405/article/details/51307198