From ad6b3d4b3fa44af1f6958a83c21bf1643757cffd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=A7=8B=E8=BE=9E=E6=9C=AA=E5=AF=92?= <545073804@qq.com> Date: Tue, 12 Aug 2025 16:17:07 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E6=94=B9=E7=94=A8=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E8=AE=A2=E9=98=85=E7=9A=84=E6=96=B9=E5=BC=8F=E6=9B=BF=E4=BB=A3?= =?UTF-8?q?=E9=98=BB=E5=A1=9E=E6=B5=81=EF=BC=8C=E4=BC=98=E5=8C=96=E5=A4=A7?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8B=E8=BD=BD=E6=97=B6=E7=9A=84=E5=86=85?= =?UTF-8?q?=E5=AD=98=E5=8D=A0=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../dromara/common/oss/core/OssClient.java | 62 ++++++++++++++----- .../common/oss/core/WriteOutSubscriber.java | 15 +++++ 2 files changed, 62 insertions(+), 15 deletions(-) create mode 100644 ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java diff --git a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java index 7eab05a3b..6ced41b52 100644 --- a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java +++ b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/OssClient.java @@ -13,9 +13,7 @@ import org.dromara.common.oss.exception.OssException; import org.dromara.common.oss.properties.OssProperties; import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.core.async.AsyncResponseTransformer; -import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody; +import software.amazon.awssdk.core.async.*; import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -29,9 +27,12 @@ import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; import java.io.*; import java.net.URI; import java.net.URL; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.Optional; import java.util.function.Consumer; /** @@ -237,30 +238,61 @@ public class OssClient { * @param key 文件在 Amazon S3 中的对象键 * @param out 输出流 * @param consumer 自定义处理逻辑 - * @return 输出流中写入的字节数(长度) * @throws OssException 如果下载失败,抛出自定义异常 */ public void download(String key, OutputStream out, Consumer consumer) { + try { + this.download(key, consumer).writeTo(out); + } catch (Exception e) { + throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]"); + } + } + + /** + * 下载文件从 Amazon S3 到 输出流 + * + * @param key 文件在 Amazon S3 中的对象键 + * @param contentLengthConsumer 文件大小消费者函数 + * @return 写出订阅器 + * @throws OssException 如果下载失败,抛出自定义异常 + */ + public WriteOutSubscriber download(String key, Consumer contentLengthConsumer) { try { // 构建下载请求 - DownloadRequest> downloadRequest = DownloadRequest.builder() + DownloadRequest> publisherDownloadRequest = DownloadRequest.builder() // 文件对象 .getObjectRequest(y -> y.bucket(properties.getBucketName()) .key(key) .build()) .addTransferListener(LoggingTransferListener.create()) - // 使用订阅转换器 - .responseTransformer(AsyncResponseTransformer.toBlockingInputStream()) + // 使用发布订阅转换器 + .responseTransformer(AsyncResponseTransformer.toPublisher()) .build(); + // 使用 S3TransferManager 下载文件 - Download> responseFuture = transferManager.download(downloadRequest); - // 输出到流中 - try (ResponseInputStream responseStream = responseFuture.completionFuture().join().result()) { // auto-closeable stream - if (consumer != null) { - consumer.accept(responseStream.response().contentLength()); - } - responseStream.transferTo(out); // 阻塞调用线程 blocks the calling thread - } + Download> publisherDownload = transferManager.download(publisherDownloadRequest); + // 获取下载发布订阅转换器 + ResponsePublisher publisher = publisherDownload.completionFuture().join().result(); + // 执行文件大小消费者函数 + Optional.ofNullable(contentLengthConsumer) + .ifPresent(lengthConsumer -> lengthConsumer.accept(publisher.response().contentLength())); + + // 构建写出订阅器对象 + return out -> { + // 注意,此处不需要显式关闭 channel ,channel 会在 out 关闭时自动关闭 + WritableByteChannel channel = Channels.newChannel(out); + + // 订阅数据 + publisher.subscribe(byteBuffer -> { + try { + while (byteBuffer.hasRemaining()) { + channel.write(byteBuffer); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + }).join(); + }; } catch (Exception e) { throw new OssException("文件下载失败,错误信息:[" + e.getMessage() + "]"); } diff --git a/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java new file mode 100644 index 000000000..d3a9841a1 --- /dev/null +++ b/ruoyi-common/ruoyi-common-oss/src/main/java/org/dromara/common/oss/core/WriteOutSubscriber.java @@ -0,0 +1,15 @@ +package org.dromara.common.oss.core; + +import java.io.IOException; + +/** + * 写出订阅器 + * + * @author 秋辞未寒 + */ +@FunctionalInterface +public interface WriteOutSubscriber { + + void writeTo(T out) throws IOException; + +}