From aa1f89e253ce6bd2df6ee23e334c9556bae6c224 Mon Sep 17 00:00:00 2001 From: AprilWind <2100166581@qq.com> Date: Fri, 24 Oct 2025 10:53:52 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96=20SSE=20=E5=BF=83?= =?UTF-8?q?=E8=B7=B3=E6=A3=80=E6=B5=8B=E9=80=BB=E8=BE=91=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=BC=BA=E8=BF=9E=E6=8E=A5=E7=AE=A1=E7=90=86=E4=B8=8E=E5=BC=82?= =?UTF-8?q?=E5=B8=B8=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/core/SseEmitterManager.java | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index 313f0291a..b80e56169 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -1,5 +1,6 @@ package org.dromara.common.sse.core; +import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; import org.dromara.common.core.utils.SpringUtils; @@ -8,6 +9,9 @@ import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; @@ -113,22 +117,41 @@ public class SseEmitterManager { } /** - * SSE心跳检测,关闭无效连接 + * SSE 心跳检测,关闭无效连接 */ public void sseMonitor() { - log.info("开始 SSE 心跳"); - USER_TOKEN_EMITTERS.forEach((userId, map) -> - map.entrySet().removeIf(e -> { + final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat"); + // 记录需要移除的用户ID + List toRemoveUsers = new ArrayList<>(); + + USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> { + if (CollUtil.isEmpty(emitterMap)) { + toRemoveUsers.add(userId); + return; + } + + emitterMap.entrySet().removeIf(entry -> { try { - e.getValue().send(SseEmitter.event().comment("heartbeat")); + entry.getValue().send(heartbeat); return false; } catch (Exception ex) { - log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey()); - e.getValue().complete(); - return true; + try { + entry.getValue().complete(); + } catch (Exception ignore) { + // 忽略重复关闭异常 + } + return true; // 发送失败 → 移除该连接 } - }) - ); + }); + + // 移除空连接用户 + if (emitterMap.isEmpty()) { + toRemoveUsers.add(userId); + } + }); + + // 循环结束后统一清理空用户,避免并发修改异常 + toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove); } /**