diff --git a/pom.xml b/pom.xml index 158e4f95c..7afa6fd80 100644 --- a/pom.xml +++ b/pom.xml @@ -50,7 +50,7 @@ 1.8.4 - 2.5.11 + 2.5.12 3.4.2 diff --git a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java index d0c06f19c..11f352988 100644 --- a/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java +++ b/ruoyi-common/ruoyi-common-mqtt/src/main/java/org/dromara/common/mqtt/config/MqttAutoConfiguration.java @@ -3,9 +3,19 @@ package org.dromara.common.mqtt.config; import org.dromara.common.mqtt.listener.MqttClientConnectListener; import org.dromara.common.mqtt.listener.MqttClientGlobalMessageListener; import org.dromara.mica.mqtt.core.client.MqttClientCreator; +import org.dromara.mica.mqtt.core.client.MqttClientCustomizer; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; +import org.springframework.core.task.VirtualThreadTaskExecutor; +import org.tio.utils.thread.ThreadUtils; +import org.tio.utils.thread.pool.SynThreadPoolExecutor; +import org.tio.utils.thread.pool.TioCallerRunsPolicy; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; /** * mqtt客户端配置初始化 @@ -31,4 +41,33 @@ public class MqttAutoConfiguration { return new MqttClientGlobalMessageListener(); } + /** + * 客户端使用虚拟线程配置 + */ + @Bean + public MqttClientCustomizer mqttClientCustomizer() { + return creator -> { + // 这个数不重要 已经使用虚拟线程 就是填一下防止报错 + int corePoolSize = ThreadUtils.CORE_POOL_SIZE; + + ThreadFactory factory = new VirtualThreadTaskExecutor("tio-worker-virtual").getVirtualThreadFactory(); + SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, new LinkedBlockingQueue<>(), factory, new TioCallerRunsPolicy()); + tioExecutor.prestartCoreThread(); + creator.tioExecutor(tioExecutor); + + ThreadFactory factory1 = new VirtualThreadTaskExecutor("tio-group-virtual").getVirtualThreadFactory(); + ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory1, new TioCallerRunsPolicy()); + groupExecutor.prestartCoreThread(); + creator.groupExecutor(groupExecutor); + + ThreadFactory factory2 = new VirtualThreadTaskExecutor("biz-worker-virtual").getVirtualThreadFactory(); + ThreadPoolExecutor mqttExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize, + 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory2, new TioCallerRunsPolicy()); + mqttExecutor.prestartCoreThread(); + creator.mqttExecutor(mqttExecutor); + }; + } + }