
本文详解如何在 Quarkus WebSocket 服务中,于 @OnMessage 异步处理及 Vert.x EventBus 事件消费阶段,可靠地透传 MDC(Mapped Diagnostic Context)日志上下文,确保 user.id、websocket.sessionId 等关键追踪字段全程可用。
本文详解如何在 quarkus websocket 服务中,于 `@onmessage` 异步处理及 vert.x eventbus 事件消费阶段,可靠地透传 mdc(mapped diagnostic context)日志上下文,确保 `user.id`、`websocket.sessionid` 等关键追踪字段全程可用。
在 Quarkus 的 WebSocket 实现中,MDC(如 Logback 的 MDC)本质上基于 ThreadLocal,其生命周期严格绑定于当前线程。而 WebSocket 的 @OnMessage 回调虽由容器线程触发,但一旦消息被投递至 Vert.x EventBus(如 vertx.eventBus().send()),后续的 @ConsumeEvent 处理将运行在 Vert.x 工作线程池中——此时原始线程的 MDC 上下文已丢失,导致 MDC.get("user.id") 返回 null。
要解决这一问题,核心思路是:主动捕获、显式存储、按需恢复。不能依赖自动传播(Quarkus 未为 WebSocket + EventBus 组合提供开箱即用的 MDC 跨线程透传),而应将 MDC 上下文快照与 WebSocket 会话强绑定,并在异步处理入口处手动还原。
✅ 推荐实现方案:基于 Session ID 的 MDC 快照管理
我们采用静态线程安全映射(ConcurrentHashMap)持久化每个活跃 WebSocket 会话的 MDC 快照,并在关键生命周期节点进行存取操作:
// UserWebSocketController.java
@Slf4j
@ApplicationScoped
@ServerEndpoint(value = "/users/{userId}")
public class UserWebSocketController {
// ⚠️ 线程安全的 MDC 快照存储(Key: session.id)
private static final Map<String, Map<String, String>> SESSION_MDC_MAP =
new ConcurrentHashMap<>();
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserWebSocketController(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
// 1. 初始化 MDC
MDC.put("websocket.sessionId", session.getId());
MDC.put("user.id", userId);
log.info("New WebSocket Session opened for user {}", userId);
// 2. 持久化当前 MDC 快照(供后续异步使用)
storeSessionMDC(session.getId());
websocketConnectionService.addConnection(userId, session);
}
@OnMessage
public void onMessage(Session session, String message, @PathParam("userId") String userId) {
// 3. 关键:恢复当前会话的 MDC(保障 onMessage 内日志可追溯)
restoreSessionMDC(session.getId());
log.info("New message received: {}", message);
// 4. 将 sessionId 显式携带至事件负载,用于下游还原
var asyncMsg = new AsyncWebSocketMessage(session.getId(), message);
vertx.eventBus().send("websocket.message.new", asyncMsg);
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
restoreSessionMDC(session.getId());
log.info("WebSocket Session closed for user {}", userId);
removeSessionMDC(session.getId()); // 清理资源
websocketConnectionService.removeSession(userId);
}
@OnError
public void onError(Session session, @PathParam("userId") String userId, Throwable throwable) {
restoreSessionMDC(session.getId());
log.error("Error in WebSocket session for user {}", userId, throwable);
removeSessionMDC(session.getId());
websocketConnectionService.removeSession(userId);
}
// —— MDC 快照工具方法 ——
private static void storeSessionMDC(String sessionId) {
SESSION_MDC_MAP.put(sessionId, MDC.getCopyOfContextMap());
}
public static void restoreSessionMDC(String sessionId) {
Map<String, String> contextMap = SESSION_MDC_MAP.get(sessionId);
if (contextMap != null) {
MDC.setContextMap(contextMap);
} else {
MDC.clear(); // 防止残留
}
}
private static void removeSessionMDC(String sessionId) {
SESSION_MDC_MAP.remove(sessionId);
MDC.clear();
}
// 事件载体(需可序列化,适配 Vert.x EventBus)
public static class AsyncWebSocketMessage implements Serializable {
private final String sessionId;
private final String payload;
public AsyncWebSocketMessage(String sessionId, String payload) {
this.sessionId = sessionId;
this.payload = payload;
}
public String getSessionId() { return sessionId; }
public String getPayload() { return payload; }
}
}? 在事件消费者中还原 MDC
UserService 的 @ConsumeEvent 方法需在执行业务逻辑前,主动调用 UserWebSocketController.restoreSessionMDC(...),确保日志上下文就位:
// UserService.java
@Slf4j
@ApplicationScoped
public class UserService {
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserService(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@ConsumeEvent("websocket.message.new")
public Uni<Void> handleWebSocketMessages(UserWebSocketController.AsyncWebSocketMessage msg) {
// ✅ 关键:在业务逻辑前恢复 MDC
UserWebSocketController.restoreSessionMDC(msg.getSessionId());
// 此时 MDC 已就绪
String userId = MDC.get("user.id");
log.info("Processing message for user {} with payload: {}", userId, msg.getPayload());
// 执行实际业务逻辑(如 DB 查询、外部调用等)
// ... your business logic ...
return Uni.createFrom().voidItem();
}
}⚠️ 注意事项与最佳实践
- 内存泄漏防护:务必在 @OnClose 和 @OnError 中调用 removeSessionMDC(),避免 SESSION_MDC_MAP 持续增长。生产环境建议增加 TTL 或结合 WeakReference 进一步优化。
- 序列化兼容性:AsyncWebSocketMessage 必须 implements Serializable,且所有字段需可序列化(Vert.x EventBus 默认使用 Jackson 序列化,若使用 Protobuf 等需额外配置)。
- 线程安全性:ConcurrentHashMap 保证了多线程存取安全,但 MDC.setContextMap() 本身是线程局部的,无需额外同步。
- 日志一致性:@OnOpen/@OnMessage/@OnClose 中均调用 restoreSessionMDC(),可确保整个 WebSocket 生命周期内日志链路完整。
- 替代方案考量:若项目已引入 Micrometer Tracing(如 OpenTelemetry),可优先使用 TracedExecutor 或 Tracer.withSpanInScope() 实现分布式追踪上下文透传,MDC 则作为补充的日志增强字段。
通过该方案,你能在 Quarkus WebSocket 的异步事件流中,稳定维持用户身份、会话标识等诊断信息,大幅提升日志可追溯性与故障排查效率。










