diff --git a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java index bc19460f8..313f0291a 100644 --- a/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java +++ b/ruoyi-common/ruoyi-common-sse/src/main/java/org/dromara/common/sse/core/SseEmitterManager.java @@ -2,6 +2,7 @@ package org.dromara.common.sse.core; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; +import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.sse.dto.SseMessageDto; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -9,6 +10,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -26,6 +29,12 @@ public class SseEmitterManager { private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); + public SseEmitterManager() { + // 定时执行 SSE 心跳检测 + SpringUtils.getBean(ScheduledExecutorService.class) + .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS); + } + /** * 建立与指定用户的 SSE 连接 * @@ -38,6 +47,12 @@ public class SseEmitterManager { // 每个用户可以有多个 SSE 连接,通过 token 进行区分 Map emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); + // 关闭已存在的SseEmitter,防止超过最大连接数 + SseEmitter oldEmitter = emitters.remove(token); + if (oldEmitter != null) { + oldEmitter.complete(); + } + // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 SseEmitter emitter = new SseEmitter(86400000L); @@ -97,6 +112,25 @@ public class SseEmitterManager { } } + /** + * SSE心跳检测,关闭无效连接 + */ + public void sseMonitor() { + log.info("开始 SSE 心跳"); + USER_TOKEN_EMITTERS.forEach((userId, map) -> + map.entrySet().removeIf(e -> { + try { + e.getValue().send(SseEmitter.event().comment("heartbeat")); + return false; + } catch (Exception ex) { + log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey()); + e.getValue().complete(); + return true; + } + }) + ); + } + /** * 订阅SSE消息主题,并提供一个消费者函数来处理接收到的消息 *