update 改用发布订阅的方式替代阻塞流,优化大文件下载时的内存占用

This commit is contained in:
秋辞未寒 2025-08-12 16:17:07 +08:00
parent e2801037cf
commit ad6b3d4b3f
2 changed files with 62 additions and 15 deletions

View File

@ -13,9 +13,7 @@ import org.dromara.common.oss.exception.OssException;
import org.dromara.common.oss.properties.OssProperties; import org.dromara.common.oss.properties.OssProperties;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.*;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClient;
@ -29,9 +27,12 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener;
import java.io.*; import java.io.*;
import java.net.URI; import java.net.URI;
import java.net.URL; import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Duration; import java.time.Duration;
import java.util.Optional;
import java.util.function.Consumer; import java.util.function.Consumer;
/** /**
@ -237,30 +238,61 @@ public class OssClient {
* @param key 文件在 Amazon S3 中的对象键 * @param key 文件在 Amazon S3 中的对象键
* @param out 输出流 * @param out 输出流
* @param consumer 自定义处理逻辑 * @param consumer 自定义处理逻辑
* @return 输出流中写入的字节数长度
* @throws OssException 如果下载失败抛出自定义异常 * @throws OssException 如果下载失败抛出自定义异常
*/ */
public void download(String key, OutputStream out, Consumer<Long> consumer) { public void download(String key, OutputStream out, Consumer<Long> consumer) {
try {
this.download(key, consumer).writeTo(out);
} catch (Exception e) {
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
}
}
/**
* 下载文件从 Amazon S3 输出流
*
* @param key 文件在 Amazon S3 中的对象键
* @param contentLengthConsumer 文件大小消费者函数
* @return 写出订阅器
* @throws OssException 如果下载失败抛出自定义异常
*/
public WriteOutSubscriber<OutputStream> download(String key, Consumer<Long> contentLengthConsumer) {
try { try {
// 构建下载请求 // 构建下载请求
DownloadRequest<ResponseInputStream<GetObjectResponse>> downloadRequest = DownloadRequest.builder() DownloadRequest<ResponsePublisher<GetObjectResponse>> publisherDownloadRequest = DownloadRequest.builder()
// 文件对象 // 文件对象
.getObjectRequest(y -> y.bucket(properties.getBucketName()) .getObjectRequest(y -> y.bucket(properties.getBucketName())
.key(key) .key(key)
.build()) .build())
.addTransferListener(LoggingTransferListener.create()) .addTransferListener(LoggingTransferListener.create())
// 使用订阅转换器 // 使用发布订阅转换器
.responseTransformer(AsyncResponseTransformer.toBlockingInputStream()) .responseTransformer(AsyncResponseTransformer.toPublisher())
.build(); .build();
// 使用 S3TransferManager 下载文件 // 使用 S3TransferManager 下载文件
Download<ResponseInputStream<GetObjectResponse>> responseFuture = transferManager.download(downloadRequest); Download<ResponsePublisher<GetObjectResponse>> publisherDownload = transferManager.download(publisherDownloadRequest);
// 输出到流中 // 获取下载发布订阅转换器
try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.completionFuture().join().result()) { // auto-closeable stream ResponsePublisher<GetObjectResponse> publisher = publisherDownload.completionFuture().join().result();
if (consumer != null) { // 执行文件大小消费者函数
consumer.accept(responseStream.response().contentLength()); Optional.ofNullable(contentLengthConsumer)
} .ifPresent(lengthConsumer -> lengthConsumer.accept(publisher.response().contentLength()));
responseStream.transferTo(out); // 阻塞调用线程 blocks the calling thread
} // 构建写出订阅器对象
return out -> {
// 注意此处不需要显式关闭 channel channel 会在 out 关闭时自动关闭
WritableByteChannel channel = Channels.newChannel(out);
// 订阅数据
publisher.subscribe(byteBuffer -> {
try {
while (byteBuffer.hasRemaining()) {
channel.write(byteBuffer);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}).join();
};
} catch (Exception e) { } catch (Exception e) {
throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]"); throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]");
} }

View File

@ -0,0 +1,15 @@
package org.dromara.common.oss.core;
import java.io.IOException;
/**
* 写出订阅器
*
* @author 秋辞未寒
*/
@FunctionalInterface
public interface WriteOutSubscriber<T> {
void writeTo(T out) throws IOException;
}