refactor(common-push): 优化消息推送模块配置与连接管理

- 修正 SSE/WebSocket 自动装配条件,message.enabled=false 时不再注册推送组件
  - 将 SSE 超时、心跳间隔、WebSocket 发送超时和缓冲区大小下沉到配置项
  - 将推送会话管理器改为构造器注入定时线程池,减少 SpringUtils 服务定位
  - WebSocket 传输异常时主动移除并关闭会话,避免异常连接残留
  - WebSocket 握手改为从 ServerHttpRequest 读取客户端标识,降低线程上下文依赖
  - 补充 Redis topic 空消息防御,避免异常消息触发 NPE
  - PushDTO 增加静态工厂方法,简化推送参数构建
This commit is contained in:
疯狂的狮子Li
2026-05-16 15:11:34 +08:00
parent 287cce6238
commit 3cea6f294f
11 changed files with 155 additions and 50 deletions

View File

@@ -220,6 +220,14 @@ message:
path: /resource/message
# websocket 允许的跨域来源
allowedOrigins: '*'
# SSE 连接超时时间,单位毫秒
sse-timeout: 86400000
# 本地连接心跳检测间隔,单位秒
heartbeat-interval: 60
# WebSocket 单次发送超时时间,单位毫秒
web-socket-send-time-limit: 10000
# WebSocket 发送缓冲区大小
web-socket-buffer-size-limit: 64000
--- # warm-flow工作流配置
warm-flow:

View File

@@ -3,17 +3,20 @@ package org.dromara.common.push.config;
import org.dromara.common.push.controller.SseController;
import org.dromara.common.push.core.SseEmitterSessionManager;
import org.dromara.common.push.listener.MessageTopicListener;
import org.dromara.common.push.properties.MessageProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import java.util.concurrent.ScheduledExecutorService;
/**
* SSE 消息推送自动装配。
*
* @author Lion Li
*/
@AutoConfiguration(after = MessageAutoConfiguration.class)
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true)
@ConditionalOnExpression("'${message.enabled:true}'.equalsIgnoreCase('true') && '${message.transport:sse}'.equalsIgnoreCase('sse')")
public class MessageSseConfiguration {
/**
@@ -23,8 +26,9 @@ public class MessageSseConfiguration {
* @return SseEmitterSessionManager 实例
*/
@Bean
public SseEmitterSessionManager sseEmitterManager() {
return new SseEmitterSessionManager();
public SseEmitterSessionManager sseEmitterManager(ScheduledExecutorService scheduledExecutorService,
MessageProperties messageProperties) {
return new SseEmitterSessionManager(scheduledExecutorService, messageProperties);
}
/**

View File

@@ -6,13 +6,15 @@ import org.dromara.common.push.handler.PlusWebSocketHandler;
import org.dromara.common.push.interceptor.PlusWebSocketInterceptor;
import org.dromara.common.push.properties.MessageProperties;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.server.HandshakeInterceptor;
import java.util.concurrent.ScheduledExecutorService;
/**
* WebSocket 消息推送自动装配。
*
@@ -20,7 +22,7 @@ import org.springframework.web.socket.server.HandshakeInterceptor;
*/
@EnableWebSocket
@AutoConfiguration(after = MessageAutoConfiguration.class)
@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket")
@ConditionalOnExpression("'${message.enabled:true}'.equalsIgnoreCase('true') && '${message.transport:sse}'.equalsIgnoreCase('websocket')")
public class MessageWebSocketConfiguration {
/**
@@ -42,8 +44,9 @@ public class MessageWebSocketConfiguration {
* 负责连接管理、消息发送、定时清理失效会话
*/
@Bean
public WebSocketSessionManager webSocketSessionManager() {
return new WebSocketSessionManager();
public WebSocketSessionManager webSocketSessionManager(ScheduledExecutorService scheduledExecutorService,
MessageProperties messageProperties) {
return new WebSocketSessionManager(scheduledExecutorService, messageProperties);
}
/**
@@ -60,8 +63,9 @@ public class MessageWebSocketConfiguration {
* 处理连接、消息、心跳、断开、异常等事件
*/
@Bean
public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) {
return new PlusWebSocketHandler(webSocketSessionManager);
public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager,
MessageProperties messageProperties) {
return new PlusWebSocketHandler(webSocketSessionManager, messageProperties);
}
/**

View File

@@ -3,10 +3,10 @@ package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.push.properties.MessageProperties;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.api.domain.PushPayloadDTO;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
@@ -30,10 +30,17 @@ public class SseEmitterSessionManager implements PushSessionManager {
private final static Map<Long, Map<String, SseEmitter>> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>();
public SseEmitterSessionManager() {
private final MessageProperties messageProperties;
public SseEmitterSessionManager(ScheduledExecutorService scheduledExecutorService, MessageProperties messageProperties) {
this.messageProperties = messageProperties;
// 定时执行 SSE 心跳检测
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS);
scheduledExecutorService.scheduleWithFixedDelay(
this::sseMonitor,
messageProperties.getHeartbeatInterval(),
messageProperties.getHeartbeatInterval(),
TimeUnit.SECONDS
);
}
/**
@@ -54,8 +61,8 @@ public class SseEmitterSessionManager implements PushSessionManager {
oldEmitter.complete();
}
// 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞
SseEmitter emitter = new SseEmitter(86400000L);
// 创建一个新的 SseEmitter 实例,避免连接之后直接关闭浏览器导致连接停滞
SseEmitter emitter = new SseEmitter(messageProperties.getSseTimeout());
emitters.put(token, emitter);
@@ -195,6 +202,9 @@ public class SseEmitterSessionManager implements PushSessionManager {
*/
@Override
public void sendMessage(Long userId, PushPayloadDTO payload) {
if (payload == null) {
return;
}
sendMessage(userId, JsonUtils.toJsonString(payload));
}
@@ -229,6 +239,9 @@ public class SseEmitterSessionManager implements PushSessionManager {
*/
@Override
public void sendMessage(PushPayloadDTO payload) {
if (payload == null) {
return;
}
sendMessage(JsonUtils.toJsonString(payload));
}
@@ -239,6 +252,9 @@ public class SseEmitterSessionManager implements PushSessionManager {
*/
@Override
public void publishMessage(PushDTO pushDTO) {
if (pushDTO == null || pushDTO.getPayload() == null) {
return;
}
RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, pushDTO, consumer -> log.info(
"发送主题订阅消息topic:{} userIds:{} message:{}",
MessageConstants.MESSAGE_TOPIC,
@@ -263,11 +279,6 @@ public class SseEmitterSessionManager implements PushSessionManager {
*/
@Override
public void publishAll(PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
dto.setPayload(payload);
RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, dto, consumer -> {
log.info("发送主题订阅消息topic:{} type:{} source:{} message:{}",
MessageConstants.MESSAGE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage());
});
publishMessage(PushDTO.broadcast(payload));
}
}

View File

@@ -3,9 +3,9 @@ package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.push.properties.MessageProperties;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.api.domain.PushPayloadDTO;
import org.springframework.web.socket.*;
@@ -40,9 +40,13 @@ public class WebSocketSessionManager implements PushSessionManager {
* 构造函数
* 初始化定时任务每60秒执行一次会话监控自动清理无效连接
*/
public WebSocketSessionManager() {
SpringUtils.getBean(ScheduledExecutorService.class)
.scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS);
public WebSocketSessionManager(ScheduledExecutorService scheduledExecutorService, MessageProperties messageProperties) {
scheduledExecutorService.scheduleWithFixedDelay(
this::sessionMonitor,
messageProperties.getHeartbeatInterval(),
messageProperties.getHeartbeatInterval(),
TimeUnit.SECONDS
);
}
/**
@@ -68,6 +72,17 @@ public class WebSocketSessionManager implements PushSessionManager {
* @param token 客户端唯一标识
*/
public void disconnect(Long userId, String token) {
disconnect(userId, token, CloseStatus.NORMAL);
}
/**
* 用户断开WebSocket连接
*
* @param userId 用户ID
* @param token 客户端唯一标识
* @param status 关闭状态码
*/
public void disconnect(Long userId, String token, CloseStatus status) {
if (userId == null || token == null) {
return;
}
@@ -77,7 +92,7 @@ public class WebSocketSessionManager implements PushSessionManager {
return;
}
// 移除指定token会话并关闭
closeSession(sessions.remove(token), CloseStatus.NORMAL);
closeSession(sessions.remove(token), status);
// 该用户无任何会话时,从缓存中移除
if (sessions.isEmpty()) {
USER_TOKEN_SESSIONS.remove(userId);
@@ -163,6 +178,9 @@ public class WebSocketSessionManager implements PushSessionManager {
*/
@Override
public void sendMessage(PushPayloadDTO payload) {
if (payload == null) {
return;
}
USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload));
}
@@ -174,6 +192,9 @@ public class WebSocketSessionManager implements PushSessionManager {
*/
@Override
public void publishMessage(PushDTO pushDTO) {
if (pushDTO == null || pushDTO.getPayload() == null) {
return;
}
RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info(
"WebSocket发送主题订阅消息topic:{} userIds:{} message:{}",
MESSAGE_TOPIC,
@@ -189,9 +210,7 @@ public class WebSocketSessionManager implements PushSessionManager {
*/
@Override
public void publishAll(PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
dto.setPayload(payload);
publishMessage(dto);
publishMessage(PushDTO.broadcast(payload));
}
/**
@@ -241,7 +260,7 @@ public class WebSocketSessionManager implements PushSessionManager {
* @param session 待关闭的会话
* @param status 关闭状态码
*/
private void closeSession(WebSocketSession session, CloseStatus status) {
public void closeSession(WebSocketSession session, CloseStatus status) {
if (session == null) {
return;
}

View File

@@ -27,4 +27,28 @@ public class PushDTO implements Serializable {
* 推送消息体。
*/
private PushPayloadDTO payload;
/**
* 构建指定用户推送消息。
*
* @param userIds 目标用户 ID 列表
* @param payload 推送消息体
* @return 推送 DTO
*/
public static PushDTO of(List<Long> userIds, PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
dto.setUserIds(userIds);
dto.setPayload(payload);
return dto;
}
/**
* 构建广播推送消息。
*
* @param payload 推送消息体
* @return 推送 DTO
*/
public static PushDTO broadcast(PushPayloadDTO payload) {
return of(null, payload);
}
}

View File

@@ -9,6 +9,7 @@ import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.push.core.WebSocketSessionManager;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.push.properties.MessageProperties;
import org.dromara.system.api.domain.PushPayloadDTO;
import org.dromara.system.api.model.LoginUser;
import org.springframework.web.socket.*;
@@ -33,6 +34,11 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
*/
private final WebSocketSessionManager webSocketSessionManager;
/**
* 消息推送配置
*/
private final MessageProperties messageProperties;
/**
* 建立WebSocket连接后触发
* 校验用户登录信息,注册会话
@@ -56,7 +62,11 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
webSocketSessionManager.connect(
loginUser.getUserId(),
token,
new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64_000)
new ConcurrentWebSocketSessionDecorator(
session,
messageProperties.getWebSocketSendTimeLimit(),
messageProperties.getWebSocketBufferSizeLimit()
)
);
log.info("[connect] sessionId: {}, userId:{}, token:***{}", session.getId(), loginUser.getUserId(), StringUtils.right(token, 8));
}
@@ -82,9 +92,7 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
}
// 构建客户端自定义消息并发布
PushDTO dto = new PushDTO();
dto.setUserIds(List.of(loginUser.getUserId()));
dto.setPayload(PushPayloadDTO.of(
PushDTO dto = PushDTO.of(List.of(loginUser.getUserId()), PushPayloadDTO.of(
PushTypeEnum.CUSTOM,
PushSourceEnum.CLIENT,
message.getPayload(),
@@ -116,7 +124,14 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
*/
@Override
public void handleTransportError(WebSocketSession session, Throwable exception) {
log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage());
log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage(), exception);
LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY);
String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY);
if (ObjectUtil.hasNull(loginUser, token)) {
webSocketSessionManager.closeSession(session, CloseStatus.SERVER_ERROR);
return;
}
webSocketSessionManager.disconnect(loginUser.getUserId(), token, CloseStatus.SERVER_ERROR);
}
/**

View File

@@ -70,10 +70,7 @@ public class PushHelper {
* @param payload 消息推送体
*/
public static void publishMessage(List<Long> userIds, PushPayloadDTO payload) {
PushDTO dto = new PushDTO();
dto.setUserIds(userIds);
dto.setPayload(payload);
publishMessage(dto);
publishMessage(PushDTO.of(userIds, payload));
}
/**

View File

@@ -3,7 +3,6 @@ package org.dromara.common.push.interceptor;
import cn.dev33.satoken.exception.NotLoginException;
import cn.dev33.satoken.stp.StpUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.ServletUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.satoken.utils.LoginHelper;
@@ -12,6 +11,7 @@ import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServerHttpResponse;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.HandshakeInterceptor;
import org.springframework.web.util.UriComponentsBuilder;
import java.util.Map;
@@ -44,12 +44,15 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor {
}
// 3. 校验客户端ID防止多端冒用
String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY);
String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY);
String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString();
String headerCid = request.getHeaders().getFirst(LoginHelper.CLIENT_KEY);
String paramCid = UriComponentsBuilder.fromUri(request.getURI())
.build()
.getQueryParams()
.getFirst(LoginHelper.CLIENT_KEY);
Object clientExtra = StpUtil.getExtra(LoginHelper.CLIENT_KEY);
// 客户端ID必须与请求头/参数中的一致,否则拒绝连接
if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) {
if (clientExtra == null || !StringUtils.equalsAny(clientExtra.toString(), headerCid, paramCid)) {
throw NotLoginException.newInstance(StpUtil.getLoginType(),
"-100", "客户端ID与Token不匹配",
StpUtil.getTokenValue());

View File

@@ -32,12 +32,12 @@ public class MessageTopicListener implements ApplicationRunner, Ordered {
public void run(ApplicationArguments args) {
// 订阅消息主题,处理消息分发
pushSessionManager.subscribeMessage(message -> {
log.info("消息主题订阅收到消息userIds={} message={}",
message.getUserIds(),
message.getPayload() == null ? null : message.getPayload().getMessage());
if (message.getPayload() == null) {
if (message == null || message.getPayload() == null) {
return;
}
log.info("消息主题订阅收到消息userIds={} message={}",
message.getUserIds(),
message.getPayload().getMessage());
// 有指定用户 -> 单发
if (CollUtil.isNotEmpty(message.getUserIds())) {
message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload()));

View File

@@ -31,5 +31,25 @@ public class MessageProperties {
/**
* WebSocket 允许的跨域来源。
*/
private String allowedOrigins = "*";
private String[] allowedOrigins = {"*"};
/**
* SSE 连接超时时间,单位毫秒。
*/
private long sseTimeout = 86_400_000L;
/**
* 本地连接心跳检测间隔,单位秒。
*/
private long heartbeatInterval = 60L;
/**
* WebSocket 单次发送超时时间,单位毫秒。
*/
private int webSocketSendTimeLimit = 10_000;
/**
* WebSocket 发送缓冲区大小。
*/
private int webSocketBufferSizeLimit = 64_000;
}