0

0

SmallRye Mutiny 异步处理事件时订阅无响应的解决方案

聖光之護

聖光之護

发布时间:2025-08-16 16:54:02

|

493人浏览过

|

来源于php中文网

原创

smallrye mutiny 异步处理事件时订阅无响应的解决方案

在使用 SmallRye Mutiny 进行异步事件处理时,有时会遇到订阅者(Subscriber)无法接收到事件的情况,导致 onNext 方法未被调用的问题。这通常是由于 Reactive Streams 的背压机制导致的。理解并正确处理背压是解决此类问题的关键。

背压机制

Reactive Streams 规范引入了背压机制,用于控制数据流的速度,避免生产者(Publisher)产生数据的速度超过消费者(Subscriber)的处理能力,从而导致资源耗尽或程序崩溃。在这种机制下,消费者需要显式地向生产者请求数据,生产者才会发送相应的数据。

解决方案一:手动请求数据

当使用标准的 Subscriber 接口时,需要在 onSubscribe 方法中保存 Subscription 对象,并在 onNext 方法中调用 subscription.request(long) 方法,显式地请求下一个数据。request(long) 方法的参数表示请求的数据量。通常情况下,每次处理完一个数据后,请求下一个数据即可。

以下是修改后的代码示例:

import io.smallrye.mutiny.Multi;
import org.reactivestreams.Subscription;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Executor;

public class MutinyExample {

    private final Executor managedExecutor;

    public MutinyExample(Executor managedExecutor) {
        this.managedExecutor = managedExecutor;
    }

    public void writeTo(Multi events) {
        events
            .runSubscriptionOn(managedExecutor)
            .subscribe()
            .withSubscriber(
                new Subscriber() {
                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("OnSubscription Method");
                        System.out.println("ON SUBS END");
                        subscription = s;
                        subscription.request(1); // 请求第一个数据
                    }

                    @Override
                    public void onNext(String event) {
                        System.out.println("On Next Method: " + event);
                        subscription.request(1); // 处理完一个数据后,请求下一个数据
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("OnError Method: " + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("On Complete Method");
                    }
                });
    }

    public static void main(String[] args) throws InterruptedException {
        // 模拟一个 Multi
        Multi events = Multi.createFrom().items("Event 1", "Event 2", "Event 3");

        // 创建一个 Executor (这里使用一个简单的 Executor)
        Executor executor = Runnable::run;

        // 创建 MutinyExample 实例
        MutinyExample example = new MutinyExample(executor);

        // 调用 writeTo 方法
        example.writeTo(events);

        // 等待一段时间,确保异步操作完成
        Thread.sleep(1000);
    }
}

注意事项:

千面数字人
千面数字人

千面 Avatar 系列:音频转换让静图随声动起来,动作模仿让动漫复刻真人动作,操作简单,满足多元创意需求。

下载
  • 务必在 onSubscribe 方法中保存 Subscription 对象。
  • 在 onNext 方法中处理完数据后,必须调用 subscription.request(long) 方法请求下一个数据。
  • 如果生产者发送的数据量很大,可以根据消费者的处理能力调整 request(long) 方法的参数。

解决方案二:使用 SmallRye 提供的简化 API

SmallRye Mutiny 提供了更简洁的 API 来处理事件流,避免了手动管理 Subscription 对象的麻烦。可以使用 onSubscription、onItem、onFailure 和 onCompletion 方法来注册相应的回调函数。

以下是使用简化 API 的代码示例:

import io.smallrye.mutiny.Multi;
import java.util.concurrent.Executor;

public class MutinyExampleSimplified {

    private final Executor managedExecutor;

    public MutinyExampleSimplified(Executor managedExecutor) {
        this.managedExecutor = managedExecutor;
    }

    public void writeTo(Multi events) {
        events
            .runSubscriptionOn(managedExecutor)
            .onSubscription()
                .invoke(() -> {
                    System.out.println("OnSubscription Method");
                    System.out.println("ON SUBS END");
                })
            .onItem()
                .invoke(event -> System.out.println("On Next Method: " + event))
            .onFailure()
                .invoke(t -> System.out.println("OnError Method: " + t.getMessage()))
            .onCompletion()
                .invoke(() -> System.out.println("On Complete Method"))
            .subscribe()
                .with(value -> {});
    }

    public static void main(String[] args) throws InterruptedException {
        // 模拟一个 Multi
        Multi events = Multi.createFrom().items("Event 1", "Event 2", "Event 3");

        // 创建一个 Executor (这里使用一个简单的 Executor)
        Executor executor = Runnable::run;

        // 创建 MutinyExampleSimplified 实例
        MutinyExampleSimplified example = new MutinyExampleSimplified(executor);

        // 调用 writeTo 方法
        example.writeTo(events);

        // 等待一段时间,确保异步操作完成
        Thread.sleep(1000);
    }
}

总结

在使用 SmallRye Mutiny 进行异步事件处理时,理解 Reactive Streams 的背压机制至关重要。可以通过手动请求数据或使用 SmallRye 提供的简化 API 来解决订阅者无法接收到事件的问题。选择哪种方案取决于具体的需求和个人偏好。使用简化 API 可以减少代码量,提高可读性,但手动管理 Subscription 对象可以更精细地控制数据流。

相关专题

更多
硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1049

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

86

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

456

2025.12.29

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

11

2026.01.19

Java编译相关教程合集
Java编译相关教程合集

本专题整合了Java编译相关教程,阅读专题下面的文章了解更多详细内容。

11

2026.01.21

C++多线程相关合集
C++多线程相关合集

本专题整合了C++多线程相关教程,阅读专题下面的的文章了解更多详细内容。

4

2026.01.21

无人机驾驶证报考 uom民用无人机综合管理平台官网
无人机驾驶证报考 uom民用无人机综合管理平台官网

无人机驾驶证(CAAC执照)报考需年满16周岁,初中以上学历,身体健康(矫正视力1.0以上,无严重疾病),且无犯罪记录。个人需通过民航局授权的训练机构报名,经理论(法规、原理)、模拟飞行、实操(GPS/姿态模式)及地面站训练后考试合格,通常15-25天拿证。

16

2026.01.21

Python多线程合集
Python多线程合集

本专题整合了Python多线程相关教程,阅读专题下面的文章了解更多详细内容。

1

2026.01.21

java多线程相关教程合集
java多线程相关教程合集

本专题整合了java多线程相关教程,阅读专题下面的文章了解更多详细内容。

3

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号