0

0

Quarkus WebSocket 中异步消息处理下的 MDC 上下文传播实践

碧海醫心

碧海醫心

发布时间:2026-02-19 11:31:04

|

443人浏览过

|

来源于php中文网

原创

Quarkus WebSocket 中异步消息处理下的 MDC 上下文传播实践

本文详解如何在 Quarkus WebSocket 服务中,于 @OnMessage 异步转发至 Vert.x EventBus 及事件消费者时,完整保留并复用 MDC(Mapped Diagnostic Context)中的请求级日志上下文(如 user.id、websocket.sessionId),解决因线程切换导致的 MDC 丢失问题。

本文详解如何在 quarkus websocket 服务中,于 `@onmessage` 异步转发至 vert.x eventbus 及事件消费者时,完整保留并复用 mdc(mapped diagnostic context)中的请求级日志上下文(如 `user.id`、`websocket.sessionid`),解决因线程切换导致的 mdc 丢失问题。

在 Quarkus 中构建 WebSocket 服务时,常需将耗时逻辑(如业务校验、外部调用)异步化以避免阻塞 I/O 线程。典型做法是通过 Vert.x EventBus 发布消息,并由 @ConsumeEvent 方法在工作线程中消费处理。然而,由于 MDC 本质依赖 ThreadLocal,而 WebSocket 生命周期方法(@OnOpen/@OnMessage)运行在 Netty/Vert.x I/O 线程,而事件消费者运行在独立的工作线程池中,原始 MDC 上下文无法自动跨线程传递——这直接导致 MDC.get("user.id") 在 handleWebSocketMessages 中返回 null。

要实现可靠的上下文传播,核心思路是:在 I/O 线程中显式捕获 MDC 快照,并将其随消息一同传递;在消费端线程中主动恢复该快照。以下为经过生产验证的完整实现方案:

✅ 步骤一:定义可序列化的上下文携带消息

为确保 EventBus 消息能安全跨线程/跨节点传输,建议使用轻量、无状态的 POJO 封装原始消息与会话标识:

public class WebSocketAsyncMessage implements Serializable {
    private final String sessionId;
    private final String payload;

    public WebSocketAsyncMessage(String sessionId, String payload) {
        this.sessionId = sessionId;
        this.payload = payload;
    }

    // getters...
}

⚠️ 注意:Serializable 是 Vert.x EventBus 默认编解码要求(若启用 Jackson 编解码器可替换为 @RegisterForReflection + JSON 序列化)。

SoundRaw AI
SoundRaw AI

面向创作者的 AI 音乐生成器,只需选择情绪、流派和长度,SoundRaw AI就能为你生成优美的歌曲。

下载

✅ 步骤二:集中管理会话级 MDC 快照

在 WebSocket 控制器中维护一个线程安全的静态映射表,以 sessionId 为键存储 MDC.getCopyOfContextMap() 的副本:

@Slf4j
@ApplicationScoped
@ServerEndpoint(value = "/users/{userId}")
public class UserWebSocketController {

    // 使用 ConcurrentHashMap 保证线程安全
    private static final Map<String, Map<String, String>> SESSION_MDC_CONTEXTS =
            new ConcurrentHashMap<>();

    private final WebsocketConnectionService websocketConnectionService;
    private final Vertx vertx;

    public UserWebSocketController(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
        this.websocketConnectionService = websocketConnectionService;
        this.vertx = vertx;
    }

    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        String sessionId = session.getId();
        // 初始化 MDC
        MDC.put("websocket.sessionId", sessionId);
        MDC.put("user.id", userId);
        log.info("New WebSocket Session opened for user {}", userId);

        // 持久化当前 MDC 快照
        SESSION_MDC_CONTEXTS.put(sessionId, MDC.getCopyOfContextMap());
        websocketConnectionService.addConnection(userId, session);
    }

    @OnMessage
    public void onMessage(Session session, String message, @PathParam("userId") String userId) {
        String sessionId = session.getId();
        // 关键:在发送前恢复当前会话的 MDC(确保日志含上下文)
        restoreSessionMDC(sessionId);
        log.info("Received message: {}", message);

        // 将 sessionId 与消息一起发送,供消费端还原上下文
        vertx.eventBus().send("websocket.message.new",
                new WebSocketAsyncMessage(sessionId, message));
    }

    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        String sessionId = session.getId();
        restoreSessionMDC(sessionId);
        log.info("WebSocket Session closed for user {}", userId);

        // 清理资源:移除 MDC 快照 & 连接
        SESSION_MDC_CONTEXTS.remove(sessionId);
        websocketConnectionService.removeSession(userId);
    }

    @OnError
    public void onError(Session session, @PathParam("userId") String userId, Throwable throwable) {
        String sessionId = session.getId();
        restoreSessionMDC(sessionId);
        log.error("Error in WebSocket session for user {}", userId, throwable);
        websocketConnectionService.removeSession(userId);
    }

    // 工具方法:恢复指定会话的 MDC 上下文
    public static void restoreSessionMDC(String sessionId) {
        Map<String, String> context = SESSION_MDC_CONTEXTS.get(sessionId);
        if (context != null) {
            MDC.setContextMap(context);
        } else {
            MDC.clear(); // 防止残留旧上下文
        }
    }
}

✅ 步骤三:在事件消费者中主动还原 MDC

在 UserService 的事件处理器中,先调用 UserWebSocketController.restoreSessionMDC(...),再执行业务逻辑:

@Slf4j
@ApplicationScoped
public class UserService {

    private final WebsocketConnectionService websocketConnectionService;
    private final Vertx vertx;

    public UserService(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
        this.websocketConnectionService = websocketConnectionService;
        this.vertx = vertx;
    }

    @ConsumeEvent("websocket.message.new")
    public Uni<Void> handleWebSocketMessages(WebSocketAsyncMessage asyncMessage) {
        // ✅ 关键:立即还原该会话的 MDC 上下文
        UserWebSocketController.restoreSessionMDC(asyncMessage.getSessionId());

        // 此时 MDC 已就绪,可安全读取
        String userId = MDC.get("user.id");
        log.info("Processing message for user {} with payload: {}", userId, asyncMessage.getPayload());

        // 执行实际业务逻辑(例如:持久化、通知、调用其他服务)
        // ... business logic ...

        return Uni.createFrom().voidItem();
    }
}

? 关键注意事项与最佳实践

  • 线程安全性:ConcurrentHashMap 是必须的——WebSocket 多个连接可能并发触发 onOpen/onClose。
  • 内存泄漏防护:务必在 @OnClose 和 @OnError 中调用 SESSION_MDC_CONTEXTS.remove(sessionId),避免长期持有已断开连接的上下文。
  • 日志一致性:所有 log.*() 调用前应确保 restoreSessionMDC() 已执行,否则日志将缺失关键诊断字段。
  • 扩展性考量:若需支持分布式部署,SESSION_MDC_CONTEXTS 应替换为 Redis 或 Infinispan 等共享存储(本例适用于单节点场景)。
  • 替代方案提示:Quarkus 2.13+ 提供了 @WithSpan 与 OpenTelemetry 集成,对链路追踪更友好;但 MDC 仍是最轻量、最直接的日志上下文注入方式。

通过以上设计,你能在完全异步的 WebSocket 消息流中,稳定维持用户身份、会话标识等关键日志维度,大幅提升可观测性与问题排查效率。

热门AI工具

更多
DeepSeek
DeepSeek

幻方量化公司旗下的开源大模型平台

豆包大模型
豆包大模型

字节跳动自主研发的一系列大型语言模型

通义千问
通义千问

阿里巴巴推出的全能AI助手

腾讯元宝
腾讯元宝

腾讯混元平台推出的AI助手

文心一言
文心一言

文心一言是百度开发的AI聊天机器人,通过对话可以生成各种形式的内容。

讯飞写作
讯飞写作

基于讯飞星火大模型的AI写作工具,可以快速生成新闻稿件、品宣文案、工作总结、心得体会等各种文文稿

即梦AI
即梦AI

一站式AI创作平台,免费AI图片和视频生成。

ChatGPT
ChatGPT

最最强大的AI聊天机器人程序,ChatGPT不单是聊天机器人,还能进行撰写邮件、视频脚本、文案、翻译、代码等任务。

相关专题

更多
什么是分布式
什么是分布式

分布式是一种计算和数据处理的方式,将计算任务或数据分散到多个计算机或节点中进行处理。本专题为大家提供分布式相关的文章、下载、课程内容,供大家免费下载体验。

394

2023.08.11

分布式和微服务的区别
分布式和微服务的区别

分布式和微服务的区别在定义和概念、设计思想、粒度和复杂性、服务边界和自治性、技术栈和部署方式等。本专题为大家提供分布式和微服务相关的文章、下载、课程内容,供大家免费下载体验。

246

2023.10.07

json数据格式
json数据格式

JSON是一种轻量级的数据交换格式。本专题为大家带来json数据格式相关文章,帮助大家解决问题。

442

2023.08.07

json是什么
json是什么

JSON是一种轻量级的数据交换格式,具有简洁、易读、跨平台和语言的特点,JSON数据是通过键值对的方式进行组织,其中键是字符串,值可以是字符串、数值、布尔值、数组、对象或者null,在Web开发、数据交换和配置文件等方面得到广泛应用。本专题为大家提供json相关的文章、下载、课程内容,供大家免费下载体验。

544

2023.08.23

jquery怎么操作json
jquery怎么操作json

操作的方法有:1、“$.parseJSON(jsonString)”2、“$.getJSON(url, data, success)”;3、“$.each(obj, callback)”;4、“$.ajax()”。更多jquery怎么操作json的详细内容,可以访问本专题下面的文章。

322

2023.10.13

go语言处理json数据方法
go语言处理json数据方法

本专题整合了go语言中处理json数据方法,阅读专题下面的文章了解更多详细内容。

81

2025.09.10

c语言中null和NULL的区别
c语言中null和NULL的区别

c语言中null和NULL的区别是:null是C语言中的一个宏定义,通常用来表示一个空指针,可以用于初始化指针变量,或者在条件语句中判断指针是否为空;NULL是C语言中的一个预定义常量,通常用来表示一个空值,用于表示一个空的指针、空的指针数组或者空的结构体指针。

244

2023.09.22

java中null的用法
java中null的用法

在Java中,null表示一个引用类型的变量不指向任何对象。可以将null赋值给任何引用类型的变量,包括类、接口、数组、字符串等。想了解更多null的相关内容,可以阅读本专题下面的文章。

786

2024.03.01

pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法
pixiv网页版官网登录与阅读指南_pixiv官网直达入口与在线访问方法

本专题系统整理pixiv网页版官网入口及登录访问方式,涵盖官网登录页面直达路径、在线阅读入口及快速进入方法说明,帮助用户高效找到pixiv官方网站,实现便捷、安全的网页端浏览与账号登录体验。

660

2026.02.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
swoole入门物联网开发与实战
swoole入门物联网开发与实战

共15课时 | 1.3万人学习

swoole项目实战(第二季)
swoole项目实战(第二季)

共15课时 | 1.3万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号