update 优化 SSE 心跳检测逻辑,增强连接管理与异常处理

This commit is contained in:
AprilWind
2025-10-24 10:53:52 +08:00
parent 35c77403d6
commit aa1f89e253

View File

@@ -1,5 +1,6 @@
package org.dromara.common.sse.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
@@ -8,6 +9,9 @@ import org.dromara.common.sse.dto.SseMessageDto;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
@@ -113,22 +117,41 @@ public class SseEmitterManager {
}
/**
* SSE心跳检测关闭无效连接
* SSE 心跳检测,关闭无效连接
*/
public void sseMonitor() {
log.info("开始 SSE 心跳");
USER_TOKEN_EMITTERS.forEach((userId, map) ->
map.entrySet().removeIf(e -> {
final SseEmitter.SseEventBuilder heartbeat = SseEmitter.event().comment("heartbeat");
// 记录需要移除的用户ID
List<Long> toRemoveUsers = new ArrayList<>();
USER_TOKEN_EMITTERS.forEach((userId, emitterMap) -> {
if (CollUtil.isEmpty(emitterMap)) {
toRemoveUsers.add(userId);
return;
}
emitterMap.entrySet().removeIf(entry -> {
try {
e.getValue().send(SseEmitter.event().comment("heartbeat"));
entry.getValue().send(heartbeat);
return false;
} catch (Exception ex) {
log.warn("心跳失败,移除连接: userId={}, token={}", userId, e.getKey());
e.getValue().complete();
return true;
try {
entry.getValue().complete();
} catch (Exception ignore) {
// 忽略重复关闭异常
}
return true; // 发送失败 → 移除该连接
}
})
);
});
// 移除空连接用户
if (emitterMap.isEmpty()) {
toRemoveUsers.add(userId);
}
});
// 循环结束后统一清理空用户,避免并发修改异常
toRemoveUsers.forEach(USER_TOKEN_EMITTERS::remove);
}
/**