refactor: 使用事件解耦业务副作用并优化虚拟线程并发

- 新增登录成功事件,解耦在线用户缓存、登录日志和最近登录信息更新
  - 新增在线用户清理事件,角色授权变更后异步清理受影响用户会话
  - 新增 OSS 配置变更事件,统一处理配置初始化、删除和默认配置切换后的缓存刷新
  - 新增工作流抄送、待办消息、结果消息事件,拆分全局监听器中的副作用逻辑
  - 扩展 ThreadUtils 支持带返回值的虚拟线程批量执行
  - 在线用户监控批量读取 Redis 会话信息时使用虚拟线程并发处理
  - OSS 多文件查询、URL 解析和 DTO 转换使用虚拟线程并发处理
  - 工作流办理人回显和用户解析按类型并发查询
  - 工作流多渠道消息发送改为并发执行
  - WebSocket 与 SSE 广播消息按用户并发发送
This commit is contained in:
疯狂的狮子Li
2026-05-16 18:44:24 +08:00
parent f171ac03c4
commit 84afd6e6c0
23 changed files with 579 additions and 146 deletions
@@ -0,0 +1,13 @@
package org.dromara.web.event;
import cn.dev33.satoken.stp.parameter.SaLoginParameter;
/**
* 用户登录成功事件。
*
* @param loginId 登录标识
* @param tokenValue token 值
* @param loginParameter 登录参数
*/
public record UserLoginSuccessEvent(Object loginId, String tokenValue, SaLoginParameter loginParameter) {
}
@@ -2,73 +2,29 @@ package org.dromara.web.listener;
import cn.dev33.satoken.listener.SaTokenListener;
import cn.dev33.satoken.stp.parameter.SaLoginParameter;
import cn.hutool.http.useragent.UserAgent;
import cn.hutool.http.useragent.UserAgentUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.constant.CacheNames;
import org.dromara.common.core.constant.Constants;
import org.dromara.common.core.utils.MessageUtils;
import org.dromara.common.core.utils.ServletUtils;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ip.AddressUtils;
import org.dromara.common.log.event.LoginInfoEvent;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.system.api.domain.UserOnlineDTO;
import org.dromara.web.service.SysLoginService;
import org.dromara.web.event.UserLoginSuccessEvent;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* 用户行为监听器,用于同步在线状态和登录日志。
*
* @author Lion Li
*/
@RequiredArgsConstructor
@Component
@Slf4j
public class UserActionListener implements SaTokenListener {
private final SysLoginService loginService;
/**
* 登录成功后记录在线信息并写入登录日志。
*/
@Override
public void doLogin(String loginType, Object loginId, String tokenValue, SaLoginParameter loginParameter) {
UserAgent userAgent = UserAgentUtil.parse(ServletUtils.getRequest().getHeader("User-Agent"));
String ip = ServletUtils.getClientIP();
UserOnlineDTO dto = new UserOnlineDTO();
dto.setIpaddr(ip);
dto.setLoginLocation(AddressUtils.getRealAddressByIP(ip));
dto.setBrowser(userAgent.getBrowser().getName());
dto.setOs(userAgent.getOs().getName());
dto.setLoginTime(System.currentTimeMillis());
dto.setTokenId(tokenValue);
String username = (String) loginParameter.getExtra(LoginHelper.USER_NAME_KEY);
dto.setUserName(username);
dto.setClientKey((String) loginParameter.getExtra(LoginHelper.CLIENT_KEY));
dto.setDeviceType(loginParameter.getDeviceType());
dto.setDeptName((String) loginParameter.getExtra(LoginHelper.DEPT_NAME_KEY));
if (loginParameter.getTimeout() == -1) {
RedisUtils.setCacheObject(CacheNames.ONLINE_TOKEN_KEY + tokenValue, dto);
} else {
RedisUtils.setCacheObject(CacheNames.ONLINE_TOKEN_KEY + tokenValue, dto, Duration.ofSeconds(loginParameter.getTimeout()));
}
// 记录登录日志
LoginInfoEvent loginInfoEvent = new LoginInfoEvent();
loginInfoEvent.setUsername(username);
loginInfoEvent.setStatus(Constants.LOGIN_SUCCESS);
loginInfoEvent.setMessage(MessageUtils.message("user.login.success"));
loginInfoEvent.setRequest(ServletUtils.getRequest());
SpringUtils.context().publishEvent(loginInfoEvent);
// 更新登录信息
loginService.recordLoginInfo((Long) loginParameter.getExtra(LoginHelper.USER_KEY), ip);
log.info("user doLogin, userId:{}, token:***{}", loginId, StringUtils.right(tokenValue, 8));
SpringUtils.context().publishEvent(new UserLoginSuccessEvent(loginId, tokenValue, loginParameter));
}
/**
@@ -0,0 +1,71 @@
package org.dromara.web.listener;
import cn.dev33.satoken.stp.parameter.SaLoginParameter;
import cn.hutool.http.useragent.UserAgent;
import cn.hutool.http.useragent.UserAgentUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.constant.CacheNames;
import org.dromara.common.core.constant.Constants;
import org.dromara.common.core.utils.MessageUtils;
import org.dromara.common.core.utils.ServletUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ip.AddressUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.common.satoken.utils.LoginHelper;
import org.dromara.system.api.domain.UserOnlineDTO;
import org.dromara.web.event.UserLoginSuccessEvent;
import org.dromara.web.service.SysLoginService;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.time.Duration;
/**
* 用户登录成功监听器。
*
* @author Lion Li
*/
@RequiredArgsConstructor
@Component
@Slf4j
public class UserLoginSuccessListener {
private final SysLoginService loginService;
/**
* 登录成功后记录在线信息、登录日志与最近登录信息。
*
* @param event 用户登录成功事件
*/
@EventListener
public void handleLoginSuccess(UserLoginSuccessEvent event) {
SaLoginParameter loginParameter = event.loginParameter();
UserAgent userAgent = UserAgentUtil.parse(ServletUtils.getRequest().getHeader("User-Agent"));
String ip = ServletUtils.getClientIP();
String username = (String) loginParameter.getExtra(LoginHelper.USER_NAME_KEY);
String tokenValue = event.tokenValue();
UserOnlineDTO dto = new UserOnlineDTO();
dto.setIpaddr(ip);
dto.setLoginLocation(AddressUtils.getRealAddressByIP(ip));
dto.setBrowser(userAgent.getBrowser().getName());
dto.setOs(userAgent.getOs().getName());
dto.setLoginTime(System.currentTimeMillis());
dto.setTokenId(tokenValue);
dto.setUserName(username);
dto.setClientKey((String) loginParameter.getExtra(LoginHelper.CLIENT_KEY));
dto.setDeviceType(loginParameter.getDeviceType());
dto.setDeptName((String) loginParameter.getExtra(LoginHelper.DEPT_NAME_KEY));
if (loginParameter.getTimeout() == -1) {
RedisUtils.setCacheObject(CacheNames.ONLINE_TOKEN_KEY + tokenValue, dto);
} else {
RedisUtils.setCacheObject(CacheNames.ONLINE_TOKEN_KEY + tokenValue, dto, Duration.ofSeconds(loginParameter.getTimeout()));
}
loginService.recordLoginInfo(username, Constants.LOGIN_SUCCESS, MessageUtils.message("user.login.success"));
loginService.updateLastLoginInfo((Long) loginParameter.getExtra(LoginHelper.USER_KEY), ip);
log.info("user doLogin, userId:{}, token:***{}", event.loginId(), StringUtils.right(tokenValue, 8));
}
}
@@ -175,7 +175,7 @@ public class SysLoginService {
* @param userId 用户ID
* @param ip 登录IP
*/
public void recordLoginInfo(Long userId, String ip) {
public void updateLastLoginInfo(Long userId, String ip) {
SysUser sysUser = new SysUser();
sysUser.setUserId(userId);
sysUser.setLoginIp(ip);
@@ -2,11 +2,12 @@ package org.dromara.common.core.utils;
import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import org.dromara.common.core.exception.ServiceException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.function.Supplier;
/**
* 线程工具
@@ -38,4 +39,43 @@ public class ThreadUtils {
}
/**
* 批量执行有返回值的任务
*
* @param supplierList 任务列表
* @param <T> 返回值类型
* @return 按提交顺序返回的任务结果
*/
@SafeVarargs
public static <T> List<T> virtualSubmitAll(Supplier<T>... supplierList) {
return virtualSubmitAll(List.of(supplierList));
}
/**
* 批量执行有返回值的任务
*
* @param supplierList 任务列表
* @param <T> 返回值类型
* @return 按提交顺序返回的任务结果
*/
public static <T> List<T> virtualSubmitAll(Collection<? extends Supplier<T>> supplierList) {
List<Future<T>> futureList = new ArrayList<>();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
for (Supplier<T> supplier : supplierList) {
futureList.add(executor.submit(supplier::get));
}
List<T> resultList = new ArrayList<>(futureList.size());
for (Future<T> future : futureList) {
resultList.add(future.get());
}
return resultList;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("线程执行被中断", e);
} catch (ExecutionException e) {
Throwable cause = e.getCause() == null ? e : e.getCause();
throw new RuntimeException("线程执行异常:" + cause.getMessage(), cause);
}
}
}
@@ -3,6 +3,7 @@ package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.push.constant.MessageConstants;
import org.dromara.common.push.dto.PushDTO;
@@ -227,9 +228,11 @@ public class SseEmitterSessionManager implements PushSessionManager {
* @param message 要发送的消息内容
*/
public void sendMessage(String message) {
for (Long userId : USER_TOKEN_EMITTERS.keySet()) {
sendMessage(userId, message);
}
List<Long> userIds = new ArrayList<>(USER_TOKEN_EMITTERS.keySet());
Runnable[] sendTasks = userIds.stream()
.map(userId -> (Runnable) () -> sendMessage(userId, message))
.toArray(Runnable[]::new);
ThreadUtils.virtualInvokeAll(sendTasks);
}
/**
@@ -3,6 +3,7 @@ package org.dromara.common.push.core;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.push.dto.PushDTO;
import org.dromara.common.push.properties.MessageProperties;
@@ -181,7 +182,11 @@ public class WebSocketSessionManager implements PushSessionManager {
if (payload == null) {
return;
}
USER_TOKEN_SESSIONS.keySet().forEach(userId -> sendMessage(userId, payload));
List<Long> userIds = new ArrayList<>(USER_TOKEN_SESSIONS.keySet());
Runnable[] sendTasks = userIds.stream()
.map(userId -> (Runnable) () -> sendMessage(userId, payload))
.toArray(Runnable[]::new);
ThreadUtils.virtualInvokeAll(sendTasks);
}
/**
@@ -10,6 +10,7 @@ import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.common.log.annotation.Log;
import org.dromara.common.log.enums.BusinessType;
import org.dromara.common.redis.annotation.RepeatSubmit;
@@ -19,11 +20,10 @@ import org.dromara.system.api.domain.UserOnlineDTO;
import org.dromara.system.domain.SysUserOnline;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.function.Supplier;
/**
* 在线用户监控
@@ -47,15 +47,16 @@ public class SysUserOnlineController extends BaseController {
public R<PageResult<SysUserOnline>> list(String ipaddr, String userName) {
// 获取所有未过期的 token
Collection<String> keys = RedisUtils.keys(CacheNames.ONLINE_TOKEN_KEY + "*");
List<UserOnlineDTO> userOnlineDTOList = new ArrayList<>();
for (String key : keys) {
List<Supplier<UserOnlineDTO>> suppliers = keys.stream().map(key -> (Supplier<UserOnlineDTO>) () -> {
String token = StringUtils.substringAfterLast(key, ":");
// 如果已经过期则跳过
if (StpUtil.stpLogic.getTokenActiveTimeoutByToken(token) < -1) {
continue;
return null;
}
userOnlineDTOList.add(RedisUtils.getCacheObject(CacheNames.ONLINE_TOKEN_KEY + token));
}
return RedisUtils.getCacheObject(CacheNames.ONLINE_TOKEN_KEY + token);
}).toList();
List<UserOnlineDTO> userOnlineDTOList = ThreadUtils.virtualSubmitAll(suppliers);
userOnlineDTOList.removeAll(Collections.singleton(null));
if (StringUtils.isNotEmpty(ipaddr) && StringUtils.isNotEmpty(userName)) {
userOnlineDTOList = StreamUtils.filter(userOnlineDTOList, userOnline ->
StringUtils.equals(ipaddr, userOnline.getIpaddr()) &&
@@ -71,7 +72,6 @@ public class SysUserOnlineController extends BaseController {
);
}
Collections.reverse(userOnlineDTOList);
userOnlineDTOList.removeAll(Collections.singleton(null));
List<SysUserOnline> userOnlineList = BeanUtil.copyToList(userOnlineDTOList, SysUserOnline.class);
return R.ok(PageResult.build(userOnlineList));
}
@@ -103,10 +103,13 @@ public class SysUserOnlineController extends BaseController {
public R<PageResult<SysUserOnline>> getInfo() {
// 获取指定账号 id 的 token 集合
List<String> tokenIds = StpUtil.getTokenValueListByLoginId(StpUtil.getLoginIdAsString());
List<UserOnlineDTO> userOnlineDTOList = tokenIds.stream()
.filter(token -> StpUtil.stpLogic.getTokenActiveTimeoutByToken(token) >= -1)
.map(token -> (UserOnlineDTO) RedisUtils.getCacheObject(CacheNames.ONLINE_TOKEN_KEY + token))
.collect(Collectors.toList());
List<Supplier<UserOnlineDTO>> suppliers = tokenIds.stream().map(token -> (Supplier<UserOnlineDTO>) () -> {
if (StpUtil.stpLogic.getTokenActiveTimeoutByToken(token) < -1) {
return null;
}
return RedisUtils.getCacheObject(CacheNames.ONLINE_TOKEN_KEY + token);
}).toList();
List<UserOnlineDTO> userOnlineDTOList = ThreadUtils.virtualSubmitAll(suppliers);
//复制和处理 SysUserOnline 对象列表
Collections.reverse(userOnlineDTOList);
userOnlineDTOList.removeAll(Collections.singleton(null));
@@ -6,6 +6,7 @@ import jakarta.servlet.http.HttpServletResponse;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.domain.R;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.excel.utils.ExcelBuilder;
import org.dromara.common.log.annotation.Log;
import org.dromara.common.log.enums.BusinessType;
@@ -18,6 +19,7 @@ import org.dromara.system.domain.bo.SysRoleBo;
import org.dromara.system.domain.bo.SysUserBo;
import org.dromara.system.domain.vo.SysRoleVo;
import org.dromara.system.domain.vo.SysUserVo;
import org.dromara.system.event.OnlineUserCleanEvent;
import org.dromara.system.service.ISysDeptService;
import org.dromara.system.service.ISysRoleService;
import org.dromara.system.service.ISysUserService;
@@ -123,7 +125,7 @@ public class SysRoleController extends BaseController {
}
if (roleService.updateRoleBaseInfo(role) > 0) {
roleService.cleanOnlineUserByRole(role.getRoleId());
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byRole(role.getRoleId()));
return R.ok();
}
return R.fail("修改角色'" + role.getRoleName() + "'失败,请联系管理员");
@@ -143,7 +145,7 @@ public class SysRoleController extends BaseController {
roleService.checkRoleAllowed(role);
roleService.checkRoleDataScope(role.getRoleId());
if (roleService.updateRolePermission(role) > 0) {
roleService.cleanOnlineUserByRole(role.getRoleId());
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byRole(role.getRoleId()));
return R.ok();
}
return R.fail("修改角色'" + role.getRoleName() + "'权限失败,请联系管理员");
@@ -163,7 +165,7 @@ public class SysRoleController extends BaseController {
roleService.checkRoleAllowed(role);
roleService.checkRoleDataScope(role.getRoleId());
if (roleService.updateRoleStatus(role.getRoleId(), role.getStatus()) > 0) {
roleService.cleanOnlineUserByRole(role.getRoleId());
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byRole(role.getRoleId()));
return R.ok();
}
return R.fail("修改角色'" + role.getRoleName() + "'状态失败,请联系管理员");
@@ -0,0 +1,30 @@
package org.dromara.system.event;
import cn.hutool.core.collection.CollUtil;
import java.util.Collection;
import java.util.List;
/**
* 在线用户清理事件。
*
* @param roleId 角色 ID,存在时清理拥有该角色的在线用户
* @param userIds 用户 ID 集合,存在时清理指定在线用户
*/
public record OnlineUserCleanEvent(Long roleId, Collection<Long> userIds) {
public OnlineUserCleanEvent {
if (CollUtil.isNotEmpty(userIds)) {
userIds = List.copyOf(userIds);
}
}
public static OnlineUserCleanEvent byRole(Long roleId) {
return new OnlineUserCleanEvent(roleId, null);
}
public static OnlineUserCleanEvent byUsers(Collection<Long> userIds) {
return new OnlineUserCleanEvent(null, userIds);
}
}
@@ -0,0 +1,30 @@
package org.dromara.system.event;
/**
* OSS 配置变更事件。
*
* @param configKey 当前配置 key
* @param oldConfigKey 变更前配置 key
* @param configJson 当前配置 JSON,为空表示清理缓存
* @param defaultConfig 是否设置为默认配置
*/
public record OssConfigChangeEvent(
String configKey,
String oldConfigKey,
String configJson,
boolean defaultConfig
) {
public static OssConfigChangeEvent save(String configKey, String oldConfigKey, String configJson) {
return new OssConfigChangeEvent(configKey, oldConfigKey, configJson, false);
}
public static OssConfigChangeEvent remove(String configKey) {
return new OssConfigChangeEvent(configKey, null, null, false);
}
public static OssConfigChangeEvent useDefault(String configKey) {
return new OssConfigChangeEvent(configKey, null, null, true);
}
}
@@ -0,0 +1,39 @@
package org.dromara.system.listener;
import cn.hutool.core.collection.CollUtil;
import lombok.RequiredArgsConstructor;
import org.dromara.system.event.OnlineUserCleanEvent;
import org.dromara.system.service.ISysRoleService;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
/**
* 在线用户清理监听器。
*
* @author Lion Li
*/
@Component
@RequiredArgsConstructor
public class OnlineUserCleanListener {
private final ISysRoleService roleService;
/**
* 权限或用户角色关系变化后清理受影响的在线用户。
*
* @param event 在线用户清理事件
*/
@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void cleanOnlineUser(OnlineUserCleanEvent event) {
if (event.roleId() != null) {
roleService.cleanOnlineUserByRole(event.roleId());
}
if (CollUtil.isNotEmpty(event.userIds())) {
roleService.cleanOnlineUser(event.userIds());
}
}
}
@@ -0,0 +1,48 @@
package org.dromara.system.listener;
import org.dromara.common.core.constant.CacheNames;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.oss.constant.OssConstant;
import org.dromara.common.oss.factory.OssFactory;
import org.dromara.common.redis.utils.CacheUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.event.OssConfigChangeEvent;
import org.springframework.stereotype.Component;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;
/**
* OSS 配置变更监听器。
*
* @author Lion Li
*/
@Component
public class OssConfigChangeListener {
/**
* 数据提交后同步刷新 OSS 配置缓存与客户端实例。
*
* @param event OSS 配置变更事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, fallbackExecution = true)
public void refreshOssConfig(OssConfigChangeEvent event) {
if (event.defaultConfig()) {
RedisUtils.setCacheObject(OssConstant.DEFAULT_CONFIG_KEY, event.configKey());
return;
}
if (StringUtils.isNotBlank(event.oldConfigKey()) && !StringUtils.equals(event.oldConfigKey(), event.configKey())) {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, event.oldConfigKey());
OssFactory.remove(event.oldConfigKey());
}
if (StringUtils.isBlank(event.configKey())) {
return;
}
if (StringUtils.isBlank(event.configJson())) {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, event.configKey());
} else {
CacheUtils.put(CacheNames.SYS_OSS_CONFIG, event.configKey(), event.configJson());
}
OssFactory.remove(event.configKey());
}
}
@@ -12,17 +12,18 @@ import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.ObjectUtils;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.mybatis.core.query.QueryBuilder;
import org.dromara.common.oss.constant.OssConstant;
import org.dromara.common.oss.factory.OssFactory;
import org.dromara.common.redis.utils.CacheUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.domain.SysOssConfig;
import org.dromara.system.domain.bo.SysOssConfigBo;
import org.dromara.system.domain.vo.SysOssConfigVo;
import org.dromara.system.event.OssConfigChangeEvent;
import org.dromara.system.mapper.SysOssConfigMapper;
import org.dromara.system.service.ISysOssConfigService;
import org.springframework.stereotype.Service;
@@ -30,7 +31,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
/**
* 对象存储配置Service业务层处理
@@ -117,7 +117,7 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
if (flag) {
// 从数据库查询完整的数据做缓存
config = ossConfigMapper.selectById(config.getOssConfigId());
CacheUtils.put(CacheNames.SYS_OSS_CONFIG, config.getConfigKey(), JsonUtils.toJsonString(config));
publishOssConfigSaved(config, null);
}
return flag;
}
@@ -143,12 +143,7 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
if (flag) {
// 从数据库查询完整的数据做缓存
config = ossConfigMapper.selectById(config.getOssConfigId());
if (ObjectUtil.isNotNull(oldConfig) && !Objects.equals(oldConfig.getConfigKey(), config.getConfigKey())) {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, oldConfig.getConfigKey());
OssFactory.remove(oldConfig.getConfigKey());
}
CacheUtils.put(CacheNames.SYS_OSS_CONFIG, config.getConfigKey(), JsonUtils.toJsonString(config));
OssFactory.remove(config.getConfigKey());
publishOssConfigSaved(config, ObjectUtils.notNullGetter(oldConfig, SysOssConfig::getConfigKey));
}
return flag;
}
@@ -188,10 +183,8 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
}
boolean flag = ossConfigMapper.deleteByIds(ids) > 0;
if (flag) {
list.forEach(sysOssConfig -> {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, sysOssConfig.getConfigKey());
OssFactory.remove(sysOssConfig.getConfigKey());
});
list.forEach(sysOssConfig ->
SpringUtils.context().publishEvent(OssConfigChangeEvent.remove(sysOssConfig.getConfigKey())));
}
return flag;
}
@@ -227,9 +220,23 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
int row = ossConfigMapper.lambda().set(SysOssConfig::getStatus, SystemConstants.NO).updateCount();
row += ossConfigMapper.updateById(sysOssConfig);
if (row > 0) {
RedisUtils.setCacheObject(OssConstant.DEFAULT_CONFIG_KEY, sysOssConfig.getConfigKey());
SpringUtils.context().publishEvent(OssConfigChangeEvent.useDefault(sysOssConfig.getConfigKey()));
}
return row;
}
/**
* 发布 OSS 配置保存事件。
*
* @param config 当前配置
* @param oldConfigKey 变更前配置 key
*/
private void publishOssConfigSaved(SysOssConfig config, String oldConfigKey) {
SpringUtils.context().publishEvent(OssConfigChangeEvent.save(
config.getConfigKey(),
oldConfigKey,
JsonUtils.toJsonString(config)
));
}
}
@@ -15,6 +15,7 @@ import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.common.core.utils.file.FileUtils;
import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
@@ -44,10 +45,11 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
/**
* 文件上传 服务层实现
@@ -85,19 +87,21 @@ public class SysOssServiceImpl implements ISysOssService, OssService {
*/
@Override
public List<SysOssVo> listByIds(Collection<Long> ossIds) {
List<SysOssVo> list = new ArrayList<>();
SysOssServiceImpl ossService = SpringUtils.getAopProxy(this);
for (Long id : ossIds) {
List<Supplier<SysOssVo>> suppliers = ossIds.stream().map(id -> (Supplier<SysOssVo>) () -> {
SysOssVo vo = ossService.getById(id);
if (ObjectUtil.isNotNull(vo)) {
try {
list.add(this.matchingUrl(vo));
return this.matchingUrl(vo);
} catch (Exception ignored) {
// 如果oss异常无法连接则将数据直接返回
list.add(vo);
return vo;
}
}
}
return null;
}).toList();
List<SysOssVo> list = ThreadUtils.virtualSubmitAll(suppliers);
list.removeAll(Collections.singleton(null));
return list;
}
@@ -109,19 +113,22 @@ public class SysOssServiceImpl implements ISysOssService, OssService {
*/
@Override
public String selectUrlByIds(String ossIds) {
List<String> list = new ArrayList<>();
List<Long> ids = StringUtils.splitTo(ossIds, Convert::toLong);
SysOssServiceImpl ossService = SpringUtils.getAopProxy(this);
for (Long id : StringUtils.splitTo(ossIds, Convert::toLong)) {
List<Supplier<String>> suppliers = ids.stream().map(id -> (Supplier<String>) () -> {
SysOssVo vo = ossService.getById(id);
if (ObjectUtil.isNotNull(vo)) {
try {
list.add(this.matchingUrl(vo).getUrl());
return this.matchingUrl(vo).getUrl();
} catch (Exception ignored) {
// 如果oss异常无法连接则将数据直接返回
list.add(vo.getUrl());
return vo.getUrl();
}
}
}
return null;
}).toList();
List<String> list = ThreadUtils.virtualSubmitAll(suppliers);
list.removeAll(Collections.singleton(null));
return StringUtils.joinComma(list);
}
@@ -133,19 +140,23 @@ public class SysOssServiceImpl implements ISysOssService, OssService {
*/
@Override
public List<OssDTO> selectByIds(String ossIds) {
List<OssDTO> list = new ArrayList<>();
for (Long id : StringUtils.splitTo(ossIds, Convert::toLong)) {
SysOssVo vo = SpringUtils.getAopProxy(this).getById(id);
List<Long> ids = StringUtils.splitTo(ossIds, Convert::toLong);
var ossService = SpringUtils.getAopProxy(this);
List<Supplier<OssDTO>> suppliers = ids.stream().map(id -> (Supplier<OssDTO>) () -> {
SysOssVo vo = ossService.getById(id);
if (ObjectUtil.isNotNull(vo)) {
try {
vo.setUrl(this.matchingUrl(vo).getUrl());
list.add(BeanUtil.toBean(vo, OssDTO.class));
return BeanUtil.toBean(vo, OssDTO.class);
} catch (Exception ignored) {
// 如果oss异常无法连接则将数据直接返回
list.add(BeanUtil.toBean(vo, OssDTO.class));
return BeanUtil.toBean(vo, OssDTO.class);
}
}
}
return null;
}).toList();
List<OssDTO> list = ThreadUtils.virtualSubmitAll(suppliers);
list.removeAll(Collections.singleton(null));
return list;
}
@@ -13,6 +13,7 @@ import org.dromara.common.core.constant.SystemConstants;
import org.dromara.common.core.domain.PageResult;
import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.MapstructUtils;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
@@ -26,6 +27,7 @@ import org.dromara.system.domain.SysRoleMenu;
import org.dromara.system.domain.SysUserRole;
import org.dromara.system.domain.bo.SysRoleBo;
import org.dromara.system.domain.vo.SysRoleVo;
import org.dromara.system.event.OnlineUserCleanEvent;
import org.dromara.system.mapper.SysRoleDeptMapper;
import org.dromara.system.mapper.SysRoleMapper;
import org.dromara.system.mapper.SysRoleMenuMapper;
@@ -464,7 +466,7 @@ public class SysRoleServiceImpl implements ISysRoleService, RoleService {
.eq(SysUserRole::getUserId, userRole.getUserId())
.deleteCount();
if (rows > 0) {
cleanOnlineUser(List.of(userRole.getUserId()));
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byUsers(List.of(userRole.getUserId())));
}
return rows;
}
@@ -486,7 +488,7 @@ public class SysRoleServiceImpl implements ISysRoleService, RoleService {
.in(SysUserRole::getUserId, userIds)
.deleteCount();
if (rows > 0) {
cleanOnlineUser(userIds);
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byUsers(userIds));
}
return rows;
}
@@ -515,7 +517,7 @@ public class SysRoleServiceImpl implements ISysRoleService, RoleService {
rows = userRoleMapper.insertBatch(list) ? list.size() : 0;
}
if (rows > 0) {
cleanOnlineUser(userIds);
SpringUtils.context().publishEvent(OnlineUserCleanEvent.byUsers(userIds));
}
return rows;
}
@@ -0,0 +1,23 @@
package org.dromara.workflow.event;
import cn.hutool.core.collection.CollUtil;
import org.dromara.warm.flow.core.entity.Task;
import org.dromara.workflow.domain.bo.FlowCopyBo;
import java.util.List;
/**
* 工作流抄送事件。
*
* @param task 当前任务
* @param flowCopyList 抄送人列表
*/
public record WorkflowCopyEvent(Task task, List<FlowCopyBo> flowCopyList) {
public WorkflowCopyEvent {
if (CollUtil.isNotEmpty(flowCopyList)) {
flowCopyList = List.copyOf(flowCopyList);
}
}
}
@@ -0,0 +1,23 @@
package org.dromara.workflow.event;
import cn.hutool.core.collection.CollUtil;
import java.util.List;
/**
* 工作流结果消息事件。
*
* @param flowName 流程名称
* @param status 流程状态
* @param createBy 发起人用户 ID
* @param messageType 消息类型
*/
public record WorkflowResultMessageEvent(String flowName, String status, String createBy, List<String> messageType) {
public WorkflowResultMessageEvent {
if (CollUtil.isNotEmpty(messageType)) {
messageType = List.copyOf(messageType);
}
}
}
@@ -0,0 +1,23 @@
package org.dromara.workflow.event;
import cn.hutool.core.collection.CollUtil;
import java.util.List;
/**
* 工作流待办消息事件。
*
* @param flowName 流程名称
* @param instanceId 流程实例 ID
* @param messageType 消息类型
* @param notice 通知内容
*/
public record WorkflowTaskMessageEvent(String flowName, Long instanceId, List<String> messageType, String notice) {
public WorkflowTaskMessageEvent {
if (CollUtil.isNotEmpty(messageType)) {
messageType = List.copyOf(messageType);
}
}
}
@@ -9,10 +9,10 @@ import cn.hutool.core.util.StrUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.enums.BusinessStatusEnum;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.system.api.UserService;
import org.dromara.system.api.domain.UserDTO;
import org.dromara.warm.flow.core.FlowEngine;
import org.dromara.warm.flow.core.dto.FlowParams;
import org.dromara.warm.flow.core.entity.Definition;
@@ -22,10 +22,12 @@ import org.dromara.warm.flow.core.listener.GlobalListener;
import org.dromara.warm.flow.core.listener.ListenerVariable;
import org.dromara.workflow.common.ConditionalOnEnable;
import org.dromara.workflow.common.constant.FlowConstant;
import org.dromara.workflow.common.enums.MessageTypeEnum;
import org.dromara.workflow.common.enums.TaskStatusEnum;
import org.dromara.workflow.domain.bo.FlowCopyBo;
import org.dromara.workflow.domain.vo.NodeExtVo;
import org.dromara.workflow.event.WorkflowCopyEvent;
import org.dromara.workflow.event.WorkflowResultMessageEvent;
import org.dromara.workflow.event.WorkflowTaskMessageEvent;
import org.dromara.workflow.handler.FlowProcessEventHandler;
import org.dromara.workflow.service.IFlwCommonService;
import org.dromara.workflow.service.IFlwInstanceService;
@@ -232,7 +234,7 @@ public class WorkflowGlobalListener implements GlobalListener {
List<FlowCopyBo> flowCopyList = MapUtil.get(variable, FlowConstant.FLOW_COPY_LIST, new TypeReference<>() {
});
// 添加抄送人
flwTaskService.setCopy(task, flowCopyList);
SpringUtils.context().publishEvent(new WorkflowCopyEvent(task, flowCopyList));
}
if (variable.containsKey(FlowConstant.MESSAGE_TYPE)) {
List<String> messageType = MapUtil.get(variable, FlowConstant.MESSAGE_TYPE, new TypeReference<>() {
@@ -240,7 +242,7 @@ public class WorkflowGlobalListener implements GlobalListener {
String notice = MapUtil.getStr(variable, FlowConstant.MESSAGE_NOTICE);
// 退回到申请人时只保留“已退回”结果消息,避免再追加一条“新的待办”形成重复提醒。
if (shouldSendTaskMessage(flowParams, definition, nextTasks)) {
flwCommonService.sendMessage(definition.getFlowName(), instance.getId(), messageType, notice);
SpringUtils.context().publishEvent(new WorkflowTaskMessageEvent(definition.getFlowName(), instance.getId(), messageType, notice));
}
}
FlowEngine.insService().removeVariables(instance.getId(),
@@ -270,25 +272,13 @@ public class WorkflowGlobalListener implements GlobalListener {
if (StringUtils.isBlank(instance.getCreateBy())) {
return;
}
BusinessStatusEnum statusEnum = BusinessStatusEnum.getByStatus(status);
if (statusEnum == null) {
return;
}
Long createBy = Convert.toLong(instance.getCreateBy(), null);
if (createBy == null) {
return;
}
UserDTO initiator = userService.selectById(createBy);
if (initiator == null || initiator.getUserId() == null) {
return;
}
// 已完成、已退回这类结果消息只发给发起人,不再混入处理人待办消息。
List<String> messageType = Collections.singletonList(MessageTypeEnum.SYSTEM_MESSAGE.getCode());
List<String> messageType = null;
if (MapUtil.isNotEmpty(variable) && variable.containsKey(FlowConstant.MESSAGE_TYPE)) {
messageType = MapUtil.get(variable, FlowConstant.MESSAGE_TYPE, new TypeReference<>() {
});
}
flwCommonService.sendResultMessage(definition.getFlowName(), statusEnum, messageType, Collections.singletonList(initiator));
SpringUtils.context().publishEvent(new WorkflowResultMessageEvent(definition.getFlowName(), status, instance.getCreateBy(), messageType));
}
/**
@@ -0,0 +1,85 @@
package org.dromara.workflow.listener;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.convert.Convert;
import lombok.RequiredArgsConstructor;
import org.dromara.common.core.enums.BusinessStatusEnum;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.system.api.UserService;
import org.dromara.system.api.domain.UserDTO;
import org.dromara.workflow.common.ConditionalOnEnable;
import org.dromara.workflow.common.enums.MessageTypeEnum;
import org.dromara.workflow.event.WorkflowCopyEvent;
import org.dromara.workflow.event.WorkflowResultMessageEvent;
import org.dromara.workflow.event.WorkflowTaskMessageEvent;
import org.dromara.workflow.service.IFlwCommonService;
import org.dromara.workflow.service.IFlwTaskService;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.Collections;
import java.util.List;
/**
* 工作流副作用事件监听器。
*
* @author may
*/
@ConditionalOnEnable
@Component
@RequiredArgsConstructor
public class WorkflowSideEffectListener {
private final IFlwTaskService flwTaskService;
private final IFlwCommonService flwCommonService;
private final UserService userService;
/**
* 保存工作流抄送记录。
*
* @param event 工作流抄送事件
*/
@EventListener
public void handleCopy(WorkflowCopyEvent event) {
flwTaskService.setCopy(event.task(), event.flowCopyList());
}
/**
* 发送工作流待办消息。
*
* @param event 工作流待办消息事件
*/
@EventListener
public void handleTaskMessage(WorkflowTaskMessageEvent event) {
flwCommonService.sendMessage(event.flowName(), event.instanceId(), event.messageType(), event.notice());
}
/**
* 发送工作流结果消息。
*
* @param event 工作流结果消息事件
*/
@EventListener
public void handleResultMessage(WorkflowResultMessageEvent event) {
if (!StringUtils.equalsAny(event.status(), BusinessStatusEnum.FINISH.getStatus(), BusinessStatusEnum.BACK.getStatus())) {
return;
}
Long createBy = Convert.toLong(event.createBy(), null);
if (createBy == null) {
return;
}
BusinessStatusEnum status = BusinessStatusEnum.getByStatus(event.status());
if (status == null) {
return;
}
UserDTO initiator = userService.selectById(createBy);
if (initiator == null || initiator.getUserId() == null) {
return;
}
List<String> messageType = CollUtil.isNotEmpty(event.messageType())
? event.messageType()
: Collections.singletonList(MessageTypeEnum.SYSTEM_MESSAGE.getCode());
flwCommonService.sendResultMessage(event.flowName(), status, messageType, Collections.singletonList(initiator));
}
}
@@ -11,6 +11,7 @@ import org.dromara.common.core.exception.ServiceException;
import org.dromara.common.core.utils.SpringUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.common.mail.core.MailBuilder;
import org.dromara.system.api.MessageService;
import org.dromara.system.api.domain.PushPayloadDTO;
@@ -107,23 +108,29 @@ public class FlwCommonServiceImpl implements IFlwCommonService {
Set<String> emails = StreamUtils.toSet(userList, UserDTO::getEmail);
emails.removeIf(StringUtils::isBlank);
for (String code : messageType) {
MessageTypeEnum messageTypeEnum = MessageTypeEnum.getByCode(code);
if (ObjectUtil.isEmpty(messageTypeEnum)) {
continue;
}
try {
switch (messageTypeEnum) {
case SYSTEM_MESSAGE -> {
// 站内消息直接携带前端路由,消息盒子点击后可按路径分流。
messageService.publishMessage(userIds, PushPayloadDTO.of(
PushTypeEnum.MESSAGE,
PushSourceEnum.WORKFLOW,
message, null, path
));
}
case EMAIL_MESSAGE -> MailBuilder.of().to(emails).subject(subject).text(message).send();
case SMS_MESSAGE -> {
Runnable[] sendTasks = messageType.stream()
.map(code -> (Runnable) () -> sendMessageByType(code, message, subject, path, userIds, emails, userList.size()))
.toArray(Runnable[]::new);
ThreadUtils.virtualInvokeAll(sendTasks);
}
private void sendMessageByType(String code, String message, String subject, String path, List<Long> userIds, Set<String> emails, int userCount) {
MessageTypeEnum messageTypeEnum = MessageTypeEnum.getByCode(code);
if (ObjectUtil.isEmpty(messageTypeEnum)) {
return;
}
try {
switch (messageTypeEnum) {
case SYSTEM_MESSAGE -> {
// 站内消息直接携带前端路由,消息盒子点击后可按路径分流。
messageService.publishMessage(userIds, PushPayloadDTO.of(
PushTypeEnum.MESSAGE,
PushSourceEnum.WORKFLOW,
message, null, path
));
}
case EMAIL_MESSAGE -> MailBuilder.of().to(emails).subject(subject).text(message).send();
case SMS_MESSAGE -> {
// LinkedHashMap<String, String> map = new LinkedHashMap<>(1);
// // 根据具体短信服务商参数用法传参
// map.put("code", "1234");
@@ -132,14 +139,13 @@ public class FlwCommonServiceImpl implements IFlwCommonService {
// // 指定获取一个短信服务商 configKey
// SmsBlend smsBlend = SmsFactory.getSmsBlend("config1");
// SmsResponse smsResponse = smsBlend.sendMessage(phones, templateId, map);
log.info("【短信发送 - TODO】用户数量={} 内容={}", userList.size(), message);
}
default -> log.warn("【消息发送】未处理的消息类型:{}", messageTypeEnum);
log.info("【短信发送 - TODO】用户数量={} 内容={}", userCount, message);
}
} catch (Exception ex) {
// 记录错误但不抛出,确保主逻辑不受影响
log.error("【消息发送失败】类型={},原因={}", messageTypeEnum, ex.getMessage(), ex);
default -> log.warn("【消息发送】未处理的消息类型:{}", messageTypeEnum);
}
} catch (Exception ex) {
// 记录错误但不抛出,确保主逻辑不受影响
log.error("【消息发送失败】类型={},原因={}", messageTypeEnum, ex.getMessage(), ex);
}
}
@@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j;
import org.dromara.common.core.utils.DateUtils;
import org.dromara.common.core.utils.StreamUtils;
import org.dromara.common.core.utils.StringUtils;
import org.dromara.common.core.utils.ThreadUtils;
import org.dromara.system.api.*;
import org.dromara.system.api.domain.DeptDTO;
import org.dromara.system.api.domain.TaskAssigneeDTO;
@@ -29,6 +30,7 @@ import org.dromara.workflow.service.IFlwTaskAssigneeService;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
@@ -103,8 +105,7 @@ public class FlwTaskAssigneeServiceImpl implements IFlwTaskAssigneeService, Hand
}
// 查询所有类型对应的 ID 名称映射
Map<TaskAssigneeEnum, Map<String, String>> nameMap = new EnumMap<>(TaskAssigneeEnum.class);
typeIdMap.forEach((type, ids) -> nameMap.put(type, this.getNamesByType(type, ids)));
Map<TaskAssigneeEnum, Map<String, String>> nameMap = this.getNamesByTypes(typeIdMap);
// 组装返回结果,保持原始顺序
return parsedMap.entrySet().stream()
.map(entry -> {
@@ -213,12 +214,34 @@ public class FlwTaskAssigneeServiceImpl implements IFlwTaskAssigneeService, Hand
typeIdMap.computeIfAbsent(parsed.getKey(), k -> new ArrayList<>()).add(parsed.getValue());
}
}
return typeIdMap.entrySet().stream()
.flatMap(entry -> this.getUsersByType(entry.getKey(), entry.getValue()).stream())
return this.getUsersByTypes(typeIdMap).stream()
.distinct()
.toList();
}
private List<UserDTO> getUsersByTypes(Map<TaskAssigneeEnum, List<String>> typeIdMap) {
List<Supplier<List<UserDTO>>> suppliers = typeIdMap.entrySet().stream()
.map(entry -> (Supplier<List<UserDTO>>) () -> this.getUsersByType(entry.getKey(), entry.getValue()))
.toList();
return ThreadUtils.virtualSubmitAll(suppliers).stream()
.filter(CollUtil::isNotEmpty)
.flatMap(Collection::stream)
.toList();
}
private Map<TaskAssigneeEnum, Map<String, String>> getNamesByTypes(Map<TaskAssigneeEnum, List<String>> typeIdMap) {
List<TaskAssigneeEnum> types = new ArrayList<>(typeIdMap.keySet());
List<Supplier<Map<String, String>>> suppliers = types.stream()
.map(type -> (Supplier<Map<String, String>>) () -> this.getNamesByType(type, typeIdMap.get(type)))
.toList();
List<Map<String, String>> names = ThreadUtils.virtualSubmitAll(suppliers);
Map<TaskAssigneeEnum, Map<String, String>> nameMap = new EnumMap<>(TaskAssigneeEnum.class);
for (int i = 0; i < types.size(); i++) {
nameMap.put(types.get(i), names.get(i));
}
return nameMap;
}
/**
* 根据指定的任务分配类型(TaskAssigneeEnum)和 ID 列表,获取对应的用户信息列表
*