diff --git a/ruoyi-admin/src/main/resources/application.yml b/ruoyi-admin/src/main/resources/application.yml index b8848f036..015f6a284 100644 --- a/ruoyi-admin/src/main/resources/application.yml +++ b/ruoyi-admin/src/main/resources/application.yml @@ -220,6 +220,14 @@ message: path: /resource/message # websocket 允许的跨域来源 allowedOrigins: '*' + # SSE 连接超时时间,单位毫秒 + sse-timeout: 86400000 + # 本地连接心跳检测间隔,单位秒 + heartbeat-interval: 60 + # WebSocket 单次发送超时时间,单位毫秒 + web-socket-send-time-limit: 10000 + # WebSocket 发送缓冲区大小 + web-socket-buffer-size-limit: 64000 --- # warm-flow工作流配置 warm-flow: diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java index 2f14eb893..c67ec5d96 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageSseConfiguration.java @@ -3,17 +3,20 @@ package org.dromara.common.push.config; import org.dromara.common.push.controller.SseController; import org.dromara.common.push.core.SseEmitterSessionManager; import org.dromara.common.push.listener.MessageTopicListener; +import org.dromara.common.push.properties.MessageProperties; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; +import java.util.concurrent.ScheduledExecutorService; + /** * SSE 消息推送自动装配。 * * @author Lion Li */ @AutoConfiguration(after = MessageAutoConfiguration.class) -@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "sse", matchIfMissing = true) +@ConditionalOnExpression("'${message.enabled:true}'.equalsIgnoreCase('true') && '${message.transport:sse}'.equalsIgnoreCase('sse')") public class MessageSseConfiguration { /** @@ -23,8 +26,9 @@ public class MessageSseConfiguration { * @return SseEmitterSessionManager 实例 */ @Bean - public SseEmitterSessionManager sseEmitterManager() { - return new SseEmitterSessionManager(); + public SseEmitterSessionManager sseEmitterManager(ScheduledExecutorService scheduledExecutorService, + MessageProperties messageProperties) { + return new SseEmitterSessionManager(scheduledExecutorService, messageProperties); } /** diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java index b4bbd30eb..8b17eebfc 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/config/MessageWebSocketConfiguration.java @@ -6,13 +6,15 @@ import org.dromara.common.push.handler.PlusWebSocketHandler; import org.dromara.common.push.interceptor.PlusWebSocketInterceptor; import org.dromara.common.push.properties.MessageProperties; import org.springframework.boot.autoconfigure.AutoConfiguration; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; import org.springframework.context.annotation.Bean; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.WebSocketConfigurer; import org.springframework.web.socket.server.HandshakeInterceptor; +import java.util.concurrent.ScheduledExecutorService; + /** * WebSocket 消息推送自动装配。 * @@ -20,7 +22,7 @@ import org.springframework.web.socket.server.HandshakeInterceptor; */ @EnableWebSocket @AutoConfiguration(after = MessageAutoConfiguration.class) -@ConditionalOnProperty(prefix = "message", name = "transport", havingValue = "websocket") +@ConditionalOnExpression("'${message.enabled:true}'.equalsIgnoreCase('true') && '${message.transport:sse}'.equalsIgnoreCase('websocket')") public class MessageWebSocketConfiguration { /** @@ -42,8 +44,9 @@ public class MessageWebSocketConfiguration { * 负责连接管理、消息发送、定时清理失效会话 */ @Bean - public WebSocketSessionManager webSocketSessionManager() { - return new WebSocketSessionManager(); + public WebSocketSessionManager webSocketSessionManager(ScheduledExecutorService scheduledExecutorService, + MessageProperties messageProperties) { + return new WebSocketSessionManager(scheduledExecutorService, messageProperties); } /** @@ -60,8 +63,9 @@ public class MessageWebSocketConfiguration { * 处理连接、消息、心跳、断开、异常等事件 */ @Bean - public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager) { - return new PlusWebSocketHandler(webSocketSessionManager); + public WebSocketHandler webSocketHandler(WebSocketSessionManager webSocketSessionManager, + MessageProperties messageProperties) { + return new PlusWebSocketHandler(webSocketSessionManager, messageProperties); } /** diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java index 57d5e8eb8..c7f36fb85 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/SseEmitterSessionManager.java @@ -3,10 +3,10 @@ package org.dromara.common.push.core; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.push.constant.MessageConstants; import org.dromara.common.push.dto.PushDTO; +import org.dromara.common.push.properties.MessageProperties; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.system.api.domain.PushPayloadDTO; import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; @@ -30,10 +30,17 @@ public class SseEmitterSessionManager implements PushSessionManager { private final static Map> USER_TOKEN_EMITTERS = new ConcurrentHashMap<>(); - public SseEmitterSessionManager() { + private final MessageProperties messageProperties; + + public SseEmitterSessionManager(ScheduledExecutorService scheduledExecutorService, MessageProperties messageProperties) { + this.messageProperties = messageProperties; // 定时执行 SSE 心跳检测 - SpringUtils.getBean(ScheduledExecutorService.class) - .scheduleWithFixedDelay(this::sseMonitor, 60L, 60L, TimeUnit.SECONDS); + scheduledExecutorService.scheduleWithFixedDelay( + this::sseMonitor, + messageProperties.getHeartbeatInterval(), + messageProperties.getHeartbeatInterval(), + TimeUnit.SECONDS + ); } /** @@ -54,8 +61,8 @@ public class SseEmitterSessionManager implements PushSessionManager { oldEmitter.complete(); } - // 创建一个新的 SseEmitter 实例,超时时间设置为一天 避免连接之后直接关闭浏览器导致连接停滞 - SseEmitter emitter = new SseEmitter(86400000L); + // 创建一个新的 SseEmitter 实例,避免连接之后直接关闭浏览器导致连接停滞 + SseEmitter emitter = new SseEmitter(messageProperties.getSseTimeout()); emitters.put(token, emitter); @@ -195,6 +202,9 @@ public class SseEmitterSessionManager implements PushSessionManager { */ @Override public void sendMessage(Long userId, PushPayloadDTO payload) { + if (payload == null) { + return; + } sendMessage(userId, JsonUtils.toJsonString(payload)); } @@ -229,6 +239,9 @@ public class SseEmitterSessionManager implements PushSessionManager { */ @Override public void sendMessage(PushPayloadDTO payload) { + if (payload == null) { + return; + } sendMessage(JsonUtils.toJsonString(payload)); } @@ -239,6 +252,9 @@ public class SseEmitterSessionManager implements PushSessionManager { */ @Override public void publishMessage(PushDTO pushDTO) { + if (pushDTO == null || pushDTO.getPayload() == null) { + return; + } RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, pushDTO, consumer -> log.info( "发送主题订阅消息topic:{} userIds:{} message:{}", MessageConstants.MESSAGE_TOPIC, @@ -263,11 +279,6 @@ public class SseEmitterSessionManager implements PushSessionManager { */ @Override public void publishAll(PushPayloadDTO payload) { - PushDTO dto = new PushDTO(); - dto.setPayload(payload); - RedisUtils.publish(MessageConstants.MESSAGE_TOPIC, dto, consumer -> { - log.info("发送主题订阅消息topic:{} type:{} source:{} message:{}", - MessageConstants.MESSAGE_TOPIC, payload.getType(), payload.getSource(), payload.getMessage()); - }); + publishMessage(PushDTO.broadcast(payload)); } } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java index 435fbf8d7..4061aad6e 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/core/WebSocketSessionManager.java @@ -3,9 +3,9 @@ package org.dromara.common.push.core; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.utils.SpringUtils; import org.dromara.common.json.utils.JsonUtils; import org.dromara.common.push.dto.PushDTO; +import org.dromara.common.push.properties.MessageProperties; import org.dromara.common.redis.utils.RedisUtils; import org.dromara.system.api.domain.PushPayloadDTO; import org.springframework.web.socket.*; @@ -40,9 +40,13 @@ public class WebSocketSessionManager implements PushSessionManager { * 构造函数 * 初始化定时任务:每60秒执行一次会话监控,自动清理无效连接 */ - public WebSocketSessionManager() { - SpringUtils.getBean(ScheduledExecutorService.class) - .scheduleWithFixedDelay(this::sessionMonitor, 60L, 60L, TimeUnit.SECONDS); + public WebSocketSessionManager(ScheduledExecutorService scheduledExecutorService, MessageProperties messageProperties) { + scheduledExecutorService.scheduleWithFixedDelay( + this::sessionMonitor, + messageProperties.getHeartbeatInterval(), + messageProperties.getHeartbeatInterval(), + TimeUnit.SECONDS + ); } /** @@ -68,6 +72,17 @@ public class WebSocketSessionManager implements PushSessionManager { * @param token 客户端唯一标识 */ public void disconnect(Long userId, String token) { + disconnect(userId, token, CloseStatus.NORMAL); + } + + /** + * 用户断开WebSocket连接 + * + * @param userId 用户ID + * @param token 客户端唯一标识 + * @param status 关闭状态码 + */ + public void disconnect(Long userId, String token, CloseStatus status) { if (userId == null || token == null) { return; } @@ -77,7 +92,7 @@ public class WebSocketSessionManager implements PushSessionManager { return; } // 移除指定token会话并关闭 - closeSession(sessions.remove(token), CloseStatus.NORMAL); + closeSession(sessions.remove(token), status); // 该用户无任何会话时,从缓存中移除 if (sessions.isEmpty()) { USER_TOKEN_SESSIONS.remove(userId); @@ -163,6 +178,9 @@ public class WebSocketSessionManager implements PushSessionManager { */ @Override public void sendMessage(PushPayloadDTO payload) { + if (payload == null) { + return; + } USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload)); } @@ -174,6 +192,9 @@ public class WebSocketSessionManager implements PushSessionManager { */ @Override public void publishMessage(PushDTO pushDTO) { + if (pushDTO == null || pushDTO.getPayload() == null) { + return; + } RedisUtils.publish(MESSAGE_TOPIC, pushDTO, consumer -> log.info( "WebSocket发送主题订阅消息topic:{} userIds:{} message:{}", MESSAGE_TOPIC, @@ -189,9 +210,7 @@ public class WebSocketSessionManager implements PushSessionManager { */ @Override public void publishAll(PushPayloadDTO payload) { - PushDTO dto = new PushDTO(); - dto.setPayload(payload); - publishMessage(dto); + publishMessage(PushDTO.broadcast(payload)); } /** @@ -241,7 +260,7 @@ public class WebSocketSessionManager implements PushSessionManager { * @param session 待关闭的会话 * @param status 关闭状态码 */ - private void closeSession(WebSocketSession session, CloseStatus status) { + public void closeSession(WebSocketSession session, CloseStatus status) { if (session == null) { return; } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java index e44df6f27..ff81f184c 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/dto/PushDTO.java @@ -27,4 +27,28 @@ public class PushDTO implements Serializable { * 推送消息体。 */ private PushPayloadDTO payload; + + /** + * 构建指定用户推送消息。 + * + * @param userIds 目标用户 ID 列表 + * @param payload 推送消息体 + * @return 推送 DTO + */ + public static PushDTO of(List userIds, PushPayloadDTO payload) { + PushDTO dto = new PushDTO(); + dto.setUserIds(userIds); + dto.setPayload(payload); + return dto; + } + + /** + * 构建广播推送消息。 + * + * @param payload 推送消息体 + * @return 推送 DTO + */ + public static PushDTO broadcast(PushPayloadDTO payload) { + return of(null, payload); + } } diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java index 7c340903c..3fb38be99 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/handler/PlusWebSocketHandler.java @@ -9,6 +9,7 @@ import org.dromara.common.core.utils.StringUtils; import org.dromara.common.push.constant.MessageConstants; import org.dromara.common.push.core.WebSocketSessionManager; import org.dromara.common.push.dto.PushDTO; +import org.dromara.common.push.properties.MessageProperties; import org.dromara.system.api.domain.PushPayloadDTO; import org.dromara.system.api.model.LoginUser; import org.springframework.web.socket.*; @@ -33,6 +34,11 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { */ private final WebSocketSessionManager webSocketSessionManager; + /** + * 消息推送配置 + */ + private final MessageProperties messageProperties; + /** * 建立WebSocket连接后触发 * 校验用户登录信息,注册会话 @@ -56,7 +62,11 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { webSocketSessionManager.connect( loginUser.getUserId(), token, - new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64_000) + new ConcurrentWebSocketSessionDecorator( + session, + messageProperties.getWebSocketSendTimeLimit(), + messageProperties.getWebSocketBufferSizeLimit() + ) ); log.info("[connect] sessionId: {}, userId:{}, token:***{}", session.getId(), loginUser.getUserId(), StringUtils.right(token, 8)); } @@ -82,9 +92,7 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { } // 构建客户端自定义消息并发布 - PushDTO dto = new PushDTO(); - dto.setUserIds(List.of(loginUser.getUserId())); - dto.setPayload(PushPayloadDTO.of( + PushDTO dto = PushDTO.of(List.of(loginUser.getUserId()), PushPayloadDTO.of( PushTypeEnum.CUSTOM, PushSourceEnum.CLIENT, message.getPayload(), @@ -116,7 +124,14 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler { */ @Override public void handleTransportError(WebSocketSession session, Throwable exception) { - log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage()); + log.error("[transport error] sessionId: {}, exception:{}", session.getId(), exception.getMessage(), exception); + LoginUser loginUser = (LoginUser) session.getAttributes().get(MessageConstants.LOGIN_USER_KEY); + String token = (String) session.getAttributes().get(MessageConstants.LOGIN_TOKEN_KEY); + if (ObjectUtil.hasNull(loginUser, token)) { + webSocketSessionManager.closeSession(session, CloseStatus.SERVER_ERROR); + return; + } + webSocketSessionManager.disconnect(loginUser.getUserId(), token, CloseStatus.SERVER_ERROR); } /** diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java index 51f78f673..9ca621832 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/helper/PushHelper.java @@ -70,10 +70,7 @@ public class PushHelper { * @param payload 消息推送体 */ public static void publishMessage(List userIds, PushPayloadDTO payload) { - PushDTO dto = new PushDTO(); - dto.setUserIds(userIds); - dto.setPayload(payload); - publishMessage(dto); + publishMessage(PushDTO.of(userIds, payload)); } /** diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java index b53a537c0..4bc1461e5 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/interceptor/PlusWebSocketInterceptor.java @@ -3,7 +3,6 @@ package org.dromara.common.push.interceptor; import cn.dev33.satoken.exception.NotLoginException; import cn.dev33.satoken.stp.StpUtil; import lombok.extern.slf4j.Slf4j; -import org.dromara.common.core.utils.ServletUtils; import org.dromara.common.core.utils.StringUtils; import org.dromara.common.push.constant.MessageConstants; import org.dromara.common.satoken.utils.LoginHelper; @@ -12,6 +11,7 @@ import org.springframework.http.server.ServerHttpRequest; import org.springframework.http.server.ServerHttpResponse; import org.springframework.web.socket.WebSocketHandler; import org.springframework.web.socket.server.HandshakeInterceptor; +import org.springframework.web.util.UriComponentsBuilder; import java.util.Map; @@ -44,12 +44,15 @@ public class PlusWebSocketInterceptor implements HandshakeInterceptor { } // 3. 校验客户端ID(防止多端冒用) - String headerCid = ServletUtils.getRequest().getHeader(LoginHelper.CLIENT_KEY); - String paramCid = ServletUtils.getParameter(LoginHelper.CLIENT_KEY); - String clientId = StpUtil.getExtra(LoginHelper.CLIENT_KEY).toString(); + String headerCid = request.getHeaders().getFirst(LoginHelper.CLIENT_KEY); + String paramCid = UriComponentsBuilder.fromUri(request.getURI()) + .build() + .getQueryParams() + .getFirst(LoginHelper.CLIENT_KEY); + Object clientExtra = StpUtil.getExtra(LoginHelper.CLIENT_KEY); // 客户端ID必须与请求头/参数中的一致,否则拒绝连接 - if (!StringUtils.equalsAny(clientId, headerCid, paramCid)) { + if (clientExtra == null || !StringUtils.equalsAny(clientExtra.toString(), headerCid, paramCid)) { throw NotLoginException.newInstance(StpUtil.getLoginType(), "-100", "客户端ID与Token不匹配", StpUtil.getTokenValue()); diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java index a0cb451a4..9eb25c83d 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/listener/MessageTopicListener.java @@ -32,12 +32,12 @@ public class MessageTopicListener implements ApplicationRunner, Ordered { public void run(ApplicationArguments args) { // 订阅消息主题,处理消息分发 pushSessionManager.subscribeMessage(message -> { - log.info("消息主题订阅收到消息userIds={} message={}", - message.getUserIds(), - message.getPayload() == null ? null : message.getPayload().getMessage()); - if (message.getPayload() == null) { + if (message == null || message.getPayload() == null) { return; } + log.info("消息主题订阅收到消息userIds={} message={}", + message.getUserIds(), + message.getPayload().getMessage()); // 有指定用户 -> 单发 if (CollUtil.isNotEmpty(message.getUserIds())) { message.getUserIds().forEach(userId -> pushSessionManager.sendMessage(userId, message.getPayload())); diff --git a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java index f444123ef..7d0aeaebf 100644 --- a/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java +++ b/ruoyi-common/ruoyi-common-push/src/main/java/org/dromara/common/push/properties/MessageProperties.java @@ -31,5 +31,25 @@ public class MessageProperties { /** * WebSocket 允许的跨域来源。 */ - private String allowedOrigins = "*"; + private String[] allowedOrigins = {"*"}; + + /** + * SSE 连接超时时间,单位毫秒。 + */ + private long sseTimeout = 86_400_000L; + + /** + * 本地连接心跳检测间隔,单位秒。 + */ + private long heartbeatInterval = 60L; + + /** + * WebSocket 单次发送超时时间,单位毫秒。 + */ + private int webSocketSendTimeLimit = 10_000; + + /** + * WebSocket 发送缓冲区大小。 + */ + private int webSocketBufferSizeLimit = 64_000; }