
本文详解如何在 Quarkus WebSocket 服务中,于 @OnMessage 和 Vert.x EventBus 异步事件处理链路中可靠传递 MDC(Mapped Diagnostic Context)日志上下文,解决因线程切换导致的 MDC.get() 返回 null 的典型问题。
本文详解如何在 quarkus websocket 服务中,于 `@onmessage` 和 vert.x eventbus 异步事件处理链路中可靠传递 mdc(mapped diagnostic context)日志上下文,解决因线程切换导致的 `mdc.get()` 返回 null 的典型问题。
在 Quarkus 中构建 WebSocket 服务时,常需将消息处理异步化(例如通过 Vert.x EventBus 解耦),以提升响应性与可维护性。但 MDC 本质基于 ThreadLocal,其上下文仅绑定于当前线程——而 @OnMessage 回调、EventBus 处理器(如 @ConsumeEvent)及后续 Uni 异步操作往往运行在不同线程池中,导致原始 MDC 键值(如 "user.id"、"websocket.sessionId")丢失。
直接依赖 MDC.getCopyOfContextMap() 在主线程捕获并跨线程还原,是轻量且符合 Quarkus 哲学的解决方案。关键在于:在 WebSocket 生命周期钩子中显式快照与恢复 MDC,并将上下文标识(如 session ID)随业务数据一并传递。
以下为完整实现方案:
✅ 步骤一:在 WebSocket 控制器中管理 MDC 快照
@Slf4j
@ApplicationScoped
@ServerEndpoint(value = "/users/{userId}")
public class UserWebSocketController {
// 使用 ConcurrentHashMap 保证线程安全,避免静态 Map 的并发风险
private static final Map<String, Map<String, String>> SESSION_MDC_CONTEXTS =
new ConcurrentHashMap<>();
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserWebSocketController(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@OnOpen
public void onOpen(Session session, @PathParam("userId") String userId) {
// 设置初始 MDC
MDC.put("websocket.sessionId", session.getId());
MDC.put("user.id", userId);
MDC.put("event", "websocket.open");
log.info("New WebSocket session opened for user: {}", userId);
// 快照当前 MDC 到全局映射(以 session ID 为 key)
storeSessionMdc(session.getId());
websocketConnectionService.addConnection(userId, session);
}
@OnMessage
public void onMessage(Session session, String message, @PathParam("userId") String userId) {
// 恢复该 session 对应的 MDC(确保 onMessage 内日志含上下文)
restoreSessionMdc(session.getId());
log.info("Received message: '{}'", message);
// 构造携带 session ID 的消息载体,确保下游可还原上下文
AsyncWebSocketMessage payload = new AsyncWebSocketMessage(
session.getId(),
message,
userId
);
// 发送至 EventBus —— 注意:此处不依赖 MDC 自动传播
vertx.eventBus().send("websocket.message.new", payload);
}
@OnClose
public void onClose(Session session, @PathParam("userId") String userId) {
restoreSessionMdc(session.getId());
log.info("WebSocket session closed for user: {}", userId);
removeSessionMdc(session.getId());
websocketConnectionService.removeSession(userId);
}
@OnError
public void onError(Session session, @PathParam("userId") String userId, Throwable throwable) {
restoreSessionMdc(session.getId());
log.error("Error in WebSocket session for user: {}", userId, throwable);
removeSessionMdc(session.getId());
websocketConnectionService.removeSession(userId);
}
// —— MDC 管理工具方法 ——
private void storeSessionMdc(String sessionId) {
SESSION_MDC_CONTEXTS.put(sessionId, MDC.getCopyOfContextMap());
}
public static void restoreSessionMdc(String sessionId) {
Map<String, String> context = SESSION_MDC_CONTEXTS.get(sessionId);
if (context != null) {
MDC.setContextMap(context);
} else {
MDC.clear(); // 防止残留旧上下文
}
}
private void removeSessionMdc(String sessionId) {
SESSION_MDC_CONTEXTS.remove(sessionId);
}
// 内部消息载体(需序列化支持,用于 EventBus 传输)
public static class AsyncWebSocketMessage implements Serializable {
private final String sessionId;
private final String message;
private final String userId;
public AsyncWebSocketMessage(String sessionId, String message, String userId) {
this.sessionId = sessionId;
this.message = message;
this.userId = userId;
}
public String getSessionId() { return sessionId; }
public String getMessage() { return message; }
public String getUserId() { return userId; }
}
}✅ 步骤二:在事件处理器中主动恢复 MDC
@Slf4j
@ApplicationScoped
public class UserService {
private final WebsocketConnectionService websocketConnectionService;
private final Vertx vertx;
public UserService(WebsocketConnectionService websocketConnectionService, Vertx vertx) {
this.websocketConnectionService = websocketConnectionService;
this.vertx = vertx;
}
@ConsumeEvent("websocket.message.new")
public Uni<Void> handleWebSocketMessages(UserWebSocketController.AsyncWebSocketMessage payload) {
// 关键:在事件处理起始处恢复对应 session 的 MDC
UserWebSocketController.restoreSessionMdc(payload.getSessionId());
// 此时 MDC 已就绪,日志自动携带上下文
log.info("Processing WebSocket message for user: {}, content: '{}'",
payload.getUserId(), payload.getMessage());
// 执行业务逻辑(可含异步 DB 调用、HTTP 请求等)
return doBusinessWork(payload)
.onItem().invoke(() -> {
// 可选:处理完成后清理(若需严格隔离)
MDC.clear();
})
.replaceWithVoid();
}
private Uni<Void> doBusinessWork(UserWebSocketController.AsyncWebSocketMessage payload) {
// 示例:模拟异步业务处理
return Uni.createFrom().voidItem()
.onItem().delayIt().by(Duration.ofMillis(50))
.onItem().invoke(() -> log.debug("Business logic completed for user: {}", payload.getUserId()));
}
}⚠️ 重要注意事项
- 线程安全性:SESSION_MDC_CONTEXTS 必须使用 ConcurrentHashMap,避免多 session 并发注册/清理引发 ConcurrentModificationException。
- 内存泄漏防护:务必在 @OnClose 或 @OnError 中调用 removeSessionMdc(),否则长期连接断开后 MDC 映射将持续占用内存。
- 序列化兼容性:AsyncWebSocketMessage 需实现 Serializable,且所有字段应为可序列化类型;若使用 JSON 序列化(如 EventBus 默认编解码),可改用 JsonObject 替代自定义类。
- MDC 清理时机:restoreSessionMdc() 后建议在 Uni 链尾或 @ConsumeEvent 方法末尾调用 MDC.clear(),防止上下文意外泄露至其他异步任务。
- 替代方案提示:对高并发场景,可考虑结合 Micrometer 的 Observation 或 Quarkus 的 SmallRye Context Propagation 扩展实现更通用的上下文透传,但 MDC 快照方案在日志追踪维度上更直接、低侵入。
通过上述设计,你不仅解决了 MDC 在 WebSocket + EventBus 异步链路中的断连问题,还保持了代码的清晰性与可测试性——每个组件职责明确:控制器负责上下文生命周期管理,服务层专注业务逻辑,日志始终具备完整的请求溯源能力。










