From 3934e119d649e4cbd324b6ca14f9b174b644ef4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=8D=89=E7=B7=A8=E7=9A=84=E6=88=92=E6=8C=87=E7=A4=BB?= <970259858@qq.com> Date: Mon, 20 Oct 2025 04:01:21 +0000 Subject: [PATCH] =?UTF-8?q?!776=20update=20=E4=BC=98=E5=8C=96=20sse=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E7=9B=B8=E5=90=8Ctoken=E5=8E=86=E5=8F=B2?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5=E6=9C=AA=E5=85=B3=E9=97=AD=E9=97=AE=E9=A2=98?= =?UTF-8?q?=EF=BC=9B=E6=96=B0=E5=A2=9E=E5=BF=83=E8=B7=B3=E7=9B=91=E6=B5=8B?= =?UTF-8?q?=EF=BC=8C=E5=85=B3=E9=97=AD=E6=97=A0=E6=95=88=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=20*=20update=20=E4=BC=98=E5=8C=96=20sse=20=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E5=AE=9A=E6=97=B6=E5=99=A8=E6=89=A7=E8=A1=8C=E6=96=B9=E5=BC=8F?= =?UTF-8?q?=20*=20update=20=E4=BC=98=E5=8C=96=20sse=20=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E5=86=99=E6=B3=95=20*=20update=20=E4=BC=98?= =?UTF-8?q?=E5=8C=96=20sse=20=E4=BF=AE=E5=A4=8D=E7=9B=B8=E5=90=8Ctoken?= =?UTF-8?q?=E5=8E=86=E5=8F=B2=E8=BF=9E=E6=8E=A5=E6=9C=AA=E5=85=B3=E9=97=AD?= =?UTF-8?q?=E9=97=AE=E9=A2=98=EF=BC=9B=E6=96=B0=E5=A2=9E=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E7=9B=91=E6=B5=8B=EF=BC=8C=E5=85=B3=E9=97=AD=E6=97=A0=E6=95=88?= =?UTF-8?q?=E8=BF=9E=E6=8E=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/sse/core/SseEmitterManager.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index bc19460f8..313f0291a 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -2,6 +2,7 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -9,6 +10,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -26,6 +29,12 @@ public class SseEmitterManager { private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + public SseEmitterManager() { + // 定时执行 SSE 心跳检测 + SpringUtils.getBean(ScheduledExecutorService.class) + .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS); + } + /** * 建立与指定用户的 SSE 连接 * @@ -38,6 +47,12 @@ public class SseEmitterManager { // 每个用户可以有多个 SSE 连接,通过 token 进行区分 Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + // 关闭已存在的SseEmitter,防止超过最大连接数 + SseEmitter oldEmitter = emitters.remove(token); + if (oldEmitter != null) { + oldEmitter.complete(); + } + // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); @@ -97,6 +112,25 @@ public class SseEmitterManager { } } + /** + * SSE心跳检测,关闭无效连接 + */ + public void sseMonitor() { + log.info("开始 SSE 心跳"); + USER_TOKEN_EMITTERS.forEach((userId, map) -> + map.entrySet().removeIf(e -> { + try { + e.getValue().send(SseEmitter.event().comment("heartbeat")); + return false; + } catch (Exception ex) { + log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey()); + e.getValue().complete(); + return true; + } + }) + ); + } + /** * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息 *