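This article walks through the basic usage of RxJava. RxJava is simple to use, but its internal implementation is fairly complex and the code paths are hard to follow. Most online articles on RxJava source analysis quote little of the source, and what they quote is incomplete. The walkthrough below lists the relevant source in full, for reference.
1. Basic RxJava usage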
Observable.create(new Observable.OnSubscribe
2. Start with the .subscribe(new Observer...) call
    public final Subscription subscribe(final Observer<? super T> observer) {
        if (observer instanceof Subscriber) {
            return subscribe((Subscriber<? super T>)observer);
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return subscribe(new ObserverSubscriber<T>(observer));
    }
This overload only wraps the incoming observer in an ObserverSubscriber. If the observer is already a Subscriber, it is passed along unchanged.
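The wrapping step can be sketched in isolation. The block below is a simplified model, not the RxJava source: every `Sketch*` type and the `toSubscriber` helper are hypothetical stand-ins introduced only to show the "reuse a Subscriber, wrap anything else" dispatch.

```java
// Hypothetical stand-in for rx.Observer (assumption, not the library type).
interface SketchObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical stand-in for rx.Subscriber: an observer that can also be unsubscribed.
abstract class SketchSubscriber<T> implements SketchObserver<T> {
    private volatile boolean unsubscribed;
    void unsubscribe() { unsubscribed = true; }
    boolean isUnsubscribed() { return unsubscribed; }
}

// Hypothetical stand-in for ObserverSubscriber: forwards every event to the wrapped observer.
final class SketchObserverSubscriber<T> extends SketchSubscriber<T> {
    private final SketchObserver<? super T> observer;

    SketchObserverSubscriber(SketchObserver<? super T> observer) {
        this.observer = observer;
    }

    @Override public void onNext(T value) { observer.onNext(value); }
    @Override public void onCompleted() { observer.onCompleted(); }
    @Override public void onError(Throwable e) { observer.onError(e); }
}

final class WrapSketch {
    // Same order of checks as subscribe(Observer): instanceof first, then null, then wrap.
    @SuppressWarnings("unchecked")
    static <T> SketchSubscriber<? super T> toSubscriber(SketchObserver<? super T> observer) {
        if (observer instanceof SketchSubscriber) {
            return (SketchSubscriber<? super T>) observer;
        }
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        return new SketchObserverSubscriber<T>(observer);
    }
}
```

Because `null instanceof SketchSubscriber` is false, checking `instanceof` before the null check is safe. A null observer still falls through to the explicit `NullPointerException`.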
Next, follow the subscribe method:
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }

    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
        // validate and proceed
        if (subscriber == null) {
            throw new IllegalArgumentException("subscriber can not be null");
        }
        if (observable.onSubscribe == null) {
            throw new IllegalStateException("onSubscribe function can not be null.");
            /*
             * the subscribe function can also be overridden but generally that's not the appropriate approach
             * so I won't mention that in the exception
             */
        }

        // new Subscriber so onStart it
        subscriber.onStart();

        /*
         * See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
         * to user code from within an Observer"
         */
        // if not already wrapped
        if (!(subscriber instanceof SafeSubscriber)) {
            // assign to `observer` so we return the protected version
            subscriber = new SafeSubscriber<T>(subscriber);
        }

        // The code below is exactly the same an unsafeSubscribe but not used because it would
        // add a significant depth to already huge call stacks.
        try {
            // allow the hook to intercept and/or decorate
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // special handling for certain Throwable/Error/Exception types
            Exceptions.throwIfFatal(e);
            // in case the subscriber can't listen to exceptions anymore
            if (subscriber.isUnsubscribed()) {
                RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
            } else {
                // if an unhandled error occurs executing the onSubscribe we will propagate it
                try {
                    subscriber.onError(RxJavaHooks.onObservableError(e));
                } catch (Throwable e2) {
                    Exceptions.throwIfFatal(e2);
                    // if this happens it means the onError itself failed (perhaps an invalid function implementation)
                    // so we are unable to propagate the error correctly and will just throw
                    RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
                    // TODO could the hook be the cause of the error in the on error handling.
                    RxJavaHooks.onObservableError(r);
                    // TODO why aren't we throwing the hook's return value.
                    throw r; // NOPMD
                }
            }
            return Subscriptions.unsubscribed();
        }
    }
The only line that needs attention here is:
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
        Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
        if (f != null) {
            return f.call(instance, onSubscribe);
        }
        return onSubscribe;
    }
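Unless an installed hook function replaces or decorates it, RxJavaHooks.onObservableStart(observable, observable.onSubscribe) returns its second argument, which is the Observable's own onSubscribe object. So the subscribe method effectively calls onSubscribe.call(subscriber).
The subscriber here is the one passed in as the argument, wrapped in a SafeSubscriber if it was not one already.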
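The pass-through behaviour of the hook can be modelled in a few lines. This is a minimal sketch under stated assumptions, not the RxJavaHooks implementation: `SketchAction`, `SketchHooks`, and the `onStartHook` field are hypothetical names, and `java.util.function.BiFunction` stands in for `Func2`.

```java
import java.util.function.BiFunction;

// Hypothetical stand-in for Observable.OnSubscribe (assumption, not the library type).
interface SketchAction {
    void call(StringBuilder buf);
}

final class SketchHooks {
    // null means "no hook installed"; a non-null function may replace or decorate the action.
    static volatile BiFunction<Object, SketchAction, SketchAction> onStartHook;

    // Same shape as onObservableStart: consult the hook if present, else return the argument.
    static SketchAction onStart(Object instance, SketchAction action) {
        BiFunction<Object, SketchAction, SketchAction> f = onStartHook;
        if (f != null) {
            return f.apply(instance, action);
        }
        return action;
    }
}
```

With `onStartHook` left as null, `onStart` hands back the very same object it was given, so calling `.call(...)` on the result is the same as calling it on the original action. Setting `onStartHook` to a function that wraps the action is how a decorator would intercept the call.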
    protected Observable(OnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
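As this shows, the onSubscribe object is the argument passed to create, which makes the whole flow clear.
Nothing runs until subscribe is called: subscribe ===> onSubscribe.call(observer), which also hands the observer over to the OnSubscribe.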
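The flow described above can be condensed into a small self-contained model. It is an illustrative sketch only: the `Mini*` types and `LazyFlowDemo` are hypothetical, and the model leaves out Subscriber wrapping, SafeSubscriber, hooks, and unsubscription.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for rx.Observer (assumption, not the library type).
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
    void onError(Throwable e);
}

// Hypothetical stand-in for Observable.OnSubscribe.
interface MiniOnSubscribe<T> {
    void call(MiniObserver<? super T> observer);
}

final class MiniObservable<T> {
    private final MiniOnSubscribe<T> onSubscribe;

    private MiniObservable(MiniOnSubscribe<T> f) {
        this.onSubscribe = f;
    }

    // create only stores the function; nothing is executed here.
    static <T> MiniObservable<T> create(MiniOnSubscribe<T> f) {
        return new MiniObservable<>(f);
    }

    // subscribe is what triggers the stored function, passing the observer in.
    void subscribe(MiniObserver<? super T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        try {
            onSubscribe.call(observer);
        } catch (Throwable e) {
            observer.onError(e);
        }
    }
}

final class LazyFlowDemo {
    // Records the order of events to show that the source body runs only on subscribe.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<String> source = MiniObservable.create(observer -> {
            log.add("call");
            observer.onNext("hello");
            observer.onCompleted();
        });
        log.add("created");
        source.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { log.add("onNext:" + value); }
            @Override public void onCompleted() { log.add("onCompleted"); }
            @Override public void onError(Throwable e) { log.add("onError"); }
        });
        return log;
    }
}
```

In the list returned by `LazyFlowDemo.run()`, the "created" entry precedes "call". That ordering confirms the body passed to `create` stays dormant until `subscribe` invokes it.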