!776 update 优化 sse 修复相同token历史连接未关闭问题;新增心跳监测,关闭无效连接

* update 优化 sse 心跳定时器执行方式
* update 优化 sse 心跳检测写法
* update 优化 sse 修复相同token历史连接未关闭问题;新增心跳监测,关闭无效连接
This commit is contained in:
草編的戒指礻
2025-10-20 04:01:21 +00:00
committed by 疯狂的狮子Li
parent 33a6a21fdf
commit 3934e119d6

View File

@@ -2,6 +2,7 @@ package org.dromara.common.sse.core;
import cn.hutool.core.map.MapUtil; import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.redis.utils.RedisUtils; import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.sse.dto.SseMessageDto; import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -9,6 +10,8 @@ import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@@ -26,6 +29,12 @@ public class SseEmitterManager {
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
public SseEmitterManager() {
// 定时执行 SSE 心跳检测
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
}
/** /**
* 建立与指定用户的 SSE 连接 * 建立与指定用户的 SSE 连接
* *
@@ -38,6 +47,12 @@ public class SseEmitterManager {
// 每个用户可以有多个 SSE 连接,通过 token 进行区分 // 每个用户可以有多个 SSE 连接,通过 token 进行区分
Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>()); Map<String, SseEmitter> emitters = USER_TOKEN_EMITTERS.computeIfAbsent(userId, k -> new ConcurrentHashMap<>());
// 关闭已存在的SseEmitter防止超过最大连接数
SseEmitter oldEmitter = emitters.remove(token);
if (oldEmitter != null) {
oldEmitter.complete();
}
// 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
SseEmitter emitter = new SseEmitter(86400000L); SseEmitter emitter = new SseEmitter(86400000L);
@@ -97,6 +112,25 @@ public class SseEmitterManager {
} }
} }
/**
* SSE心跳检测关闭无效连接
*/
public void sseMonitor() {
log.info("开始 SSE 心跳");
USER_TOKEN_EMITTERS.forEach((userId, map) ->
map.entrySet().removeIf(e -> {
try {
e.getValue().send(SseEmitter.event().comment("heartbeat"));
return false;
} catch (Exception ex) {
log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey());
e.getValue().complete();
return true;
}
})
);
}
/** /**
* 订阅SSE消息主题并提供一个消费者函数来处理接收到的消息 * 订阅SSE消息主题并提供一个消费者函数来处理接收到的消息
* *