0

0

java9新特性Reactive Stream响应式编程API怎么用

王林

王林

发布时间:2023-05-10 13:46:06

|

1841人浏览过

|

来源于亿速云

转载

一、Java9 Reactive Stream API

java 9提供了一组定义响应式流编程的接口。所有这些接口都作为静态内部接口定义在java.util.concurrent.flow类里面。

java9新特性Reactive Stream响应式编程API怎么用

下面是Java 响应式编程中的一些重要角色和概念,先简单理解一下

发布者(Publisher)是潜在的无限数量的有序数据元素的生产者。 它根据收到的需求(subscription)向当前订阅者发布一定数量的数据元素。

订阅者(Subscriber)从发布者那里订阅并接收数据元素。与发布者建立订阅关系后,发布者向订阅者发送订阅令牌(subscription),订阅者可以根据自己的处理能力请求发布者发布数据元素的数量。

立即学习Java免费学习笔记(深入)”;

订阅令牌(subscription)表示订阅者与发布者之间建立的订阅关系。 当建立订阅关系后,发布者将其传递给订阅者。 订阅者使用订阅令牌与发布者进行交互,例如请求数据元素的数量或取消订阅。

二、Java响应式编程四大接口

2.1.Subscriber Interface(订阅者订阅接口)

public static interface Subscriber {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

onSubscribe:在发布者接受订阅者的订阅动作之后,发布任何的订阅消息之前被调用。新创建的Subscription订阅令牌对象通过此方法传递给订阅者。

onNext:下一个待处理的数据项的处理函数

onError:在发布者或订阅遇到不可恢复的错误时调用

onComplete:当没有订阅者调用(包括onNext()方法)发生时调用。

2.2.Subscription Interface (订阅令牌接口)

订阅令牌对象通过Subscriber.onSubscribe()方法传递

public static interface Subscription {    public void request(long n);    public void cancel();}

request(long n)是无阻塞背压概念背后的关键方法。订阅者使用它来请求n个以上的消费项目。这样,订阅者控制了它当前能够接收多少个数据。cancel()由订阅者主动来取消其订阅,取消后将不会在接收到任何数据消息。

BlackBox AI
BlackBox AI

AI编程助手,智能对话问答助手

下载

2.3.Publisher Interface(发布者接口)

@FunctionalInterface
public static interface Publisher {
    public void subscribe(Subscriber subscriber);
}

调用该方法,建立订阅者Subscriber与发布者Publisher之间的消息订阅关系。

2.4.Processor Interface(处理器接口)

处理者Processor 可以同时充当订阅者和发布者,起到转换发布者——订阅者管道中的元素的作用。用于将发布者T类型的数据元素,接收并转换为类型R的数据并发布。

public static interface Processor extends Subscriber, Publisher {
}

二、实战案例

现在我们要去实现上面的四个接口来完成响应式编程

Subscription Interface订阅令牌接口通常不需要我们自己编程去实现,我们只需要在知道request()方法和cancle()方法含义即可。

Publisher Interface发布者接口,Java 9 已经默认为我们提供了实现SubmissionPublisher,该实现类除了实现Publisher接口的方法外,提供了一个方法叫做submit()来完成消息数据的发送。

Subscriber Interface订阅者接口,通常需要我们自己去实现。因为在数据订阅接收之后,不同的业务有不同的处理逻辑。

Processor实际上是 Publisher Interface和Subscriber Interface的集合体,有需要数据类型转换及数据处理的需求才去实现这个接口

下面的例子实现的式字符串的数据消息订阅处理

实现订阅者Subscriber Interface

import java.util.concurrent.Flow;
public class MySubscriber implements Flow.Subscriber {
  private Flow.Subscription subscription;  //订阅令牌
  @Override
  public void onSubscribe(Flow.Subscription subscription) {
      System.out.println("订阅关系建立onSubscribe: " + subscription);
      this.subscription = subscription;
      subscription.request(2);
  }
  @Override
  public void onNext(String item) {
      System.out.println("item: " + item);
      // 一个消息处理完成之后,可以继续调用subscription.request(n);向发布者要求数据发送
      //subscription.request(n);
  }
  @Override
  public void onError(Throwable throwable) {
      System.out.println("onError: " + throwable);
  }
  @Override
  public void onComplete() {
      System.out.println("onComplete");
  }
}

SubmissionPublisher消息发布者

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
public class SubmissionPublisherExample {
  public static void main(String[] args) throws InterruptedException {
      ExecutorService executor = Executors.newFixedThreadPool(1);
      SubmissionPublisher sb = new SubmissionPublisher<>(executor, Flow.defaultBufferSize());
      sb.subscribe(new MySubscriber());   //建立订阅关系,可以有多个订阅者
      sb.submit("数据 1");  //发送消息1
      sb.submit("数据 2"); //发送消息2
      sb.submit("数据 3"); //发送消息3
      executor.shutdown();
  }
}

控制台打印输出结果

订阅关系建立onSubscribe: java.util.concurrent.SubmissionPublisher$BufferedSubscription@27e81a39item: 数据 1item: 数据 2

请注意:即使发布者submit了3条数据,MySubscriber也仅收到了2条数据进行了处理。是因为我们在MySubscriber#onSubscribe()方法中使用了subscription.request(2);。这就是“背压”的响应式编程效果,我有能力处理多少数据,就会通知消息发布者给多少数据。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
数据类型有哪几种
数据类型有哪几种

数据类型有整型、浮点型、字符型、字符串型、布尔型、数组、结构体和枚举等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

309

2023.10.31

php数据类型
php数据类型

本专题整合了php数据类型相关内容,阅读专题下面的文章了解更多详细内容。

222

2025.10.31

js 字符串转数组
js 字符串转数组

js字符串转数组的方法:1、使用“split()”方法;2、使用“Array.from()”方法;3、使用for循环遍历;4、使用“Array.split()”方法。本专题为大家提供js字符串转数组的相关的文章、下载、课程内容,供大家免费下载体验。

298

2023.08.03

js截取字符串的方法
js截取字符串的方法

js截取字符串的方法有substring()方法、substr()方法、slice()方法、split()方法和slice()方法。本专题为大家提供字符串相关的文章、下载、课程内容,供大家免费下载体验。

212

2023.09.04

java基础知识汇总
java基础知识汇总

java基础知识有Java的历史和特点、Java的开发环境、Java的基本数据类型、变量和常量、运算符和表达式、控制语句、数组和字符串等等知识点。想要知道更多关于java基础知识的朋友,请阅读本专题下面的的有关文章,欢迎大家来php中文网学习。

1501

2023.10.24

字符串介绍
字符串介绍

字符串是一种数据类型,它可以是任何文本,包括字母、数字、符号等。字符串可以由不同的字符组成,例如空格、标点符号、数字等。在编程中,字符串通常用引号括起来,如单引号、双引号或反引号。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

624

2023.11.24

java读取文件转成字符串的方法
java读取文件转成字符串的方法

Java8引入了新的文件I/O API,使用java.nio.file.Files类读取文件内容更加方便。对于较旧版本的Java,可以使用java.io.FileReader和java.io.BufferedReader来读取文件。在这些方法中,你需要将文件路径替换为你的实际文件路径,并且可能需要处理可能的IOException异常。想了解更多java的相关内容,可以阅读本专题下面的文章。

633

2024.03.22

php中定义字符串的方式
php中定义字符串的方式

php中定义字符串的方式:单引号;双引号;heredoc语法等等。想了解更多字符串的相关内容,可以阅读本专题下面的文章。

588

2024.04.29

Golang 网络安全与加密实战
Golang 网络安全与加密实战

本专题系统讲解 Golang 在网络安全与加密技术中的应用,包括对称加密与非对称加密(AES、RSA)、哈希与数字签名、JWT身份认证、SSL/TLS 安全通信、常见网络攻击防范(如SQL注入、XSS、CSRF)及其防护措施。通过实战案例,帮助学习者掌握 如何使用 Go 语言保障网络通信的安全性,保护用户数据与隐私。

2

2026.01.29

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Kotlin 教程
Kotlin 教程

共23课时 | 3万人学习

C# 教程
C# 教程

共94课时 | 7.9万人学习

Java 教程
Java 教程

共578课时 | 52.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号