写在伊始
上一篇介绍了线程的一些基础知识和工作这么久以后对于多线程部分的使用经验之路,这篇主要对RxJava线程控制部分进行分析,及相应的源码介绍。
RxJava认知
说实话,近一年多一直在用rxjava进行项目架构的编写及封装及一些异步请求的处理等等。真的很好用,但本文只对其线程部分进行分析。如果你想学习rxjava的话,推荐您看一下如下几篇文档,也是一点一点学过来的,希望可以帮到您。
- 扔物线大神,匠心之作
- ,RxJava系列文章。
- ,RxJava系列文章。
RxJava之Scheduler (调度器)(附带源码分析,本文就RxJava2.0分析)
网上找的对于Scheduler 的2.0介绍都是写的扔物线大神对于Scheduler在RxJava1.0中的介绍,所以对于新操作符single()是自己的理解什么的,不对的还请指出)。
在RxJava 中,Scheduler ——调度器,相当于线程控制器,RxJava 通过它来指定每一段代码应该运行在什么样的线程。RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景,他们分别为:
- Schedulers.io():最常用的Scheduler ,主要用于IO 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation():计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- Schedulers.newThread():总是启用新线程,并在新线程执行操作。
- Schedulers.trampoline(),直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler。代替RxJava1.0的immediate 调度器。
- Schedulers.single(): (我没使用过)拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。
- Schedulers.from(executor),接收一个Executor,允许我们自定义Scheduler。
- 另外, Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。(也就是onNext方法的执行线程)
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。subscribeOn(): 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 observeOn(): 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程。
//例如 DisposableSubscriber sub = RetorfitUtil.getInstance().create().getData() .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) //类型转换 变成你想要的类型 .onBackpressureBuffer() ...复制代码
下文会对这些Scheduler进行单独的介绍。但是在这之前,先看一下Schedulers这个类。
话不多说,请撸这段源码。在subscribeOn这个方法中,无论我们使用哪一个Scheduler,都会首先走进这个类中。我们发现,在一开始的时候,他用一个静态代码块,初始化了五个Scheduler 供我们使用。
static final class SingleHolder { static final Scheduler DEFAULT = new SingleScheduler(); } .... static final class NewThreadHolder { static final Scheduler DEFAULT = new NewThreadScheduler(); } static { SINGLE = RxJavaPlugins.initSingleScheduler(new Callable() { @Override public Scheduler call() throws Exception { return SingleHolder.DEFAULT; } }); .... IO = RxJavaPlugins.initIoScheduler(new Callable () { @Override public Scheduler call() throws Exception { return IoHolder.DEFAULT; } }); TRAMPOLINE = TrampolineScheduler.instance();//默认Scheduler .... }复制代码
一旦我们调用这些方法,他就会把我们的任务放进对应的Scheduler (线程池,准确的说,应该是线程组)中进行执行。这也就是线程切换的概念。
... public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } ... public static Scheduler single() { return RxJavaPlugins.onSingleScheduler(SINGLE); }复制代码
那么他在new这些Scheduler的时候又做了些什么呢?这些线程池(组)又是如何的创建的呢?一个一个跟进源码看一看我们。
Schedulers.io():
之前说这个Scheduler的内部实现是是用一个无数量上限的线程池(其实是线程组),可以重用空闲的线程,那么他是如何做的呢?
首先,他们所有的Thread都是由RxThreadFactory创建,通过初始化独特的属于自己的字符串常量来确保不同的Scheduler在创建不同的线程的标志性。如下(其余同,就不介绍此部分了):
再者发现一个有意思的事情,rxjava所有的线程都被设置为守护线程。如下图:也就是它的线程不会影响JVM的退出,关于守护线程和用户线程不明白的,请自行百度。
之前在使用的过程中,一直以为无数量上限的线程池的底层是newCachedThreadPool实现的,后来发现不然,在IoScheduler的CachedWorkerPool类中你会发现,它创建了一个创建一个定长线程池,并且大小是1.
那么无数量限制呢?懵逼了!!后来发现,Schedulers.io()所用的线程池(到这里发现不对了吧,准确的来说,应该叫的线程组,因为他是由一个一个的定长线程池组成),是一个由ConcurrentLinkedQueue(顶层父类其实是Collection)组成的多个CachedWorkerPool,通过CachedWorkerPool的get()方法每次从Queue中获取可用worker线程,来进行任务的操作。如下图:
当然,如果expiringWorkerQueue没有worker线程,则会单独为此线程创建一个worker线程
Schedulers.newThread()
Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。那么,在NewThreadScheduler它是怎么做的呢?如下图:图中第一个和第二个箭头说明不同的Scheduler在创建不同的线程的标志性。第三个箭头说明他每次创建一个线程就把这个线程初始化为worker线程,并执行操作。
Schedulers.computation()
计算所使用的 Scheduler。这个线程池由newFixedThreadPool直接实现。这个计算指的是 CPU 密集型计算。那为什么这个线程池适合执行Cpu密集型计算呢,因为它的个数等于CPU 核数,能够最大效率的使用CPU,提高效率。如图:
获取CPU核数:
创建固定大小的线程池:
Schedulers.computation()模式下是用RoundRobin(是一种算法,请自行百度)方式轮训获取worker线程,这就是为什么起名叫叫EventLoop吧,如下图。
在EventLoopWorker中,会调用schedule方法执行线程中的任务。
Schedulers.trampoline():(默认使用的线程组)
Schedulers.trampoline()的意思是直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler,代替RxJava1.0的immediate 调度器。Schedulers.trampoline()所用的线程组,组合了一个PriorityBlockingQueue,以提交事件的时间进行排序,依次执行任务,如下图。
这个单词你懂了就懂了。enqueue,不信你去查查!
Schedulers.single()
Schedulers.single(): 拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,其他任务将会按照先进先出的顺序依次执行。他只有一个worker线程,如图:
在ScheduledWorker中,会执行我们所有的任务。
在CompositeDisposable中,有一个OpenHashSet回来保存我们的任务,在add方法中,通过同步代码块,保证同一时间只有一个任务在执行。
Schedulers.from(executor):
接收一个Executor,允许我们自定义Scheduler。关于这方面,先不准备讲解,使用较少,如有需要请告知,我可以一起和你百度,共同实践哈哈!
问题求解
还有一个任务,就是在RxJavaPlugins这个类中,初始化了很多的handler,是为了Funtion中回调数据吗?还是什么?没找到相关的地方,求帮助!
写在最后
还是那句话,希望得到大家的中肯的意见,让我认识自己的不足,一起学习,共同进步。这一篇就先到这里,分析的过程中,发现写的真是无法企及的高度。自惭中……