0

0

SmallRye Mutiny 异步处理事件时订阅无响应问题排查与解决

DDD

DDD

发布时间:2025-08-16 16:08:14

|

934人浏览过

|

来源于php中文网

原创

smallrye mutiny 异步处理事件时订阅无响应问题排查与解决

本文旨在解决在使用 SmallRye Mutiny 处理异步事件流时,订阅者无法接收到事件的问题。通过分析背压机制,提供了手动请求数据和使用 Mutiny 提供的更简洁API两种解决方案,并附带代码示例,帮助开发者正确地异步处理事件流。

在使用 SmallRye Mutiny 进行响应式编程时,异步处理事件流是一个常见的需求。 然而,开发者可能会遇到订阅者(Subscriber)无法接收到事件,导致 onNext 方法没有被调用的情况。 这通常是由于对 Reactive Streams 规范中的背压(Backpressure)机制理解不足造成的。

背压机制详解

Reactive Streams 规范,包括 SmallRye Mutiny 的实现,都内置了背压机制。 背压机制用于控制数据流的速度,防止生产者(Publisher)产生数据的速度超过消费者(Subscriber)的处理能力,从而避免资源耗尽或系统崩溃。

简单来说,背压机制要求消费者显式地向生产者请求数据。 只有在消费者准备好处理数据时,才向生产者发出请求。 如果消费者没有发出请求,生产者就不会发送数据。

问题分析

在原始代码中,订阅者实现了 Subscriber 接口,并重写了 onSubscribe、onNext、onError 和 onComplete 方法。 然而,在 onSubscribe 方法中,仅仅输出了日志,并没有向 Subscription 对象请求数据。 这导致生产者无法得知消费者已经准备好接收数据,因此不会发送任何事件。

解决方案一:手动请求数据

解决这个问题的方法是在 onSubscribe 方法中保存 Subscription 对象,并在 onNext 方法中调用 request(long) 方法,显式地请求数据。

以下是修改后的代码示例:

import io.smallrye.mutiny.Multi;
import org.reactivestreams.Subscription;
import org.reactivestreams.Subscriber;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MutinyExample {

    private static final Executor managedExecutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        StreamingInfo streamingInfo = new StreamingInfo();
        streamingInfo.setEvents(Multi.createFrom().items("Event 1", "Event 2", "Event 3"));
        writeTo(streamingInfo);
    }

    public static void writeTo(StreamingInfo streamingInfo) {
        streamingInfo
            .getEvents()
            .runSubscriptionOn(managedExecutor)
            .subscribe()
            .withSubscriber(
                new Subscriber() {
                    private Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        System.out.println("OnSubscription Method");
                        System.out.println("ON SUBS END");
                        subscription = s;
                        subscription.request(1); // 请求第一个事件
                    }

                    @Override
                    public void onNext(String event) {
                        System.out.println("On Next Method: " + event);
                        subscription.request(1); // 处理完一个事件后,请求下一个事件
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("OnError Method: " + t.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("On Complete Method");
                    }
                });
    }

    static class StreamingInfo {
        private Multi events;

        public Multi getEvents() {
            return events;
        }

        public void setEvents(Multi events) {
            this.events = events;
        }
    }
}

在这个示例中,onSubscribe 方法中保存了 Subscription 对象,并调用了 subscription.request(1) 请求第一个事件。 在 onNext 方法中,处理完一个事件后,再次调用 subscription.request(1) 请求下一个事件。 这样,订阅者就能接收到所有的事件了。

Audo Studio
Audo Studio

AI音频清洗工具(噪音消除、声音平衡、音量调节)

下载

注意事项:

  • request(long) 方法的参数表示请求的事件数量。 可以根据实际需求调整请求的数量。
  • 在 onError 方法中,通常不需要请求数据。
  • 在 onComplete 方法中,表示事件流已经结束,不需要再请求数据。

解决方案二:使用 Mutiny 提供的 API

SmallRye Mutiny 提供了更简洁的 API 来处理事件流,避免手动管理 Subscription 对象。 可以使用 onSubscription、onItem、onFailure 和 onCompletion 方法来注册相应的回调函数。

以下是使用 Mutiny 提供的 API 的代码示例:

import io.smallrye.mutiny.Multi;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class MutinyExample {

    private static final Executor managedExecutor = Executors.newFixedThreadPool(10);

    public static void main(String[] args) {
        StreamingInfo streamingInfo = new StreamingInfo();
        streamingInfo.setEvents(Multi.createFrom().items("Event 1", "Event 2", "Event 3"));
        writeTo(streamingInfo);
    }

    public static void writeTo(StreamingInfo streamingInfo) {
        streamingInfo
            .getEvents()
            .runSubscriptionOn(managedExecutor)
            .onSubscription()
            .invoke(() -> {
                System.out.println("OnSubscription Method");
                System.out.println("ON SUBS END");
            })
            .onItem()
            .invoke(event -> System.out.println("On Next Method: " + event))
            .onFailure()
            .invoke(t -> System.out.println("OnError Method: " + t.getMessage()))
            .onCompletion()
            .invoke(() -> System.out.println("On Complete Method"))
            .subscribe()
            .with(value -> {});
    }

    static class StreamingInfo {
        private Multi events;

        public Multi getEvents() {
            return events;
        }

        public void setEvents(Multi events) {
            this.events = events;
        }
    }
}

在这个示例中,使用了 onSubscription、onItem、onFailure 和 onCompletion 方法来注册相应的回调函数,避免了手动管理 Subscription 对象,代码更加简洁易懂。

总结:

在 SmallRye Mutiny 中异步处理事件流时,需要注意 Reactive Streams 规范中的背压机制。 可以通过手动请求数据或使用 Mutiny 提供的 API 来解决订阅者无法接收到事件的问题。 建议使用 Mutiny 提供的 API,因为代码更加简洁易懂。

相关专题

更多
硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1050

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

86

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

458

2025.12.29

java接口相关教程
java接口相关教程

本专题整合了java接口相关内容,阅读专题下面的文章了解更多详细内容。

11

2026.01.19

Golang 性能分析与pprof调优实战
Golang 性能分析与pprof调优实战

本专题系统讲解 Golang 应用的性能分析与调优方法,重点覆盖 pprof 的使用方式,包括 CPU、内存、阻塞与 goroutine 分析,火焰图解读,常见性能瓶颈定位思路,以及在真实项目中进行针对性优化的实践技巧。通过案例讲解,帮助开发者掌握 用数据驱动的方式持续提升 Go 程序性能与稳定性。

9

2026.01.22

html编辑相关教程合集
html编辑相关教程合集

本专题整合了html编辑相关教程合集,阅读专题下面的文章了解更多详细内容。

56

2026.01.21

三角洲入口地址合集
三角洲入口地址合集

本专题整合了三角洲入口地址合集,阅读专题下面的文章了解更多详细内容。

30

2026.01.21

AO3中文版入口地址大全
AO3中文版入口地址大全

本专题整合了AO3中文版入口地址大全,阅读专题下面的的文章了解更多详细内容。

393

2026.01.21

妖精漫画入口地址合集
妖精漫画入口地址合集

本专题整合了妖精漫画入口地址合集,阅读专题下面的文章了解更多详细内容。

116

2026.01.21

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
10分钟--Midjourney创作自己的漫画
10分钟--Midjourney创作自己的漫画

共1课时 | 0.1万人学习

Midjourney 关键词系列整合
Midjourney 关键词系列整合

共13课时 | 0.9万人学习

AI绘画教程
AI绘画教程

共2课时 | 0.2万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号