diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index 313f0291a..b80e56169 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,5 +1,6 @@ package org.dromara.common.sse.core; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.utils.SpringUtils; @@ -8,6 +9,9 @@ import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -113,22 +117,41 @@ public class SseEmitterManager { } /** - * SSE心跳检测,关闭无效连接 + * SSE 心跳检测,关闭无效连接 */ public void sseMonitor() { - log.info("开始 SSE 心跳"); - USER_TOKEN_EMITTERS.forEach((userId, map) -> - map.entrySet().removeIf(e -> { + final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat"); + // 记录需要移除的用户ID + List toRemoveUsers = new ArrayList<>(); + + USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> { + if (CollUtil.isEmpty(emitterMap)) { + toRemoveUsers.add(userId); + return; + } + + emitterMap.entrySet().removeIf(entry -> { try { - e.getValue().send(SseEmitter.event().comment("heartbeat")); + entry.getValue().send(heartbeat); return false; } catch (Exception ex) { - log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey()); - e.getValue().complete(); - return true; + try { + entry.getValue().complete(); + } catch (Exception ignore) { + // 忽略重复关闭异常 + } + return true; // 发送失败 → 移除该连接 } - }) - ); + }); + + // 移除空连接用户 + if (emitterMap.isEmpty()) { + toRemoveUsers.add(userId); + } + }); + + // 循环结束后统一清理空用户,避免并发修改异常 + toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove); } /**