refactor(common-oss): 优化 OSS 客户端生命周期与配置构建

- 下载 InputStream 回调后自动关闭响应流,避免底层连接释放不明确
  - 统一解包异步调用异常,保留真实 S3 错误原因
  - OssFactory 改为按 configKey 粒度加锁,降低多配置初始化阻塞
  - OSS 配置更新和删除后主动移除本地旧客户端,释放相关资源
  - 收敛 OSS region、ACL、路径风格和访问 URL 构建逻辑
  - Options 新增 create 方法,保留 builder 兼容旧调用
This commit is contained in:
疯狂的狮子Li
2026-05-16 14:56:07 +08:00
parent ce764f08a7
commit 287cce6238
5 changed files with 123 additions and 94 deletions

View File

@@ -37,7 +37,9 @@ import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
@@ -119,10 +121,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
// 将状态转为已初始化
initialized.compareAndSet(false, true);
} catch (Exception e) {
if (e instanceof S3StorageException) {
throw e;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -168,10 +167,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
.handleAsync(handleAsyncAction)
.join();
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -229,10 +225,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
options.setLength(file.length());
return bucketUpload(bucket, key, file.getChannel(), -1L, options);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -253,10 +246,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
}
return bucketUpload(bucket, key, in, size, options);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -282,10 +272,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
try (ByteArrayInputStream in = new ByteArrayInputStream(data)) {
return bucketUpload(bucket, key, in, data.length, options);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -341,10 +328,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
.join()
.result();
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -356,25 +340,18 @@ public abstract class AbstractOssClientImpl implements OssClient {
publisher.subscribe(downloadSubscriber).join();
return getObjectResult;
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@Override
public <T> T bucketDownload(String bucket, String key, BiFunction<GetObjectResult, InputStream, T> downloadTransformer) {
try {
ResponseInputStream<GetObjectResponse> responseInputStream = doCustomDownload(builder -> builder.bucket(bucket).key(key), AsyncResponseTransformer.toBlockingInputStream(), null);
try (ResponseInputStream<GetObjectResponse> responseInputStream = doCustomDownload(builder -> builder.bucket(bucket).key(key), AsyncResponseTransformer.toBlockingInputStream(), null)) {
GetObjectResponse response = responseInputStream.response();
GetObjectResult getObjectResult = buildGetObjectResult(key, response);
return downloadTransformer.apply(getObjectResult, responseInputStream);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -383,10 +360,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
try (OutputStream out = Files.newOutputStream(path)) {
return bucketDownload(bucket, key, out);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -395,10 +369,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
try (FileOutputStream out = new FileOutputStream(file)) {
return bucketDownload(bucket, key, out);
} catch (Exception e) {
if (e instanceof S3StorageException ex) {
throw ex;
}
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -438,7 +409,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
s3AsyncClient.deleteObject(builder -> builder.bucket(bucket).key(key)).join();
return true;
} catch (Exception e) {
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -452,7 +423,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
.url()
.toExternalForm();
} catch (Exception e) {
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -466,7 +437,7 @@ public abstract class AbstractOssClientImpl implements OssClient {
.url()
.toExternalForm();
} catch (Exception e) {
throw S3StorageException.form(e);
throw toStorageException(e);
}
}
@@ -623,6 +594,22 @@ public abstract class AbstractOssClientImpl implements OssClient {
return fileName.substring(index);
}
private S3StorageException toStorageException(Throwable e) {
Throwable cause = unwrapAsyncException(e);
if (cause instanceof S3StorageException ex) {
return ex;
}
return S3StorageException.form(cause);
}
private Throwable unwrapAsyncException(Throwable e) {
Throwable cause = e;
while ((cause instanceof CompletionException || cause instanceof ExecutionException) && cause.getCause() != null) {
cause = cause.getCause();
}
return cause;
}
@Override
public void close() throws Exception {
if (s3TransferManager != null) {

View File

@@ -166,38 +166,17 @@ public class OssClientConfig implements Config<OssClientConfig, OssClientConfig.
* @return 客户端配置构造器
*/
public static OssClientConfigBuilder formPropertiesBuilder(OssProperties properties) {
String regionString = properties.getRegion();
Region region = Region.US_EAST_1;
if (StringUtils.isNotBlank(regionString)) {
region = Region.of(regionString);
}
// 是否使用路径风格应当由使用者明确去配置,此处的配置只是为了适配旧的配置项
// MinIO 使用 HTTPS 限制使用域名访问,站点填域名。需要启用路径样式访问
boolean usePathStyleAccess = !StringUtils.containsAny(properties.getEndpoint(), OssConstant.CLOUD_SERVICE);
// 绝大多数的云厂商都是不允许操作ACL的所以此处的默认配置也是禁用ACL的
AccessControlPolicyConfig accessControlPolicyConfig = AccessControlPolicyConfig.DEFAULT;
// 目前自定义实现的 Client 上传/下载/删除中并没有实际使用到ACL相关配置
// 仅有业务中的链接预签名使用到SysOssServiceImpl#matchingUrl更多只是作为一个扩展点保留如有需要ACL的自行实现调用逻辑
String accessPolicyString = properties.getAccessPolicy();
if (StringUtils.isNotBlank(accessPolicyString)) {
accessControlPolicyConfig = AccessControlPolicyConfig.builder()
.enabled(true)
.accessPolicy(AccessPolicy.formType(accessPolicyString))
.build();
}
return builder()
.endpoint(properties.getEndpoint())
.domain(properties.getDomainUrl())
.accessKey(properties.getAccessKey())
.secretKey(properties.getSecretKey())
.bucket(properties.getBucketName())
.region(region)
.region(parseRegion(properties.getRegion()))
.prefix(properties.getPrefix())
.useHttps(SystemConstants.YES.equals(properties.getIsHttps()))
.usePathStyleAccess(usePathStyleAccess)
.accessControlPolicyConfig(accessControlPolicyConfig);
.usePathStyleAccess(resolvePathStyleAccess(properties))
.accessControlPolicyConfig(resolveAccessControlPolicy(properties.getAccessPolicy()));
}
/**
@@ -206,10 +185,7 @@ public class OssClientConfig implements Config<OssClientConfig, OssClientConfig.
* @return 访问站点URL地址
*/
public String getEndpointUrl() {
String endpoint = endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("endpoint is not configured."));
return BucketUrlUtil.rebuildUrlHeader(useHttps, endpoint);
return BucketUrlUtil.rebuildUrlHeader(useHttps, getEndpoint());
}
/**
@@ -221,7 +197,7 @@ public class OssClientConfig implements Config<OssClientConfig, OssClientConfig.
return domain()
// 如果已经配置了自定义域名,则优先使用域名
// 检查携带协议头
.filter(s -> HttpUtil.isHttp(s) || HttpUtil.isHttps(s))
.filter(OssClientConfig::hasHttpHeader)
// 否则使用站点
.orElseGet(this::getEndpointUrl);
}
@@ -245,20 +221,50 @@ public class OssClientConfig implements Config<OssClientConfig, OssClientConfig.
* @return 桶URL地址
*/
public String getBucketUrl(String bucket) {
// 如果已经配置了自定义域名,则优先使用域名
String url = domain()
// 检查携带协议头
.filter(s -> HttpUtil.isHttp(s) || HttpUtil.isHttps(s))
// 否则使用站点
.orElseGet(() ->
endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("endpoint is not configured."))
);
String url = getAccessBaseUrl();
// 根据是否使用路径风格配置项决定存储桶的URL风格
return usePathStyleAccess ? BucketUrlUtil.getPathStyleBucketUrl(useHttps, url, bucket) : BucketUrlUtil.getSiteStyleBucketUrl(useHttps, url, bucket);
}
private static Region parseRegion(String regionString) {
if (StringUtils.isBlank(regionString)) {
return Region.US_EAST_1;
}
return Region.of(regionString);
}
private static boolean resolvePathStyleAccess(OssProperties properties) {
// 旧配置没有显式路径风格字段,只能继续按内置云厂商 endpoint 做兼容推断。
return !StringUtils.containsAny(properties.getEndpoint(), OssConstant.CLOUD_SERVICE);
}
private static AccessControlPolicyConfig resolveAccessControlPolicy(String accessPolicyString) {
// 绝大多数云厂商不允许操作 ACL默认禁用当前业务只用访问策略判断是否生成预签名 URL。
if (StringUtils.isBlank(accessPolicyString)) {
return AccessControlPolicyConfig.DEFAULT;
}
return AccessControlPolicyConfig.builder()
.enabled(true)
.accessPolicy(AccessPolicy.formType(accessPolicyString))
.build();
}
private String getAccessBaseUrl() {
return domain()
.filter(OssClientConfig::hasHttpHeader)
.orElseGet(this::getEndpoint);
}
private String getEndpoint() {
return endpoint()
.filter(s -> !s.isBlank())
.orElseThrow(() -> S3StorageException.form("endpoint is not configured."));
}
private static boolean hasHttpHeader(String url) {
return HttpUtil.isHttp(url) || HttpUtil.isHttps(url);
}
/**
* ACL访问策略配置
*/

View File

@@ -26,7 +26,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class OssFactory {
private static final Map<String, OssClient> CLIENT_CACHE = new ConcurrentHashMap<>();
private static final ReentrantLock LOCK = new ReentrantLock();
private static final Map<String, ReentrantLock> CLIENT_LOCKS = new ConcurrentHashMap<>();
/**
* 获取默认实例
@@ -44,13 +44,17 @@ public class OssFactory {
* 根据类型获取实例
*/
public static OssClient instance(String configKey) {
if (StringUtils.isBlank(configKey)) {
throw S3StorageException.form("文件存储服务类型无法找到!");
}
String json = CacheUtils.get(CacheNames.SYS_OSS_CONFIG, configKey);
if (json == null) {
throw S3StorageException.form("系统异常, '" + configKey + "'配置信息不存在!");
}
OssProperties properties = JsonUtils.parseObject(json, OssProperties.class);
OssClientConfig config = OssClientConfig.formProperties(properties);
LOCK.lock();
ReentrantLock lock = getClientLock(configKey);
lock.lock();
try {
OssClient client = CLIENT_CACHE.get(configKey);
if (client != null) {
@@ -63,7 +67,7 @@ public class OssFactory {
CLIENT_CACHE.put(configKey, newClient);
return newClient;
} finally {
LOCK.unlock();
lock.unlock();
}
}
@@ -71,12 +75,25 @@ public class OssFactory {
* 移除实例
*/
public static boolean remove(String configKey) {
OssClient client = CLIENT_CACHE.remove(configKey);
if (client == null) {
if (StringUtils.isBlank(configKey)) {
return false;
}
closeClient(configKey, client);
return true;
ReentrantLock lock = getClientLock(configKey);
lock.lock();
try {
OssClient client = CLIENT_CACHE.remove(configKey);
if (client == null) {
return false;
}
closeClient(configKey, client);
return true;
} finally {
lock.unlock();
}
}
private static ReentrantLock getClientLock(String configKey) {
return CLIENT_LOCKS.computeIfAbsent(configKey, key -> new ReentrantLock());
}
private static void closeClient(String configKey, OssClient client) {

View File

@@ -74,6 +74,13 @@ public class Options {
* 创建可选项对象
*/
public static Options builder() {
return create();
}
/**
* 创建可选项对象
*/
public static Options create() {
return new Options();
}
}

View File

@@ -17,6 +17,7 @@ import org.dromara.common.json.utils.JsonUtils;
import org.dromara.common.mybatis.core.page.PageQuery;
import org.dromara.common.mybatis.core.query.QueryBuilder;
import org.dromara.common.oss.constant.OssConstant;
import org.dromara.common.oss.factory.OssFactory;
import org.dromara.common.redis.utils.CacheUtils;
import org.dromara.common.redis.utils.RedisUtils;
import org.dromara.system.domain.SysOssConfig;
@@ -29,6 +30,7 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
/**
* 对象存储配置Service业务层处理
@@ -130,6 +132,7 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
public Boolean updateByBo(SysOssConfigBo bo) {
SysOssConfig config = MapstructUtils.convert(bo, SysOssConfig.class);
validEntityBeforeSave(config);
SysOssConfig oldConfig = ossConfigMapper.selectById(config.getOssConfigId());
boolean flag = ossConfigMapper.lambda()
.set(ObjectUtil.isNull(config.getPrefix()), SysOssConfig::getPrefix, "")
.set(ObjectUtil.isNull(config.getRegion()), SysOssConfig::getRegion, "")
@@ -140,7 +143,12 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
if (flag) {
// 从数据库查询完整的数据做缓存
config = ossConfigMapper.selectById(config.getOssConfigId());
if (ObjectUtil.isNotNull(oldConfig) && !Objects.equals(oldConfig.getConfigKey(), config.getConfigKey())) {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, oldConfig.getConfigKey());
OssFactory.remove(oldConfig.getConfigKey());
}
CacheUtils.put(CacheNames.SYS_OSS_CONFIG, config.getConfigKey(), JsonUtils.toJsonString(config));
OssFactory.remove(config.getConfigKey());
}
return flag;
}
@@ -174,12 +182,16 @@ public class SysOssConfigServiceImpl implements ISysOssConfigService {
List<SysOssConfig> list = CollUtil.newArrayList();
for (Long configId : ids) {
SysOssConfig config = ossConfigMapper.selectById(configId);
list.add(config);
if (ObjectUtil.isNotNull(config)) {
list.add(config);
}
}
boolean flag = ossConfigMapper.deleteByIds(ids) > 0;
if (flag) {
list.forEach(sysOssConfig ->
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, sysOssConfig.getConfigKey()));
list.forEach(sysOssConfig -> {
CacheUtils.evict(CacheNames.SYS_OSS_CONFIG, sysOssConfig.getConfigKey());
OssFactory.remove(sysOssConfig.getConfigKey());
});
}
return flag;
}