赞
踩
- compile 'io.reactivex:rxjava:1.1.3'
- compile 'io.reactivex:rxandroid:1.1.0'
- Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
- @Override
- public void call(Subscriber<? super String> subscriber) {
- subscriber.onNext("Hello");
- subscriber.onNext("Hi");
- subscriber.onNext("Aloha");
- subscriber.onCompleted();
- }
- });
- Observable observable = Observable.just("Hello", "Hi", "Aloha");
- String[] words = {"Hello", "Hi", "Aloha"}; Observable observable = Observable.from(words);
- Subscriber subscriber = new Subscriber() {
- @Override
- public void onCompleted() {
- }
- @Override
- public void onError(Throwable e) {
- }
- @Override
- public void onNext(Object o) {
- }
- @Override
- public void onStart() {
- super.onStart();
- }
- };
- observable.subscribe(observer);
- observable.subscribe(subscriber);
到此为止我们利用RxJava实现了一个简单的观察者模式。但是并没有使用到线程的动态切换功能,该部分才是RxJava跟普通观察者模式的最大区别;下面我们对该部分如何使用进行介绍。但是在正式介绍线程动态切换方法之前,我们先来了解一下Observable的map和flatmap方法,因为RxJava线程动态切换往往伴随着这两个方法的出现。
- Observable.just("images/logo.png") // 输入类型 String
- .map(new Func1<String, Bitmap>() {
- @Override
- public Bitmap call(String filePath) { // 参数类型 String
- return getBitmapFromPath(filePath); // 返回类型 Bitmap
- }
- })
- .subscribe(new Action1<Bitmap>() {
- @Override
- public void call(Bitmap bitmap) { // 参数类型 Bitmap
- showBitmap(bitmap);
- }
- });
- Student[] students = ...;
- Subscriber<Course> subscriber = new Subscriber<Course>() {
- @Override
- public void onNext(Course course) {
- Log.d(tag, course.getName());
- }
- ...
- };
- Observable.from(students)
- .flatMap(new Func1<Student, Observable<Course>>() {
- @Override
- public Observable<Course> call(Student student) {
- return Observable.from(student.getCourses());
- }
- })
- .subscribe(subscriber);
- Observable.just(1, 2, 3, 4)
- .subscribeOn(Schedulers.io()) //决定调用observable.subscribe(subscriber)方法时的执行线程
- .observeOn(Schedulers.newThread()) //决定下面mapOperator方法的执行线程
- .map(mapOperator)
- .observeOn(Schedulers.io()) //决定下面mapOperator2方法的执行线程
- .map(mapOperator2)
- .observeOn(AndroidSchedulers.mainThread) //决定下面subscriber对象的onNext()、onCompleted()、onCompleted()和onError()方法执行线程。即UI线程
- .subscribe(subscriber);
- final OnSubscribe<T> onSubscribe;
- static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
- //RxJavaPlugins.getInstance()是一个单例模式,获取一个RxJavaPlugins对象,然后这个对象定义了Hook,然而这对Hook并没有做任何事情,至少目前是这样的。
- //rxJavaPlugins.getObservableExecutionHook()近似等价于得到一个RxJavaObservableExecutionHook对象,该对象目前不干任何事,完全可以把它看成透明的,你传什么进去它回什么。
- protected Observable(OnSubscribe<T> f) {
- this.onSubscribe = f;
- }
- public static <T> Observable<T> create(OnSubscribe<T> f) {
- return new Observable<T>(hook.onCreate(f));
- }
- public static <T> Observable<T> just(final T value) {
- return ScalarSynchronousObservable.create(value); //note1
- }
- public static <T> Observable<T> just(T t1, T t2) {
- return from((T[])new Object[] { t1, t2 }); //note2
- }
- public static <T> Observable<T> from(T[] array) {
- int n = array.length;
- if (n == 0) {
- return empty();//note1
- } else
- if (n == 1) {
- return just(array[0]);//note2
- }
- return create(new OnSubscribeFromArray<T>(array));
- }
- private static final class EmptyHolder {
- final static Observable<Object> INSTANCE = create(new OnSubscribe<Object>() {
- @Override
- public void call(Subscriber<? super Object> subscriber) {
- subscriber.onCompleted();
- }
- });
- }
- public final Subscription subscribe(final Observer<? super T> observer) {
- if (observer instanceof Subscriber) {
- return subscribe((Subscriber<? super T>)observer); //note1
- }
- return subscribe(new Subscriber<T>() { //note2
- @Override
- public void onCompleted() {
- observer.onCompleted();
- }
- @Override
- public void onError(Throwable e) {
- observer.onError(e);
- }
- @Override
- public void onNext(T t) {
- observer.onNext(t);
- }
- });
- }
- public final Subscription subscribe(Subscriber<? super T> subscriber) {
- return Observable.subscribe(subscriber, this);
- }
- private static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
- if (subscriber == null) {
- throw new IllegalArgumentException("observer can not be null");
- }
- if (observable.onSubscribe == null) {
- throw new IllegalStateException("onSubscribe function can not be null.");
- }
- subscriber.onStart();//note1
- if (!(subscriber instanceof SafeSubscriber)) {
- subscriber = new SafeSubscriber<T>(subscriber);
- }
- try {
- hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);//note2
- return hook.onSubscribeReturn(subscriber);//note3
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- try {
- subscriber.onError(hook.onSubscribeError(e)); //note4
- } catch (Throwable e2) {
- Exceptions.throwIfFatal(e2);
- RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
- hook.onSubscribeError(r);
- throw r;
- }
- return Subscriptions.unsubscribed();
- }
- }
- public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
- return lift(new OperatorMap<T, R>(func));
- }
- public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
- return new Observable<R>(new OnSubscribe<R>() {
- @Override
- public void call(Subscriber<? super R> o) {
- try {
- Subscriber<? super T> st = hook.onLift(operator).call(o); //note1
- try {
- st.onStart(); //note2
- onSubscribe.call(st); //note3
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- st.onError(e); //note4
- }
- } catch (Throwable e) {
- Exceptions.throwIfFatal(e);
- o.onError(e); //note5
- }
- }
- });
- }
- Observable.just("images/logo.png") //note1
- .map(new Func1<String, Bitmap>() {
- @Override
- public Bitmap call(String filePath) {
- return getBitmapFromPath(filePath);
- }
- }) //note2
- .subscribe(new Action1<Bitmap>() { //note3
- @Override
- public void call(Bitmap bitmap) {
- showBitmap(bitmap);
- }
- });
- Observable.just("images/logo.png") // 对应Observable<String>
- .map(new Func1<String, T1>() { }) // 对应Observable<T1>
- .map(new Func1<T1, T2>() { }) // 对应Observable<T2>
- .map(new Func1<T2, R>() { }) // 对应Observable<R>
- .subscribe(new Action1<R>() { // 对应Observable<R>
- @Override
- public void call(R r) {
- //
- };
- Observable.just("images/logo.png") // 对应Observable<String>
- .subscribe(new Action1<R>() { // 对应Observable<R>
- @Override
- public void call(R new Func1<T1, T2>().call (new Func1<T1, T2>().call(new Func1<String, T1>().call(String) ) ) ) {
- };
- public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
- if (getClass() == ScalarSynchronousObservable.class) {
- return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func); //note1
- }
- return merge(map(func));
- }
- public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) { //note0
- if (source.getClass() == ScalarSynchronousObservable.class) {
- return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity()); //note1
- }
- return source.lift(OperatorMerge.<T>instance(false)); //note2
- }
- public final class OperatorMerge<T> implements Operator<T, Observable<? extends T>>
- public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
- MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent); //note1
- MergeProducer<T> producer = new MergeProducer<T>(subscriber);
- subscriber.producer = producer;
- child.add(subscriber);
- child.setProducer(producer);
- return subscriber;
- }
- static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> //该Subscriber接受的参数是一个Observable
- final Subscriber<? super T> child;
- volatile Queue<Object> queue;
- volatile InnerSubscriber<?>[] innerSubscribers;
- public void onNext(Observable<? extends T> t) { //note1
- if (t == null) {
- return;
- }
- if (t instanceof ScalarSynchronousObservable) { //note2
- tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
- } else {
- InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++); //note3
- addInner(inner);
- t.unsafeSubscribe(inner); //note4
- emit(); //note5
- }
- }
- public final Observable<T> observeOn(Scheduler scheduler) {
- return observeOn(scheduler, RxRingBuffer.SIZE);
- }
- public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
- return observeOn(scheduler, false, bufferSize);
- }
- public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { //note1
- if (this instanceof ScalarSynchronousObservable) {
- return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler); //note2
- }
- return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize)); //note3
- }
- public final Observable<T> subscribeOn(Scheduler scheduler) {
- if (this instanceof ScalarSynchronousObservable) {
- return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);//note1
- }
- return create(new OperatorSubscribeOn<T>(this, scheduler)); //note2
- }
- public final class OperatorSubscribeOn<T> implements OnSubscribe<T> {
- final Scheduler scheduler;
- final Observable<T> source;
- public OperatorSubscribeOn(Observable<T> source, Scheduler scheduler) {
- this.scheduler = scheduler;
- this.source = source;
- }
- @Override
- public void call(final Subscriber<? super T> subscriber) {
- final Worker inner = scheduler.createWorker(); //inner 工作线程别名。
- inner.schedule(new Action0() { //note1
- @Override
- public void call() {
- final Thread t = Thread.currentThread();
- Subscriber<T> s = new Subscriber<T>(subscriber) { //note2
- @Override
- public void onNext(T t) { subscriber.onNext(t); }
- @Override
- public void onError(Throwable e) { subscriber.onError(e); inner.unsubscribe(); }
- @Override
- public void setProducer(final Producer p) {
- subscriber.setProducer(new Producer() {
- @Override
- public void request(final long n) {
- if (t == Thread.currentThread()) {
- p.request(n);
- } else {
- inner.schedule(new Action0() {
- @Override
- public void call() { p.request(n); }
- });
- } //end of else
- } //end of request
- }); //end of subscriber.setProducer
- } //end of setProducer
- };//end of new Subscriber<T>(subscriber)
- source.unsafeSubscribe(s); //note3
- }//end of inner call
- }); // end of inner.schedule(new Action0()
- }// end of outter call
- }//end of class
- public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
- try {
- subscriber.onStart(); //note1
- onSubscribe.call(subscriber); //note2
- return subscriber;
- } catch (Throwable e) {
- try {
- subscriber.onError(e);
- } catch (Throwable e2) {
- ...
- }
- return Subscriptions.unsubscribed();
- }
- }
- @Override public Subscriber<? super T> call(Subscriber<? super T> child) {
- if (scheduler instanceof ImmediateScheduler) { //note1
- return child;
- } else if (scheduler instanceof TrampolineScheduler) { //note2
- return child;
- } else {
- ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize); //note3
- parent.init();
- return parent;
- }
- }
- final Subscriber<? super T> child;
- final Scheduler.Worker recursiveScheduler;
- final NotificationLite<T> on;
- final Queue<Object> queue;
- volatile boolean finished; //当前流的状态
- long emitted;//当前被处理的事件数
- final int limit; //处理的门限,达到该值需要重新调用request方法
- final AtomicLong requested = new AtomicLong();
- final AtomicLong counter = new AtomicLong();
- public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
- this.child = child; //note1
- this.recursiveScheduler = scheduler.createWorker(); //note2
- this.on = NotificationLite.instance(); //note3
- this.limit = calculatedSize - (calculatedSize >> 2);
- int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
- if (UnsafeAccess.isUnsafeAvailable()) { //note4
- queue = new SpscArrayQueue<Object>(calculatedSize);
- } else {
- queue = new SpscAtomicArrayQueue<Object>(calculatedSize); //note5
- }
- request(calculatedSize); //note6
- }
- void init() {
- Subscriber<? super T> localChild = child;
- localChild.setProducer(new Producer() { //note0
- @Override
- public void request(long n) {
- if (n > 0L) {
- BackpressureUtils.getAndAddRequest(requested, n); //note1
- schedule(); //note2
- }
- }
- });
- localChild.add(recursiveScheduler); //note3
- localChild.add(this); //note4
- }
- protected void schedule() {
- if (counter.getAndIncrement() == 0) {//note1
- recursiveScheduler.schedule(this); //note2
- }
- }
- @Override public void onNext(final T t) {
- if (isUnsubscribed() || finished) {
- return;
- }
- if (!queue.offer(on.next(t))) { //note1
- onError(new MissingBackpressureException());
- return;
- }
- schedule();//note2
- }
- @Override public void onCompleted() {
- if (isUnsubscribed() || finished) {
- return;
- }
- finished = true; //note1
- schedule(); //note2
- }
- @Override public void onError(final Throwable e) {
- if (isUnsubscribed() || finished) {
- RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
- return;
- }
- error = e; //ntoe1
- finished = true; //note1
- schedule(); //note2
- }
- @Override public void call() {
- long missed = 1L; //任务数门限
- long currentEmission = emitted; //当前任务数
- final Queue<Object> q = this.queue; //任务队列
- final Subscriber<? super T> localChild = this.child; //子Subscriber
- final NotificationLite<T> localOn = this.on;
- for (;;) {
- long requestAmount = requested.get(); //Subscriber所能接收的最大事件数
- while (requestAmount != currentEmission) { //当前完成任务数没有达到门限值
- boolean done = finished;
- Object v = q.poll(); //note1
- boolean empty = v == null;
- if (checkTerminated(done, empty, localChild, q)) {//note3
- return;
- }
- if (empty) {
- break;
- }
- localChild.onNext(localOn.getValue(v)); //note2
- currentEmission++;//当前任务数+1
- if (currentEmission == limit) {
- requestAmount = BackpressureUtils.produced(requested, currentEmission);//将requestted值减去当前执行结束的任务数,即得到Subscriber所能接收的最大事件数减少。
- request(currentEmission); //调用Subscriber的request方法参数为当前执行的任务数
- currentEmission = 0L; //当前任务数为0
- }
- }
-
- if (requestAmount == currentEmission) {//Subscriber所能接收的最大事件数等于当前执行完的任务数
- if (checkTerminated(finished, q.isEmpty(), localChild, q)) { //note3
- return;
- }
- }
- emitted = currentEmission; //执行到这里表明还有没有被消费的事件,记录当前消费到的事件位置
- missed = counter.addAndGet(-missed); //note4
- if (missed == 0L) {
- break;
- }
- }
- public abstract Worker createWorker();//note1
- public abstract static class Worker implements Subscription {
- public abstract Subscription schedule(Action0 action);//note2
- public abstract Subscription schedule(final Action0 action, final long delayTime, final TimeUnit unit);
- }
- private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler-";//note1
- private static final RxThreadFactory THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX);//note2
- public Worker createWorker() {
- return new NewThreadWorker(THREAD_FACTORY); //note3
- }
- public NewThreadWorker(ThreadFactory threadFactory) {
- ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, threadFactory); //note1
- ....
- executor = exec;
- }
- public Subscription schedule(final Action0 action) {
- return schedule(action, 0, null);
- }
- public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
- if (isUnsubscribed) {//note2
- return Subscriptions.unsubscribed();
- }
- return scheduleActual(action, delayTime, unit);
- }
- public ScheduledAction scheduleActual(final Action0 action, long delayTime, TimeUnit unit) {
- ScheduledAction run = new ScheduledAction(action); //note1
- Future<?> f;
- if (delayTime <= 0) {
- f = executor.submit(run); //note2
- } else {
- f = executor.schedule(run, delayTime, unit);
- }
- return run;
- }
- public static Scheduler mainThread() {
- Scheduler scheduler = RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler(); //note1
- return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER; //note2
- }
- private final Handler handler;
- HandlerScheduler(Handler handler) {
- this.handler = handler;
- }
- public Worker createWorker() {
- return new HandlerWorker(handler);
- }
- static class HandlerWorker extends Worker {
- private final Handler handler;
- HandlerWorker(Handler handler) {
- this.handler = handler;
- }
- @Override
- public Subscription schedule(final Action0 action) {
- return schedule(action, 0, TimeUnit.MILLISECONDS);
- }
- @Override
- public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
- if (compositeSubscription.isUnsubscribed()) {//note0
- return Subscriptions.unsubscribed();
- }
- action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action); //note1
- final ScheduledAction scheduledAction = new ScheduledAction(action); //note2
- ...
- handler.postDelayed(scheduledAction, unit.toMillis(delayTime)); //note3
- ....
- return scheduledAction;
- }
-
- }
- final T t;
- protected ScalarSynchronousObservable(final T t) {
- super(new OnSubscribe<T>() {
- @Override
- public void call(Subscriber<? super T> s) {
- s.setProducer(createProducer(s, t)); //note1
- }
- });
- this.t = t;
- }
- public static <T> ScalarSynchronousObservable<T> create(T t) {
- return new ScalarSynchronousObservable<T>(t);
- }
- static <T> Producer createProducer(Subscriber<? super T> s, T v) {
- if (STRONG_MODE) { //默认是false
- return new SingleProducer<T>(s, v);
- }
- return new WeakSingleProducer<T>(s, v); 返回这个Producer
- }
- static final class WeakSingleProducer<T> implements Producer{
- final Subscriber<? super T> actual;
- final T value;
- boolean once;
- public void request(long n) {
- if (once) {
- return;
- }
- if (n < 0L) {
- throw new IllegalStateException("n >= required but it was " + n);
- }
- if (n != 0L) {
- once = true;
- Subscriber<? super T> a = actual;
- if (a.isUnsubscribed()) {
- return;
- }
- T v = value;
- try {
- a.onNext(v); //note1
- } catch (Throwable e) {
- Exceptions.throwOrReport(e, a, v);
- return;
- }
- if (a.isUnsubscribed()) {
- return;
- }
- a.onCompleted(); //note2
- }
- }
- }
- public <R> Observable<R> scalarFlatMap(final Func1<? super T, ? extends Observable<? extends R>> func) {
- return create(new OnSubscribe<R>() {
- @Override public void call(final Subscriber<? super R> child) {
- Observable<? extends R> o = func.call(t); //note1
- if (o instanceof ScalarSynchronousObservable) {
- child.setProducer(createProducer(child, ((ScalarSynchronousObservable<? extends R>)o).t)); //note2
- } else {
- o.unsafeSubscribe(child); //note3
- }
- }
- });
- }
- @Override public void call(Subscriber<? super T> child) {
- child.setProducer(new FromArrayProducer<T>(child, array));
- }
- static final class FromArrayProducer<T> extends AtomicLong implements Producer
- final Subscriber<? super T> child;
- final T[] array;
- int index;
- public FromArrayProducer(Subscriber<? super T> child, T[] array) {
- this.child = child;
- this.array = array;
- }
- @Override public void request(long n) {
- if (n < 0) {
- throw new IllegalArgumentException("n >= 0 required but it was " + n);
- }
- if (n == Long.MAX_VALUE) {
- if (BackpressureUtils.getAndAddRequest(this, n) == 0) { //note1
- fastPath();
- }
- } else if (n != 0) {
- if (BackpressureUtils.getAndAddRequest(this, n) == 0) {
- slowPath(n);
- }
- }
- }
- void fastPath() {
- final Subscriber<? super T> child = this.child;
- for (T t : array) {
- if (child.isUnsubscribed()) {
- return;
- }
- child.onNext(t); //note1
- }
- if (child.isUnsubscribed()) {
- return;
- }
- child.onCompleted(); //note2
- }
- void slowPath(long r) {
- final Subscriber<? super T> child = this.child;
- final T[] array = this.array;
- final int n = array.length;
- long e = 0L;
- int i = index; //若是第一次执行这里的index=0
- for (;;) {
- while (r != 0L && i != n) { //note1
- if (child.isUnsubscribed()) {
- return;
- }
- child.onNext(array[i]);
- i++;
- if (i == n) {
- if (!child.isUnsubscribed()) {
- child.onCompleted();
- }
- return;
- }
- r--;
- e--;
- } //note2
- r = get() + e; //note3
- if (r == 0L) { //note2
- index = i; //记录当前执行到的任务数
- r = addAndGet(e);
- if (r == 0L) {
- return;
- }
- e = 0L;
- }
- }
- }
- private final SubscriptionList subscriptions; //构造器中赋初值
- private final Subscriber<?> subscriber; //构造器中赋初值
- private Producer producer; //事件制造者
- private long requested = NOT_SET; //最大接受任务数
- private static final Long NOT_SET = Long.MIN_VALUE;
- protected Subscriber() { this(null, false); }
- protected Subscriber(Subscriber<?> subscriber) { this(subscriber, true); }
- protected Subscriber(Subscriber<?> subscriber, boolean shareSubscriptions) {
- this.subscriber = subscriber;
- this.subscriptions = shareSubscriptions && subscriber != null ? subscriber.subscriptions : new SubscriptionList(); //note1
- }
- public final void add(Subscription s) {
- subscriptions.add(s);
- }
- <pre name="code" class="java">public void onStart() {
- // do nothing by default
- }
- @Override public final void unsubscribe() {
- subscriptions.unsubscribe();
- }
- @Override public final boolean isUnsubscribed() {
- return subscriptions.isUnsubscribed();
- }
- protected final void request(long n) {
- if (n < 0) {
- throw new IllegalArgumentException("number requested cannot be negative: " + n);
- }
- Producer producerToRequestFrom = null;
- synchronized (this) {
- if (producer != null) {
- producerToRequestFrom = producer;
- } else {
- addToRequested(n); //note1
- return;
- }
- }
- producerToRequestFrom.request(n); //note2
- }
- private void addToRequested(long n) {
- if (requested == NOT_SET) {
- requested = n;
- } else {
- final long total = requested + n;
- if (total < 0) {
- requested = Long.MAX_VALUE; //note1
- } else {
- requested = total;
- }
- }
- }
- public void setProducer(Producer p) {
- long toRequest;
- boolean passToSubscriber = false;
- synchronized (this) {
- toRequest = requested; //note1
- producer = p;
- if (subscriber != null) {//一般情况下该结果为假
- if (toRequest == NOT_SET) {
- passToSubscriber = true;
- }
- }
- }
- if (passToSubscriber) {//一般情况该行结果为假
- subscriber.setProducer(producer);
- } else {
- if (toRequest == NOT_SET) { //note2
- producer.request(Long.MAX_VALUE);
- } else {
- producer.request(toRequest);
- }
- }
-
- import rx.Observable;
- import rx.subjects.PublishSubject;
- import rx.subjects.SerializedSubject;
- import rx.subjects.Subject;
- public class RxBus {
- private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
- public void send(Object o) { _bus.onNext(o); }
- public Observable<Object> toObserverable() { return _bus; }
- public boolean hasObservers() { return _bus.hasObservers(); }
- }
- public class SerializedSubject<T, R> extends Subject<T, R> {
- private final SerializedObserver<T> observer;
- private final Subject<T, R> actual;
- public SerializedSubject(final Subject<T, R> actual) {
- super(new OnSubscribe<R>() {
- @Override
- public void call(Subscriber<? super R> child) {
- actual.unsafeSubscribe(child);
- }
- });
- this.actual = actual;
- this.observer = new SerializedObserver<T>(actual);
- }
- @Override public void onCompleted() { observer.onCompleted(); }
- @Override public void onError(Throwable e) { observer.onError(e); }
- @Override public void onNext(T t) { observer.onNext(t); }
- @Override public boolean hasObservers() { return actual.hasObservers(); }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。