0

0

深入浅出RxJava_02[订阅深入和异常处理]的代码示例

黄舟

黄舟

发布时间:2017-03-04 09:41:19

|

1229人浏览过

|

来源于php中文网

原创

本教程基于RxJava1.x版本进行全面讲解,后续课程将陆续更新,敬请关注…

8.可连接的被观察者

前几节提到的了响应用过程中的三个细节:被观察者 观察者 和订阅。 接下来这一节继续理解下订阅的其他知识点。

这里介绍一个叫做ConnectableObservable的可连接被观察者。一个可连接的Observable与普通的Observable差不多,除了这一点:可连接的Observable在被订阅时并不开始发射数据,只有在它的connect()被调用时才开始。用这种方法,你可以等所有的潜在订阅者都订阅了这个Observable之后才开始发射数据。

Publish

该操作符可以将普通的Observable转化成可连接的Observable。

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

RxJava中connect是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable。

立即学习Java免费学习笔记(深入)”;

//创建了一个普通的Observable对象
Observable observable =
        Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

与普通的Observable对象订阅不同,上面的代码中并没直接调用Action1对象的call()方法。

Connect

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

//创建了一个普通的Observable对象
Observable observable =
        Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable connectableObservable =observable.publish();

//为可连接的被观察者订阅事件,但这里并不会马上发送事件
connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

//当调用可连接的被观察者connect()方法后 开始发送所有的数据。
connectableObservable.connect(new Action1() {
    @Override
    public void call(Subscription subscription) {
        Log.i(TAG, "call: "+subscription);
    }
});

输出:

IT520: call: OperatorPublish$PublishSubscriber@20dce78
IT520: call: 1
IT520: call: 2
IT520: call: 3

RefCount

RefCount操作符可以让一个可连接的Observable转换为普通的Observable。

//创建了一个普通的Observable对象
Observable observable =
        Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//将一个被观察者转换成一个可连接的被观察者
ConnectableObservable connectableObservable =observable.publish();

//将一个可链接的被观察者转换成一个普通观察者
Observable integerObservable = connectableObservable.refCount();


//为可连接的被观察者订阅事件,一订阅就马上发送数据并打印出 1 2 3...
integerObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

Replay

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅.

先来看一个例子:

Observable observable =
            Observable.create(new Observable.OnSubscribe() {
                @Override
                public void call(Subscriber subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                }
            });

ConnectableObservable connectableObservable =observable.publish();

connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

输出:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3

首先我们通过publish()将一个普通的Observable转换成ConnectableObservable。当调用connect()的时候,则connect()上面已经订阅的观察者会收到数据。而connect()后面订阅的观察者则无法接收到数据。 如果我们想让所有的观察者在调用connect()的时候同时接收到数据而跟订阅的顺序无关,则需要通过replay()。

Observable observable =
        Observable.create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onNext(3);
            }
        });

//这里将publish()改为replay()
ConnectableObservable connectableObservable =observable.replay();

connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--1--: "+integer);
    }
});

connectableObservable.connect();

connectableObservable.subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call--2--: "+integer);
    }
});

输出:

com.m520it.rxjava I/IT520: call--1--: 1
com.m520it.rxjava I/IT520: call--1--: 2
com.m520it.rxjava I/IT520: call--1--: 3
com.m520it.rxjava I/IT520: call--2--: 1
com.m520it.rxjava I/IT520: call--2--: 2
com.m520it.rxjava I/IT520: call--2--: 3

9.“冷Observable”&“热Observable”

前面我们提到,订阅的时候(如果观察者有发送数据的),观察者有直接接收数据的,有等过了一段时间才接收数据的。

  • 我们将一订阅观察者就马上能接收数据的观察者称之为“热Observable”。

  • 如上面的ConnectableObservable就算被订阅后,也没能发送数据,只有调用connect()才能让观察者接收到数据。我们称该观察者为“冷Observable”

10.错误处理

很多操作符可用于对Observable发射的onError通知做出响应或者从错误中恢复

Catch操作符拦截原始Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

RxJava将Catch实现为三个不同的操作符:

自学 PHP、MySQL和Apache
自学 PHP、MySQL和Apache

本书将PHP开发与MySQL应用相结合,分别对PHP和MySQL做了深入浅出的分析,不仅介绍PHP和MySQL的一般概念,而且对PHP和MySQL的Web应用做了较全面的阐述,并包括几个经典且实用的例子。 本书是第4版,经过了全面的更新、重写和扩展,包括PHP5.3最新改进的特性(例如,更好的错误和异常处理),MySQL的存储过程和存储引擎,Ajax技术与Web2.0以及Web应用需要注意的安全

下载

onErrorReturn

onErrorReturn方法返回一个镜像原有Observable行为的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。

下面的代码发送1,2,3 并在发送的过程中模拟发送一个异常,只要有异常发送,onErrorReturn()就会被调用 并发送44.代码如下:

Observable.create(new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    }).onErrorReturn(new Func1() {
        @Override
        public Integer call(Throwable throwable) {
            return 44;
        }
    }).subscribe(new Action1() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

输出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 44

onErrorResumeNext

让Observable在遇到错误时开始发射第二个Observable的数据序列。

下面的代码在发送的时候,模拟发送一个异常。接着onErrorResumeNext就会被调用 并开始发射新的Observable对象。

Observable.create(new Observable.OnSubscribe() {
    @Override
    public void call(Subscriber subscriber) {
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onError(new NullPointerException("mock exception !"));
        subscriber.onNext(3);
    }
}).onErrorResumeNext(new Func1>() {
    @Override
    public Observable call(Throwable throwable) {
            Observable innerObservable =
                    Observable.create(new Observable.OnSubscribe() {
                        @Override
                        public void call(Subscriber subscriber) {
                            subscriber.onNext(4);
                            subscriber.onNext(5);
                        }
                    });
        return innerObservable;
    }
}).subscribe(new Action1() {
    @Override
    public void call(Integer integer) {
        Log.i(TAG, "call: "+integer);
    }
});

输出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 3
com.m520it.rxjava I/IT520: call: 4
com.m520it.rxjava I/IT520: call: 5

onExceptionResumeNext

让Observable在遇到错误时继续发射后面的数据项。

//创建一个错误处理的Observable对象
Observable exceptionObserver = Observable
        .create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(55);
                subscriber.onNext(66);
            }
        });

Observable
        .create(new Observable.OnSubscribe() {
            @Override
            public void call(Subscriber subscriber) {
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onError(new NullPointerException("mock exception !"));
                subscriber.onNext(3);
            }
        })
        //上面的代码发送的过程中出现了异常,该方法就会被调用 并发射exceptionObserver
        .onExceptionResumeNext(exceptionObserver)
        .subscribe(new Action1() {
            @Override
            public void call(Integer integer) {
                Log.i(TAG, "call: "+integer);
            }
        });

输出:

com.m520it.rxjava I/IT520: call: 1
com.m520it.rxjava I/IT520: call: 2
com.m520it.rxjava I/IT520: call: 55
com.m520it.rxjava I/IT520: call: 66

Retry重试机制

如果原始Observable遇到错误,重新订阅它并期望它能正常终止。

RxJava中的实现为retry和retryWhen。

Observable.create(new Observable.OnSubscribe() {
        @Override
        public void call(Subscriber subscriber) {
            subscriber.onNext(1);
            subscriber.onNext(2);
            subscriber.onError(new NullPointerException("mock exception !"));
            subscriber.onNext(3);
        }
    })
    .retry(3)//重复3次订阅
    .subscribe(new Action1() {
        @Override
        public void call(Integer integer) {
            Log.i(TAG, "call: "+integer);
        }
    });

类似的函数还有:

  • Javadoc: retry())  无论收到多少次onError通知,都会继续订阅并发射原始Observable。

  • Javadoc: retry(long)) retry会最多重新订阅指定的次数,如果次数超了,不会尝试再次订阅

  • Javadoc: retry(Func2))

  • retryWhen

 以上就是深入浅出RxJava_02[订阅深入和异常处理]的代码示例的内容,更多相关内容请关注PHP中文网(www.php.cn)!

相关文章

java速学教程(入门到精通)
java速学教程(入门到精通)

java怎么学习?java怎么入门?java在哪学?java怎么学才快?不用担心,这里为大家提供了java速学教程(入门到精通),有需要的小伙伴保存下载就能学习啦!

下载

本站声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
AO3官网入口与中文阅读设置 AO3网页版使用与访问
AO3官网入口与中文阅读设置 AO3网页版使用与访问

本专题围绕 Archive of Our Own(AO3)官网入口展开,系统整理 AO3 最新可用官网地址、网页版访问方式、正确打开链接的方法,并详细讲解 AO3 中文界面设置、阅读语言切换及基础使用流程,帮助用户稳定访问 AO3 官网,高效完成中文阅读与作品浏览。

8

2026.02.02

主流快递单号查询入口 实时物流进度一站式追踪专题
主流快递单号查询入口 实时物流进度一站式追踪专题

本专题聚合极兔快递、京东快递、中通快递、圆通快递、韵达快递等主流物流平台的单号查询与运单追踪内容,重点解决单号查询、手机号查物流、官网入口直达、包裹进度实时追踪等高频问题,帮助用户快速获取最新物流状态,提升查件效率与使用体验。

3

2026.02.02

Golang WebAssembly(WASM)开发入门
Golang WebAssembly(WASM)开发入门

本专题系统讲解 Golang 在 WebAssembly(WASM)开发中的实践方法,涵盖 WASM 基础原理、Go 编译到 WASM 的流程、与 JavaScript 的交互方式、性能与体积优化,以及典型应用场景(如前端计算、跨平台模块)。帮助开发者掌握 Go 在新一代 Web 技术栈中的应用能力。

1

2026.02.02

PHP Swoole 高性能服务开发
PHP Swoole 高性能服务开发

本专题聚焦 PHP Swoole 扩展在高性能服务端开发中的应用,系统讲解协程模型、异步IO、TCP/HTTP/WebSocket服务器、进程与任务管理、常驻内存架构设计。通过实战案例,帮助开发者掌握 使用 PHP 构建高并发、低延迟服务端应用的工程化能力。

1

2026.02.02

Java JNI 与本地代码交互实战
Java JNI 与本地代码交互实战

本专题系统讲解 Java 通过 JNI 调用 C/C++ 本地代码的核心机制,涵盖 JNI 基本原理、数据类型映射、内存管理、异常处理、性能优化策略以及典型应用场景(如高性能计算、底层库封装)。通过实战示例,帮助开发者掌握 Java 与本地代码混合开发的完整流程。

1

2026.02.02

go语言 注释编码
go语言 注释编码

本专题整合了go语言注释、注释规范等等内容,阅读专题下面的文章了解更多详细内容。

61

2026.01.31

go语言 math包
go语言 math包

本专题整合了go语言math包相关内容,阅读专题下面的文章了解更多详细内容。

52

2026.01.31

go语言输入函数
go语言输入函数

本专题整合了go语言输入相关教程内容,阅读专题下面的文章了解更多详细内容。

25

2026.01.31

golang 循环遍历
golang 循环遍历

本专题整合了golang循环遍历相关教程,阅读专题下面的文章了解更多详细内容。

31

2026.01.31

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
麻省理工大佬Python课程
麻省理工大佬Python课程

共34课时 | 5.2万人学习

【web前端】Node.js快速入门
【web前端】Node.js快速入门

共16课时 | 2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号