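RxJava source code analysis series:
RxJava Source Code Analysis (1) - subscribe source
RxJava Source Code Analysis (2) - thread switching source
RxJava Source Code Analysis (3) - the default Schedulers thread pools
Earlier articles analyzed the source behind the most common RxJava usage. Readers who have not seen them yet may want to start there:
RxJava Source Code Analysis (1) - subscribe source
RxJava Source Code Analysis (2) - thread switching source
This article analyzes the thread-pool-related source code in RxJava.
It covers roughly the following:
Besides io, RxJava's built-in schedulers include newThread, single, and others. Their logic is similar, so this article uses io as the example.
Demo code: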
Observable
        .create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                Log.i("RxJavaTest", "subscribe");
                emitter.onNext("123");
            }
        })
        .map(new Function<String, String>() {
            @Override
            public String apply(String s) throws Throwable {
                return s + "456";
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.i("RxJavaTest", "onSubscribe");
            }

            @Override
            public void onNext(@NonNull String s) {
                Log.i("RxJavaTest", "onNext: " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.i("RxJavaTest", "onError");
            }

            @Override
            public void onComplete() {
                Log.i("RxJavaTest", "onComplete");
            }
        });
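RxJavaPlugins.onIoScheduler is a hook that RxJava provides for wrapping or replacing the scheduler. No handler is installed by default, so we can go straight to the logic behind Schedulers.IO.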
// Schedulers
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}

// RxJavaPlugins
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) {
        // No hook installed: return the default scheduler unchanged.
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}
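The hook shape above (a nullable global handler that falls back to the default) can be sketched with plain JDK types. This is only an illustration: `HookSketch`, `ioHandler`, and `onIo` are hypothetical names, and a `String` stands in for the `Scheduler`. It is not RxJava's actual implementation.

```java
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a plugin registry; NOT RxJava's real class.
final class HookSketch {
    // Optional global handler; null means "no hook installed".
    static volatile UnaryOperator<String> ioHandler;

    // Same shape as onIoScheduler: return the default unless a handler is set.
    static String onIo(String defaultValue) {
        UnaryOperator<String> f = ioHandler; // read the volatile field once
        if (f == null) {
            return defaultValue;
        }
        return f.apply(defaultValue);
    }
}
```

With no handler set, `HookSketch.onIo("io")` returns its argument unchanged. After assigning `HookSketch.ioHandler = s -> "wrapped-" + s`, the same call returns the wrapped value. In RxJava itself the corresponding handler is installed through `RxJavaPlugins.setIoSchedulerHandler`.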
IO is a static field of Schedulers. It is assigned in the class's static initializer, which runs the first time the class is used.
static final Scheduler IO;

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
IOTask ultimately returns the IoHolder.DEFAULT object.
static final class IOTask implements Supplier<Scheduler> {
    @Override
    public Scheduler get() {
        return IoHolder.DEFAULT;
    }
}

static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
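IoHolder follows what is commonly called the initialization-on-demand holder idiom: the nested class is not initialized until its static field is first read, so the default instance is only built when the supplier is actually invoked. A minimal JDK-only sketch of that behavior follows. `LazyHolderDemo`, `Expensive`, and `CREATED` are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class LazyHolderDemo {
    // Counts how many Expensive instances have been constructed.
    static final AtomicInteger CREATED = new AtomicInteger();

    static final class Expensive {
        Expensive() {
            CREATED.incrementAndGet();
        }
    }

    // Initialized by the JVM only when Holder.DEFAULT is first accessed.
    static final class Holder {
        static final Expensive DEFAULT = new Expensive();
    }

    // Plays the role of IOTask.get(): touching Holder triggers its initialization.
    static Expensive get() {
        return Holder.DEFAULT;
    }
}
```

Reading `LazyHolderDemo.CREATED` before calling `get()` shows zero instances. The first `get()` creates exactly one, and later calls return the same object.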
The logic in IoScheduler is the key part. The main points are:
public IoScheduler() {
    this(WORKER_THREAD_FACTORY);
}

static final RxThreadFactory WORKER_THREAD_FACTORY;
static final CachedWorkerPool NONE;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;

public IoScheduler(ThreadFactory threadFactory) {
    this.threadFactory = threadFactory;
    this.pool = new AtomicReference<>(NONE);
    start();
}

static {
    // ... omitted ...
    WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
    NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
    NONE.shutdown();
    // ... omitted ...
}
RxThreadFactory is an ordinary thread factory. Its main logic is as follows:
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
    private static final long serialVersionUID = -7789753024099756196L;

    final String prefix;
    final int priority;
    final boolean nonBlocking;

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        // Thread name is "<prefix>-<counter>"; the factory itself is the counter.
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }

    @Override
    public String toString() {
        return "RxThreadFactory[" + prefix + "]";
    }

    static final class RxCustomThread extends Thread implements NonBlockingThread {
        RxCustomThread(Runnable run, String name) {
            super(run, name);
        }
    }
}
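To see the naming and daemon behavior in isolation, here is a reduced JDK-only factory in the same style. `NamedDaemonFactory` is a hypothetical name, and the nonBlocking variant is left out. It is a sketch of the pattern, not the library class.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// The factory extends AtomicLong so it can act as its own thread counter.
final class NamedDaemonFactory extends AtomicLong implements ThreadFactory {
    private static final long serialVersionUID = 1L;

    final String prefix;
    final int priority;

    NamedDaemonFactory(String prefix, int priority) {
        this.prefix = prefix;
        this.priority = priority;
    }

    @Override
    public Thread newThread(Runnable r) {
        // Names follow "<prefix>-<n>", with n starting at 1.
        Thread t = new Thread(r, prefix + "-" + incrementAndGet());
        t.setPriority(priority);
        t.setDaemon(true); // daemon threads do not keep the JVM alive
        return t;
    }
}
```

Two consecutive `newThread` calls on `new NamedDaemonFactory("demo", Thread.NORM_PRIORITY)` yield threads named `demo-1` and `demo-2`, both marked as daemon threads.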
When a Worker is requested from CachedWorkerPool, the pool first checks expiringWorkerQueue for a cached Worker. If none is available, it creates a new Worker and passes in the ThreadFactory.
static final class CachedWorkerPool implements Runnable {
    private final long keepAliveTime;
    private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
    final CompositeDisposable allWorkers;
    private final ScheduledExecutorService evictorService;
    private final Future<?> evictorTask;
    private final ThreadFactory threadFactory;

    // ... omitted ...

    ThreadWorker get() {
        if (allWorkers.isDisposed()) {
            return SHUTDOWN_THREAD_WORKER;
        }
        while (!expiringWorkerQueue.isEmpty()) {
            ThreadWorker threadWorker = expiringWorkerQueue.poll();
            if (threadWorker != null) {
                return threadWorker;
            }
        }

        // No cached worker found, so create a new one.
        ThreadWorker w = new ThreadWorker(threadFactory);
        allWorkers.add(w);
        return w;
    }

    // ... omitted ...
}
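The reuse-or-create step in get() can be sketched independently of RxJava. The class below is a simplified illustration only: `CachedPoolSketch` and its `release` method are names assumed for this sketch, and the shutdown check and expiry-based eviction handled by the real class are left out.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Simplified sketch: idle workers are cached in a queue and reused on demand.
final class CachedPoolSketch<W> {
    private final ConcurrentLinkedQueue<W> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<W> factory;

    CachedPoolSketch(Supplier<W> factory) {
        this.factory = factory;
    }

    // Prefer a cached worker; create a new one only when the cache is empty.
    W get() {
        W cached = idle.poll();
        if (cached != null) {
            return cached;
        }
        return factory.get();
    }

    // Hand a worker back so that a later get() can reuse it.
    void release(W worker) {
        idle.offer(worker);
    }
}
```

With `new CachedPoolSketch<Object>(Object::new)`, two `get()` calls in a row return two distinct workers. After one of them is released, the next `get()` returns that same instance instead of creating a third.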
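The main logic is as follows:
Two takeaways about IoScheduler:
- In IoScheduler, thread pools and Workers map one-to-one
- A single thread factory in IoScheduler serves multiple Workers and multiple thread pools
(The mapping between thread pools and Workers differs from Scheduler to Scheduler. In SingleScheduler, for example, one thread pool serves all Workers. Developers can also write a Scheduler with entirely custom logic.)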
static final class ThreadWorker extends NewThreadWorker {
    long expirationTime;

    ThreadWorker(ThreadFactory threadFactory) {
        super(threadFactory);
        this.expirationTime = 0L;
    }

    public long getExpirationTime() {
        return expirationTime;
    }

    public void setExpirationTime(long expirationTime) {
        this.expirationTime = expirationTime;
    }
}

// NewThreadWorker: each worker creates its own executor.
public NewThreadWorker(ThreadFactory threadFactory) {
    executor = SchedulerPoolFactory.create(threadFactory);
}

// SchedulerPoolFactory: the executor has a core pool size of 1.
public static ScheduledExecutorService create(ThreadFactory factory) {
    final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
    exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
    return exec;
}

// NewThreadWorker: tasks are submitted to (or scheduled on) that executor.
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
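Because each worker owns a ScheduledThreadPoolExecutor with a core size of 1, every task given to one worker runs on that worker's single thread. The JDK-only sketch below illustrates that property. `WorkerThreadDemo` and `threadsUsed` are hypothetical names, and the inline thread factory is a simplification of the one shown earlier.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;

final class WorkerThreadDemo {
    // Runs `tasks` jobs on a fresh single-thread executor and reports
    // the names of all threads that executed them.
    static Set<String> threadsUsed(String threadName, int tasks) {
        ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, threadName);
            t.setDaemon(true);
            return t;
        });
        try {
            Set<String> names = ConcurrentHashMap.newKeySet();
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(exec.submit(() -> {
                    names.add(Thread.currentThread().getName());
                }));
            }
            for (Future<?> f : futures) {
                f.get(); // wait for each task to finish
            }
            return names;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            exec.shutdownNow();
        }
    }
}
```

Calling `WorkerThreadDemo.threadsUsed("worker-A", 5)` returns a set containing only `worker-A`. A second call with another name uses a separate executor and therefore a separate thread, which mirrors the one-worker-one-pool relationship.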
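Workers are used in many places in RxJava. Arguably, the Worker is the key to how RxJava switches threads.
Here we use the ObservableObserveOn class as an example to show how a Worker is created and used.
Readers who are not familiar with ObservableObserveOn, or with the source of the common RxJava methods, may want to read the earlier articles first:
RxJava Source Code Analysis (1) - subscribe source
RxJava Source Code Analysis (2) - thread switching source
The main logic in ObservableObserveOn that uses the Worker: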
// ObservableObserveOn: one Worker is created per subscription.
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
    }
}

// ObserveOnObserver: queue the item, then ask the Worker to drain the queue.
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
    this.downstream = actual;
    this.worker = worker;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

void schedule() {
    // Only the call that moves the counter from 0 to 1 schedules a drain.
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}
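The `getAndIncrement() == 0` check in schedule() is a work-in-progress counter: only the call that moves the counter from 0 to 1 submits a task, and that task keeps draining until it has accounted for every call made in the meantime. A minimal JDK-only sketch of the pattern follows. `DrainSketch` is a hypothetical class, a plain `Executor` stands in for the Worker, and error and completion handling are left out. The `run()` body is an assumed form of the drain loop, since the original snippet does not show it.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// The object itself is the work-in-progress counter (it extends AtomicInteger).
final class DrainSketch<T> extends AtomicInteger implements Runnable {
    private static final long serialVersionUID = 1L;

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Executor worker;        // stands in for Scheduler.Worker
    private final Consumer<T> downstream; // stands in for the downstream Observer

    DrainSketch(Executor worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    void schedule() {
        // Only the 0 -> 1 transition submits a drain task.
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            // Subtract the calls already handled; loop again if more arrived.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

If three items are pushed before the worker runs, only one task is submitted, and that single run delivers all three items in order. The next `onNext` after the drain completes submits a fresh task.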