当前位置 博文首页 > 刘桂林:掌握RxJava的葵花宝典

    刘桂林:掌握RxJava的葵花宝典

    作者:[db:作者] 时间:2021-08-03 12:41

    本文特长,阅读需要二十分钟


    640?wx_fmt=png


    各位少侠,老夫在黑木崖恭候大驾,欲练此功,必先哈哈。


    今天我们分享的是RxJava的知识点,让你快速掌握,所以我们会从0开始讲带RxJava的Api使用和设计理念等,本文较长,所以需要耐心看下去。


    目录

    • 一.RxJava相关资料

    • 二.RxJava的基本使用

    • 三.RxJava的调度器

    • 四.RxJava的操作符

    • 五.RxJava的订阅与反订阅

    • 六.RxJava的背压支持

    • 七.RxJava结合OkHttp

    • 八.RxJava结合Retrofit


    我们基于截止到本篇最新的RxJava版本 [ 2.2.8 ] 来讲解,事实上RxJava2.0 对比 RxJava1.0来说,更新了很多的东西,这些下文都将说到,所以这里提示你一下,RxJava 2.0 和 1.0 Api是不兼容的,同时也存在很多坑,如果你的老代码使用了RxJava1.0,你现在想要升级为RxJava2.0的话,是需要大面积的去修改的,好了,说了这么多,我们就不再介绍RxJava到底是什么了,因为这根框架出来了挺久了,相信大家都耳熟能详了,所以我尽可能的对设计理念,逻辑,对象,方法,接口等方面下手,让大家都有一个清晰的认识,做到知其然也知所以然,好了,我们直接进入正题吧。


    一.RxJava相关资料


    资料是肯定要的,学习也好,温习也罢,都是不可或缺的。


    • Github:https://github.com/ReactiveX/RxJava

    • 文档:https://mcxiaoke.gitbooks.io/rxdocs/content/


    二.RxJava的基本使用


    app/build.gradle中引入


    implementation "io.reactivex.rxjava2:rxjava:2.2.8"


    如果想充分了解RxJava的话,我建议你通读几遍RxJava中文文档,或许就已经得心应手了,我先来教大家RxJava的一些基础概念和用法


    首先,我们要明白观察者模式,这个模式我就不长篇大论了,给大家讲一个故事:小明在复习功课,小明的妹妹则是在看一个电视节目叫做我是歌手,小明告诉妹妹要是出现了周杰伦就叫他,这就有点类似观察者模式了,其中小明是观察者,妹妹是被观察者,通过叮嘱建立关系,叫做订阅,我们不需要小明隔一会儿就问一下妹妹周杰伦上场了没,而是订阅(叮嘱)后,妹妹看到周杰伦主动告诉小明,这个例子与抛物线的有区别,他举例的小偷并不会主动告诉警察自己在犯罪,在RxJava中,我们有三个阶段,分别如下:

    640?wx_fmt=jpeg? ??

    我们围绕这段代码来讲解,首先我要告诉你,代码看起来有些长,其实RxJava是很简洁的,有很多简洁的写法,这是一段初学者的代码,我先解释一下流程:小明(Observer)叮嘱(subscribe)妹妹(Observable)当出现周杰伦的时候叫他(onNext)


    Observer就是小明,他是一个接口,里面有四个回调方法,首先是第一个onSubscribe,这个方法是当你订阅成功之后回调的,里面传了一个Disposable,这个对象可以追述到RxJava1.0的时候的东西,可以不讲,这个对象可以用来判断订阅关系和解除订阅关系,也就是我在上面代码块中onComplete里的操作,继续来看,onNext方法就是与Observable,也就是被观察者通信用的,当被观察者有什么数据就是通过他来传输的,onError顾名思义就是发生错误的回调,还有onComplete,完成的回调, onNext,onError,onComplete,这三个都是通过被观察者Observable回调中的ObservableEmitter调用的,称为事件发射器


    Observable就是妹妹了,也是被观察者,Observable和Observer容易被混淆,这点一定要分清,至少我以前经常把他们搞混,我们来看他里面的代码,里面我写死的数据判断周杰伦,这个时候,妹妹判断到周杰伦就通过onNext()叫小明过来,然后再调用onComplete() 结束自己的任务,到此,观察者和被观察者的任务就完成了


    接着,我们再来说一个特性,叫做链式调用,用在我们的创建对象上也是可以做到简化的效果的,看如下代码:


    640?wx_fmt=jpeg ? ? ?


    这里只是为了告诉你链式调用这个概念,因为后面会经常用到,所以在这里先抛砖引玉,同时你可能有些疑问,为什么订阅的代码是Observable订阅Observer,即被观察者订阅观察者,形象一点就是 报纸订阅人,这明显是有些奇怪的,难道不应该是人订阅报纸,观察者订阅被观察者吗?实际上这是跟他的设计理念有些关系,这里给大家解惑一下,就是佛系,不要太在意。


    接着,我们就要来继续简化这段操作了,在RxJava1中的onAction,onFunc等方法已经被修改了,所以你现在应该记住如下的快捷方法:


    • Consumer?即观察者,用于接收单个值,

    • BiConsumer?则是接收两个值,

    • Function?用于变换对象,

    • Predicate?用于判断。


    记住这四个方法,我们一一来讲解一下:


    Consumer和BiConsumer,其实就是一个简化的Observer,区分于Consumer接收单个值,而BiConsumer?则接收两个值,我们来看如下用法:


    640?wx_fmt=jpeg? ? ? ?

    这里的代码使用Consumer完全替代了Observer,也能接收到onNext传输的数据,这就做到了简化的代码了,从最开始的一大段代码到现在的这些代码,不过这个还可以继续优化,但是目前你还是不知道为好,我们继续往下看,由于BiConsumer?是双参传入,我们到后面再来补充使用方式,现在讲需要用到其他函数,不利于阅读性,至少到目前为止,你已经知道了观察者和被观察者建立订阅传输数据,其实这已经足够了,我写着篇文章就是慢慢引导,最终掌握技能。


    我们接着来看Function?,Function?是一个变换对象的函数,如果你有一个Drawable需要转换成Bitmap或者你有一个String需要转Int都可以使用到他,不过我们这个例子用到了just和map,这两个你暂时可以不用理解,你只要明白这段代码的含义就好了:


    640?wx_fmt=jpeg? ? ? ?

    我们看到代码中首先Observable.just里传入了一个字符串,但是我们的接口可能需要int,那么就需要对他进行转换了,通过map的Function把字符串1传入回调函数进行处理后再return回来给到Observer中的Consumer?处理,在Consumer?中进行了相加的运算得到的结果为2,说明一切正常


    接着来看下Predicate?,这个方法是用来做一些条件判断并且拦截事件的,我们来看下如下这一段代码:


    640?wx_fmt=jpeg? ? ? ?

    从这段代码中我们看到,首先我们通过create创建被观察者,他发送出来的数据是3,然后通过filter方法实现了一个Predicate接口,filter方法你可以暂时不要理会,我们先来明白Predicate接口,可以看到他的返回值是一个布尔类型,这里我判断是否大于5,而我们传入的数据是3,小于5,那么这里返回的应该是false,接着我们实现Consumer,用来打印,这里返回false,那么事件就会被拦截,Consumer就不会打印,如果Predicate返回true,那么事件正常执行。


    好了,读到这里,我相信你已经随手能写出一段基础RxJava代码了,我们接着要来理解一个新的概念,那就是调度器


    三.RxJava的调度器


    调度器 Scheduler的作用很多,我们这里来说一下几个特性,其中关于Android部分的我们后面补充,先单指RxJava中的线程调度器:


    Schedulers.computation()

    用于计算任务,用于事件循环或和回调处理,不要用于IO操作,默认线程数等同于处理器的数量


    Schedulers.from(executor)

    使用指定的Executor作为调度器


    Schedulers.immediate(?)

    在当前线程立即开始执行任务,这也是默认的调度器


    Schedulers.io(?)

    用于IO操作的调度器,这个调度器的线程池会根据需要增长,很像一个CachedThreadScheduler线程池


    Schedulers.newThread(?)

    每个任务都开启一个新的线程


    Schedulers.trampoline(?)

    当其它排队的任务完成后,在当前线程排队开始执行


    这里同时需要明白两个概念,那就是subscribeOn和observeOn,前者是指定工作线程,后一个是指定回调线程,我们来看如下代码:


    640?wx_fmt=jpeg ? ? ?


    这里我用subscribeOn指定了Schedulers.newThread()这个调度器,意思就是Observable在工作的时候每次执行任务都创建一个新的线程,比如我在里面处理了一个耗时的任务,这个时候是处于子线程中,我们想要Observer中直接更新UI,那么应该怎么做?这里这一句就是关键:observeOn(AndroidSchedulers.mainThread()),指定线程为主线程,AndroidSchedulers这个调度器是RxAndroid中的,所以一般我们是配合使用的,一法通万法,只要你学会了RxJava,其实RxAndroid很多东西也能迎刃而解的,所以我们总结一下:subscribeOn是用来指定工作线程,而observeOn是用来指定回调线程


    RxAndroid Github:https://github.com/ReactiveX/RxAndroid


    app/build.gradle:


    implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'


    代码中多次提到了一些操作符如create,map等,每一个都有特殊的含义与作用,同时Rx真正的强大也在于这些操作符,所以我们接下来花大篇幅讲解一下这些操作符的作用。


    四.RxJava的操作符


    RxJava的操作符还是很有魅力的,在上面几个章节中,我们提到了create,just,map,filter 这些都是它的操作符,操作符一共分为十种,分别如下,下文摘抄自中文文档:


    创建操作符 / 用于创建Observable的操作符

    • Create?— 通过调用观察者的方法从头创建一个Observable

    • Defer?— 在观察者订阅之前不创建这个Observable,为每一个观察者创建一个新的Observable

    • Empty/Never/Throw?— 创建行为受限的特殊Observable

    • From?— 将其它的对象或数据结构转换为Observable

    • Interval?— 创建一个定时发射整数序列的Observable

    • Just?— 将对象或者对象集合转换为一个会发射这些对象的Observable

    • Range?— 创建发射指定范围的整数序列的Observable

    • Repeat?— 创建重复发射特定的数据或数据序列的Observable

    • Start?— 创建发射一个函数的返回值的Observable

    • Timer?— 创建在一个指定的延迟之后发射单个数据的Observable


    变换操作符 / 用于对Observable发射的数据进行变换

    • Buffer?— 缓存,可以简单的理解为缓存,它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个

    • FlatMap?— 扁平映射,将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,可以认为是一个将嵌套的数据结构展开的过程。

    • GroupBy?— 分组,将原来的Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据

    • Map?— 映射,通过对序列的每一项都应用一个函数变换Observable发射的数据,实质是对序列中的每一项执行一个函数,函数的参数就是这个数据项

    • Scan?— 扫描,对Observable发射的每一项数据应用一个函数,然后按顺序依次发射这些值

    • Window?— 窗口,定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。类似于Buffer,但Buffer发射的是数据,Window发射的是Observable,每一个Observable发射原始Observable的数据的一个子集


    过滤操作符 / 用于从Observable发射的数据中进行选择

    • Debounce?— 只有在空闲了一段时间后才发射数据,通俗的说,就是如果一段时间没有操作,就执行一次操作

    • Distinct?— 去重,过滤掉重复数据项

    • ElementAt?— 取值,取特定位置的数据项

    • Filter?— 过滤,过滤掉没有通过谓词测试的数据项,只发射通过测试的

    • First?— 首项,只发射满足条件的第一条数据

    • IgnoreElements?— 忽略所有的数据,只保留终止通知(onError或onCompleted)

    • Last?— 末项,只发射最后一条数据

    • Sample?— 取样,定期发射最新的数据,等于是数据抽样,有的实现里叫ThrottleFirst

    • Skip?— 跳过前面的若干项数据

    • SkipLast?— 跳过后面的若干项数据

    • Take?— 只保留前面的若干项数据

    • TakeLast?— 只保留后面的若干项数据


    组合操作符 / 用于将多个Observable组合成一个单一的Observable

    • And/Then/When?— 通过模式(And条件)和计划(Then次序)组合两个或多个Observable发射的数据集

    • CombineLatest?— 当两个Observables中的任何一个发射了一个数据时,通过一个指定的函数组合每个Observable发射的最新数据(一共两个数据),然后发射这个函数的结果

    • Join?— 无论何时,如果一个Observable发射了一个数据项,只要在另一个Observable发射的数据项定义的时间窗口内,就将两个Observable发射的数据合并发射

    • Merge?— 将两个Observable发射的数据组合并成一个

    • Switch?— 将一个发射Observable序列的Observable转换为这样一个Observable:它逐个发射那些Observable最近发射的数据

    • Zip?— 打包,使用一个指定的函数将多个Observable发射的数据组合在一起,然后将这个函数的结果作为单项数据发射


    错误处理操作符 / 用于从错误通知中恢复

    • Catch?— 捕获,继续序列操作,将错误替换为正常的数据,从onError通知中恢复

    • Retry?— 重试,如果Observable发射了一个错误通知,重新订阅它,期待它正常终止


    辅助操作符 / 用于处理Observable的操作符

    • Delay?— 延迟一段时间发射结果数据

    • Do?— 注册一个动作占用一些Observable的生命周期事件,相当于Mock某个操作

    • Materialize/Dematerialize?— 将发射的数据和通知都当做数据发射,或者反过来

    • ObserveOn?— 指定观察者观察Observable的调度程序(工作线程)

    • Serialize?— 强制Observable按次序发射数据并且功能是有效的

    • Subscribe?— 收到Observable发射的数据和通知后执行的操作

    • SubscribeOn?— 指定Observable应该在哪个调度程序上执行

    • TimeInterval?— 将一个Observable转换为发射两个数据之间所耗费时间的Observable

    • Timeout?— 添加超时机制,如果过了指定的一段时间没有发射数据,就发射一个错误通知

    • Timestamp?— 给Observable发射的每个数据项添加一个时间戳

    • Using?— 创建一个只在Observable的生命周期内存在的一次性资源


    条件和布尔操作符 / 用于单个或多个数据项,也可用于Observable

    • All?— 判断Observable发射的所有的数据项是否都满足某个条件

    • Amb?— 给定多个Observable,只让第一个发射数据的Observable发射全部数据

    • Contains?— 判断Observable是否会发射一个指定的数据项

    • DefaultIfEmpty?— 发射来自原始Observable的数据,如果原始Observable没有发射数据,就发射一个默认数据

    • SequenceEqual?— 判断两个Observable是否按相同的数据序列

    • SkipUntil?— 丢弃原始Observable发射的数据,直到第二个Observable发射了一个数据,然后发射原始Observable的剩余数据

    • SkipWhile?— 丢弃原始Observable发射的数据,直到一个特定的条件为假,然后发射原始Observable剩余的数据

    • TakeUntil?— 发射来自原始Observable的数据,直到第二个Observable发射了一个数据或一个通知

    • TakeWhile?— 发射原始Observable的数据,直到一个特定的条件为真,然后跳过剩余的数据


    算数和聚合操作符 / 用于整个数据序列

    • Average?— 计算Observable发射的数据序列的平均值,然后发射这个结果

    • Concat?— 不交错的连接多个Observable的数据

    • Count?— 计算Observable发射的数据个数,然后发射这个结果

    • Max?— 计算并发射数据序列的最大值

    • Min?— 计算并发射数据序列的最小值

    • Reduce?— 按顺序对数据序列的每一个应用某个函数,然后返回这个值

    • Sum?— 计算并发射数据序列的和


    连接操作符 / 有精确可控的订阅行为的特殊Observable

    • Connect?— 指示一个可连接的Observable开始发射数据给订阅者

    • Publish?— 将一个普通的Observable转换为可连接的

    • RefCount?— 使一个可连接的Observable表现得像一个普通的Observable

    • Replay?— 确保所有的观察者收到同样的数据序列,即使他们在Observable开始发射数据之后才订阅

    转换操作符 / 用于转换成其他对象

    • To?— 将Observable转换为其它的对象或数据结构

    • Blocking?阻塞Observable的操作符


    可以看到其中的操作符非常的多,这也意味着使用起来更加灵活,我们篇幅有限,也不可能全部讲一遍,不过我们可以挑一些重点的来讲,这里我也尽量保证多赘述一些,让大家尽可能的理解,不过要是太啰嗦了,也就没得办法了,只能说,要想学会RxJava,这些都是必经之路的。


    创建操作符


    1.Create

    这应该是我们第一个学习的操作符了,create操作符的作用是从头开始创建一个Observable,并且回调一个事件发射器ObservableEmitter用来调用onNext,onError,onCompleted,一个正确的Observable应该正好调用一次onCompleted或者onError,我们来看这段示例代码:


    640?wx_fmt=jpeg ? ? ?

    实际上这段代码我们看了很多遍了,Observable创建后订阅一个Observer


    2.Defer

    defe和create的区别在于只有等关系订阅之后Observable才会去建立,相当于懒加载,我们来看下示例代码:

    640?wx_fmt=jpeg? ? ? ?

    也就是说当订阅关系建立的时候,才会return一个Observable出去,这就做到了只有等关系订阅之后Observable才会去建立的特性了,只要能创建一个Observable,不管是just还是create都是可以的,如下:

    640?wx_fmt=jpeg ? ? ?

    3.Empty/Never/Throw

    这三个放在一起讲师应该这三个比较特殊


    Empty:创建一个不发射任何数据但是正常终止的Observable

    Never:创建一个不发射数据也不终止的Observable

    下一篇:没有了