mirror of
https://github.com/dromara/RuoYi-Vue-Plus.git
synced 2026-01-10 17:25:58 +08:00
Compare commits
7 Commits
v5.5.2
...
bbc684b335
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
bbc684b335 | ||
|
|
2b8f4e1d2c | ||
|
|
d634c2a292 | ||
|
|
8b97e7bc53 | ||
|
|
874ad7c9b7 | ||
|
|
89d6f6f247 | ||
|
|
1324a1cb16 |
@@ -35,7 +35,6 @@ MaxKey 业界领先单点登录产品 - https://gitee.com/dromara/MaxKey <br>
|
||||
CCFlow 驰聘低代码-流程-表单 - https://gitee.com/opencc/RuoYi-JFlow <br>
|
||||
数舵科技 软件定制开发APP小程序等 - http://www.shuduokeji.com/ <br>
|
||||
引迈信息 软件开发平台 - https://www.jnpfsoft.com/index.html?from=plus-doc <br>
|
||||
<font color="red">**启山商城系统 多租户商城源码可免费商用可二次开发 - https://www.73app.cn/** </font><br>
|
||||
Mall4J 高质量Java商城系统 - https://www.mall4j.com/cn/?statId=11 <br>
|
||||
aizuda flowlong 工作流 - https://gitee.com/aizuda/flowlong <br>
|
||||
Ruoyi-Plus-Uniapp - https://ruoyi.plus <br>
|
||||
|
||||
2
pom.xml
2
pom.xml
@@ -38,7 +38,7 @@
|
||||
<bouncycastle.version>1.80</bouncycastle.version>
|
||||
<justauth.version>1.16.7</justauth.version>
|
||||
<!-- 离线IP地址定位库 -->
|
||||
<ip2region.version>3.3.1</ip2region.version>
|
||||
<ip2region.version>3.3.2</ip2region.version>
|
||||
<!-- OSS 配置 -->
|
||||
<aws.sdk.version>2.28.22</aws.sdk.version>
|
||||
<!-- SMS 配置 -->
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package org.dromara.common.core.utils.ip;
|
||||
|
||||
import cn.hutool.core.io.FileUtil;
|
||||
import cn.hutool.core.io.resource.ResourceUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.dromara.common.core.exception.ServiceException;
|
||||
@@ -9,7 +8,6 @@ import org.lionsoul.ip2region.service.Config;
|
||||
import org.lionsoul.ip2region.service.Ip2Region;
|
||||
import org.lionsoul.ip2region.xdb.Util;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.InputStream;
|
||||
import java.time.Duration;
|
||||
|
||||
@@ -31,6 +29,11 @@ public class RegionUtils {
|
||||
// 下载地址:https://gitee.com/lionsoul/ip2region/blob/master/data/ip2region_v6.xdb
|
||||
public static final String DEFAULT_IPV6_XDB_PATH = "ip2region_v6.xdb";
|
||||
|
||||
// 默认缓存切片大小为15MB(仅针对BufferCache全量读取有效,如果你的xdb数据库很大,合理设置该值可以有效提升BufferCache模式下的查询效率,具体可以查看Ip2Region的README)
|
||||
// 注意:设置过大的值可能会申请内存时,因内存不足而导致OOM,请合理设置该值。
|
||||
// README:https://gitee.com/lionsoul/ip2region/tree/master/binding/java
|
||||
public static final int DEFAULT_CACHE_SLICE_BYTES = 1024 * 1024 * 15;
|
||||
|
||||
// 未知地址
|
||||
public static final String UNKNOWN_ADDRESS = "未知";
|
||||
|
||||
@@ -43,20 +46,18 @@ public class RegionUtils {
|
||||
// 注意:Ip2Region 的xdb文件加载策略 CachePolicy 有三种,分别是:BufferCache(全量读取xdb到内存中)、VIndexCache(默认策略,按需读取并缓存)、NoCache(实时读取)
|
||||
// 本项目工具使用的 CachePolicy 为 BufferCache,BufferCache会加载整个xdb文件到内存中,setXdbInputStream 仅支持 BufferCache 策略。
|
||||
// 因为加载整个xdb文件会耗费非常大的内存,如果你不希望加载整个xdb到内存中,更推荐使用 VIndexCache 或 NoCache(即实时读取文件)策略和 setXdbPath/setXdbFile 加载方法(需要注意的一点,setXdbPath 和 setXdbFile 不支持读取ClassPath(即源码和resource目录)中的文件)。
|
||||
// 一般而言,更建议把xdb数据库放到一个指定的文件目录中(即不打包进jar包中),然后使用 NoCache + 配合SearcherPool的并发池读取数据,更方便随时更新xdb数据库
|
||||
// 一般而言,更建议把xdb数据库放到一个指定的文件目录中(即不打包进jar包中),然后使用 VIndexCache + 配合SearcherPool的并发池读取数据,更方便随时更新xdb数据库
|
||||
|
||||
// TODO 2025年12月23日 Ip2Region封装的 InputStream 读取函数 Searcher.loadContentFromInputStream 在Linux环境下会申请过大的byte[]空间而导致OOM,这里先用临时文件的方案解决,等后续 Ip2Region 更新解决方案
|
||||
// 创建临时文件
|
||||
File v4TempXdb = FileUtil.writeFromStream(ResourceUtil.getStream(DEFAULT_IPV4_XDB_PATH), FileUtil.createTempFile());
|
||||
InputStream v4InputStream = ResourceUtil.getStream(DEFAULT_IPV4_XDB_PATH);
|
||||
|
||||
// IPv4配置
|
||||
Config v4Config = Config.custom()
|
||||
.setCachePolicy(Config.BufferCache)
|
||||
.setXdbFile(v4TempXdb)
|
||||
// .setXdbInputStream(ResourceUtil.getStream(DEFAULT_IPV4_XDB_PATH))
|
||||
//.setXdbFile(v4TempXdb)
|
||||
.setXdbInputStream(v4InputStream)
|
||||
//
|
||||
.setCacheSliceBytes(DEFAULT_CACHE_SLICE_BYTES)
|
||||
.asV4();
|
||||
// 删除临时文件
|
||||
v4TempXdb.delete();
|
||||
|
||||
// IPv6配置
|
||||
Config v6Config = null;
|
||||
@@ -64,17 +65,12 @@ public class RegionUtils {
|
||||
if (v6XdbInputStream == null) {
|
||||
log.warn("未加载 IPv6 地址库:未在类路径下找到文件 {}。当前仅启用 IPv4 查询。如需启用 IPv6,请将 ip2region_v6.xdb 放置到 resources 目录", DEFAULT_IPV6_XDB_PATH);
|
||||
} else {
|
||||
// 创建临时文件
|
||||
File v6TempXdb = FileUtil.writeFromStream(ResourceUtil.getStream(DEFAULT_IPV4_XDB_PATH), FileUtil.createTempFile());
|
||||
|
||||
v6Config = Config.custom()
|
||||
.setCachePolicy(Config.BufferCache)
|
||||
.setXdbFile(v6TempXdb)
|
||||
// .setXdbInputStream(v6XdbInputStream)
|
||||
//.setXdbFile(v6TempXdb)
|
||||
.setXdbInputStream(v6XdbInputStream)
|
||||
.setCacheSliceBytes(DEFAULT_CACHE_SLICE_BYTES)
|
||||
.asV6();
|
||||
|
||||
// 删除临时文件
|
||||
v6TempXdb.delete();
|
||||
}
|
||||
|
||||
// 初始化Ip2Region实例
|
||||
|
||||
@@ -141,7 +141,8 @@ public class OssClient {
|
||||
try {
|
||||
// 构建上传请求对象
|
||||
FileUpload fileUpload = transferManager.uploadFile(
|
||||
x -> x.putObjectRequest(
|
||||
x -> {
|
||||
x.source(filePath).putObjectRequest(
|
||||
y -> y.bucket(properties.getBucketName())
|
||||
.key(key)
|
||||
.contentMD5(StringUtils.isNotEmpty(md5Digest) ? md5Digest : null)
|
||||
@@ -149,10 +150,13 @@ public class OssClient {
|
||||
// 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
||||
// 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
||||
//.acl(getAccessPolicy().getObjectCannedACL())
|
||||
.build())
|
||||
.addTransferListener(LoggingTransferListener.create())
|
||||
.source(filePath).build());
|
||||
|
||||
.build()
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
x.addTransferListener(LoggingTransferListener.create());
|
||||
}
|
||||
}
|
||||
);
|
||||
// 等待上传完成并获取上传结果
|
||||
CompletedFileUpload uploadResult = fileUpload.completionFuture().join();
|
||||
String eTag = uploadResult.response().eTag();
|
||||
@@ -192,16 +196,21 @@ public class OssClient {
|
||||
|
||||
// 使用 transferManager 进行上传
|
||||
Upload upload = transferManager.upload(
|
||||
x -> x.requestBody(body).addTransferListener(LoggingTransferListener.create())
|
||||
.putObjectRequest(
|
||||
x -> {
|
||||
x.requestBody(body).putObjectRequest(
|
||||
y -> y.bucket(properties.getBucketName())
|
||||
.key(key)
|
||||
.contentType(contentType)
|
||||
// 用于设置对象的访问控制列表(ACL)。不同云厂商对ACL的支持和实现方式有所不同,
|
||||
// 因此根据具体的云服务提供商,你可能需要进行不同的配置(自行开启,阿里云有acl权限配置,腾讯云没有acl权限配置)
|
||||
//.acl(getAccessPolicy().getObjectCannedACL())
|
||||
.build())
|
||||
.build());
|
||||
.build()
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
x.addTransferListener(LoggingTransferListener.create());
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
// 将输入流写入请求体
|
||||
body.writeInputStream(inputStream);
|
||||
@@ -229,13 +238,17 @@ public class OssClient {
|
||||
Path tempFilePath = FileUtils.createTempFile().toPath();
|
||||
// 使用 S3TransferManager 下载文件
|
||||
FileDownload downloadFile = transferManager.downloadFile(
|
||||
x -> x.getObjectRequest(
|
||||
x -> {
|
||||
x.destination(tempFilePath).getObjectRequest(
|
||||
y -> y.bucket(properties.getBucketName())
|
||||
.key(removeBaseUrl(path))
|
||||
.build())
|
||||
.addTransferListener(LoggingTransferListener.create())
|
||||
.destination(tempFilePath)
|
||||
.build());
|
||||
.build()
|
||||
);
|
||||
if (log.isDebugEnabled()) {
|
||||
x.addTransferListener(LoggingTransferListener.create());
|
||||
}
|
||||
}
|
||||
);
|
||||
// 等待文件下载操作完成
|
||||
downloadFile.completionFuture().join();
|
||||
return tempFilePath;
|
||||
@@ -244,8 +257,8 @@ public class OssClient {
|
||||
/**
|
||||
* 下载文件从 Amazon S3 到 输出流
|
||||
*
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param out 输出流
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param out 输出流
|
||||
* @param consumer 自定义处理逻辑
|
||||
* @throws OssException 如果下载失败,抛出自定义异常
|
||||
*/
|
||||
@@ -260,26 +273,24 @@ public class OssClient {
|
||||
/**
|
||||
* 下载文件从 Amazon S3 到 输出流
|
||||
*
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param key 文件在 Amazon S3 中的对象键
|
||||
* @param contentLengthConsumer 文件大小消费者函数
|
||||
* @return 写出订阅器
|
||||
* @throws OssException 如果下载失败,抛出自定义异常
|
||||
*/
|
||||
public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
|
||||
try {
|
||||
// 构建下载请求
|
||||
DownloadRequest<ResponsePublisher<GetObjectResponse>> publisherDownloadRequest = DownloadRequest.builder()
|
||||
// 文件对象
|
||||
.getObjectRequest(y -> y.bucket(properties.getBucketName())
|
||||
.key(key)
|
||||
.build())
|
||||
.addTransferListener(LoggingTransferListener.create())
|
||||
DownloadRequest.TypedBuilder<ResponsePublisher<GetObjectResponse>> typedBuilder = DownloadRequest.builder()
|
||||
// 使用发布订阅转换器
|
||||
.responseTransformer(AsyncResponseTransformer.toPublisher())
|
||||
.build();
|
||||
// 文件对象
|
||||
.getObjectRequest(y -> y.bucket(properties.getBucketName()).key(key).build());
|
||||
if (log.isDebugEnabled()) {
|
||||
typedBuilder.addTransferListener(LoggingTransferListener.create());
|
||||
}
|
||||
|
||||
// 使用 S3TransferManager 下载文件
|
||||
Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager.download(publisherDownloadRequest);
|
||||
Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager.download(typedBuilder.build());
|
||||
// 获取下载发布订阅转换器
|
||||
ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
|
||||
// 执行文件大小消费者函数
|
||||
@@ -289,7 +300,7 @@ public class OssClient {
|
||||
// 构建写出订阅器对象
|
||||
return out -> {
|
||||
// 创建可写入的字节通道
|
||||
try(WritableByteChannel channel = Channels.newChannel(out)){
|
||||
try (WritableByteChannel channel = Channels.newChannel(out)) {
|
||||
// 订阅数据
|
||||
publisher.subscribe(byteBuffer -> {
|
||||
while (byteBuffer.hasRemaining()) {
|
||||
@@ -347,7 +358,7 @@ public class OssClient {
|
||||
*
|
||||
* @param objectKey 对象KEY
|
||||
* @param expiredTime 链接授权到期时间
|
||||
* @param metadata 元数据
|
||||
* @param metadata 元数据
|
||||
*/
|
||||
public String createPresignedPutUrl(String objectKey, Duration expiredTime, Map<String, String> metadata) {
|
||||
// 使用 AWS S3 预签名 URL 的生成器 获取上传文件对象的预签名 URL
|
||||
|
||||
@@ -14,6 +14,7 @@ import org.dromara.common.core.exception.SseException;
|
||||
import org.dromara.common.core.exception.base.BaseException;
|
||||
import org.dromara.common.core.utils.StreamUtils;
|
||||
import org.dromara.common.json.utils.JsonUtils;
|
||||
import org.springframework.context.MessageSourceResolvable;
|
||||
import org.springframework.context.support.DefaultMessageSourceResolvable;
|
||||
import org.springframework.expression.ExpressionException;
|
||||
import org.springframework.http.converter.HttpMessageNotReadableException;
|
||||
@@ -25,6 +26,7 @@ import org.springframework.web.bind.annotation.ExceptionHandler;
|
||||
import org.springframework.web.bind.annotation.ResponseStatus;
|
||||
import org.springframework.web.bind.annotation.RestControllerAdvice;
|
||||
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
|
||||
import org.springframework.web.method.annotation.HandlerMethodValidationException;
|
||||
import org.springframework.web.method.annotation.MethodArgumentTypeMismatchException;
|
||||
import org.springframework.web.servlet.NoHandlerFoundException;
|
||||
|
||||
@@ -191,6 +193,16 @@ public class GlobalExceptionHandler {
|
||||
return R.fail(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* 方法参数校验异常 用于处理 @Validated 注解
|
||||
*/
|
||||
@ExceptionHandler(HandlerMethodValidationException.class)
|
||||
public R<Void> handlerMethodValidationException(HandlerMethodValidationException e) {
|
||||
log.error(e.getMessage());
|
||||
String message = StreamUtils.join(e.getAllErrors(), MessageSourceResolvable::getDefaultMessage, ", ");
|
||||
return R.fail(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* JSON 解析异常(Jackson 在处理 JSON 格式出错时抛出)
|
||||
* 可能是请求体格式非法,也可能是服务端反序列化失败
|
||||
|
||||
@@ -8,6 +8,7 @@ import org.dromara.common.websocket.holder.WebSocketSessionHolder;
|
||||
import org.dromara.common.websocket.utils.WebSocketUtils;
|
||||
import org.springframework.web.socket.*;
|
||||
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
|
||||
import org.springframework.web.socket.handler.ConcurrentWebSocketSessionDecorator;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
@@ -33,7 +34,7 @@ public class PlusWebSocketHandler extends AbstractWebSocketHandler {
|
||||
log.info("[connect] invalid token received. sessionId: {}", session.getId());
|
||||
return;
|
||||
}
|
||||
WebSocketSessionHolder.addSession(loginUser.getUserId(), session);
|
||||
WebSocketSessionHolder.addSession(loginUser.getUserId(), new ConcurrentWebSocketSessionDecorator(session, 10 * 1000, 64000));
|
||||
log.info("[connect] sessionId: {},userId:{},userType:{}", session.getId(), loginUser.getUserId(), loginUser.getUserType());
|
||||
}
|
||||
|
||||
|
||||
@@ -113,7 +113,7 @@ public class WebSocketUtils {
|
||||
* @param session WebSocket会话
|
||||
* @param message 要发送的WebSocket消息对象
|
||||
*/
|
||||
private synchronized static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
|
||||
private static void sendMessage(WebSocketSession session, WebSocketMessage<?> message) {
|
||||
if (session == null || !session.isOpen()) {
|
||||
log.warn("[send] session会话已经关闭");
|
||||
} else {
|
||||
|
||||
@@ -1,146 +0,0 @@
|
||||
package com.aizuda.snailjob.server.common.register;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.hutool.core.util.IdUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import com.aizuda.snailjob.common.core.enums.NodeTypeEnum;
|
||||
import com.aizuda.snailjob.common.core.util.JsonUtil;
|
||||
import com.aizuda.snailjob.common.core.util.NetUtil;
|
||||
import com.aizuda.snailjob.common.core.util.SnailJobVersion;
|
||||
import com.aizuda.snailjob.common.core.util.StreamUtils;
|
||||
import com.aizuda.snailjob.common.log.SnailJobLog;
|
||||
import com.aizuda.snailjob.server.common.cache.CacheConsumerGroup;
|
||||
import com.aizuda.snailjob.server.common.config.SystemProperties;
|
||||
import com.aizuda.snailjob.server.common.convert.RegisterNodeInfoConverter;
|
||||
import com.aizuda.snailjob.server.common.dto.ServerNodeExtAttrs;
|
||||
import com.aizuda.snailjob.server.common.handler.InstanceManager;
|
||||
import com.aizuda.snailjob.template.datasource.persistence.po.ServerNode;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.google.common.collect.Lists;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.boot.autoconfigure.web.ServerProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 服务端注册
|
||||
*
|
||||
* @author opensnail
|
||||
* @date 2023-06-07
|
||||
* @since 1.6.0
|
||||
*/
|
||||
@Component(ServerRegister.BEAN_NAME)
|
||||
@RequiredArgsConstructor
|
||||
public class ServerRegister extends AbstractRegister {
|
||||
public static final String BEAN_NAME = "serverRegister";
|
||||
private final ScheduledExecutorService serverRegisterNode = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "server-register-node"));
|
||||
public static final int DELAY_TIME = 30;
|
||||
public static final String CURRENT_CID;
|
||||
public static final String GROUP_NAME = "DEFAULT_SERVER";
|
||||
public static final String NAMESPACE_ID = "DEFAULT_SERVER_NAMESPACE_ID";
|
||||
private final InstanceManager instanceManager;
|
||||
private final SystemProperties systemProperties;
|
||||
private final ServerProperties serverProperties;
|
||||
|
||||
static {
|
||||
CURRENT_CID = IdUtil.getSnowflakeNextIdStr();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supports(int type) {
|
||||
return getNodeType().equals(type);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void beforeProcessor(RegisterContext context) {
|
||||
// 新增扩展参数
|
||||
ServerNodeExtAttrs serverNodeExtAttrs = new ServerNodeExtAttrs();
|
||||
serverNodeExtAttrs.setWebPort(serverProperties.getPort());
|
||||
serverNodeExtAttrs.setSystemVersion(SnailJobVersion.getVersion());
|
||||
|
||||
context.setGroupName(GROUP_NAME);
|
||||
context.setHostId(CURRENT_CID);
|
||||
String serverHost = systemProperties.getServerHost();
|
||||
if (StrUtil.isEmptyIfStr(serverHost)) {
|
||||
serverHost = NetUtil.getLocalIpStr();
|
||||
}
|
||||
context.setHostIp(serverHost);
|
||||
context.setHostPort(systemProperties.getServerPort());
|
||||
context.setContextPath(Optional.ofNullable(serverProperties.getServlet().getContextPath()).orElse(StrUtil.EMPTY));
|
||||
context.setNamespaceId(NAMESPACE_ID);
|
||||
context.setExtAttrs(JsonUtil.toJsonString(serverNodeExtAttrs));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected LocalDateTime getExpireAt() {
|
||||
return LocalDateTime.now().plusSeconds(DELAY_TIME);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean doRegister(RegisterContext context, ServerNode serverNode) {
|
||||
refreshExpireAt(Lists.newArrayList(serverNode));
|
||||
return Boolean.TRUE;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void afterProcessor(final ServerNode serverNode) {
|
||||
try {
|
||||
// 同步当前POD消费的组的节点信息
|
||||
// netty的client只会注册到一个服务端,若组分配的和client连接的不是一个POD则会导致当前POD没有其他客户端的注册信息
|
||||
ConcurrentMap<String /*groupName*/, Set<String>/*namespaceId*/> allConsumerGroupName = CacheConsumerGroup.getAllConsumerGroupName();
|
||||
if (CollUtil.isNotEmpty(allConsumerGroupName)) {
|
||||
Set<String> namespaceIdSets = StreamUtils.toSetByFlatMap(allConsumerGroupName.values(), Set::stream);
|
||||
if (CollUtil.isEmpty(namespaceIdSets)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<ServerNode> serverNodes = serverNodeMapper.selectList(
|
||||
new LambdaQueryWrapper<ServerNode>()
|
||||
.eq(ServerNode::getNodeType, NodeTypeEnum.CLIENT.getType())
|
||||
.in(ServerNode::getNamespaceId, namespaceIdSets)
|
||||
.in(ServerNode::getGroupName, allConsumerGroupName.keySet()));
|
||||
for (final ServerNode node : serverNodes) {
|
||||
// 刷新全量本地缓存
|
||||
instanceManager.registerOrUpdate(RegisterNodeInfoConverter.INSTANCE.toRegisterNodeInfo(node));
|
||||
// 刷新过期时间
|
||||
CacheConsumerGroup.addOrUpdate(node.getGroupName(), node.getNamespaceId());
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("Client refresh failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Integer getNodeType() {
|
||||
return NodeTypeEnum.SERVER.getType();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
SnailJobLog.LOCAL.info("ServerRegister start");
|
||||
|
||||
serverRegisterNode.scheduleAtFixedRate(() -> {
|
||||
try {
|
||||
this.register(new RegisterContext());
|
||||
} catch (Exception e) {
|
||||
SnailJobLog.LOCAL.error("Server-side registration failed", e);
|
||||
}
|
||||
}, 0, DELAY_TIME * 2 / 3, TimeUnit.SECONDS);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
SnailJobLog.LOCAL.info("ServerRegister close");
|
||||
}
|
||||
}
|
||||
@@ -74,7 +74,7 @@ public class WorkflowGlobalListener implements GlobalListener {
|
||||
String ext = listenerVariable.getNode().getExt();
|
||||
if (StringUtils.isNotBlank(ext)) {
|
||||
Map<String, Object> variable = listenerVariable.getVariable();
|
||||
if (CollUtil.isNotEmpty(variable)) {
|
||||
if (CollUtil.isEmpty(variable)) {
|
||||
variable = new HashMap<>();
|
||||
}
|
||||
NodeExtVo nodeExt = nodeExtService.parseNodeExt(ext, variable);
|
||||
|
||||
Reference in New Issue
Block a user