update 优化 mqtt 客户端 增加虚拟线程支持

This commit is contained in:
疯狂的狮子Li
2026-02-10 16:32:50 +08:00
parent 529f614dae
commit 3b5d7eba37
2 changed files with 40 additions and 1 deletions

View File

@@ -50,7 +50,7 @@
<!-- 工作流配置 -->
<warm-flow.version>1.8.4</warm-flow.version>
<!-- mqtt客户端 -->
<mica-mqtt.version>2.5.11</mica-mqtt.version>
<mica-mqtt.version>2.5.12</mica-mqtt.version>
<!-- 插件版本 -->
<maven-jar-plugin.version>3.4.2</maven-jar-plugin.version>

View File

@@ -3,9 +3,19 @@ package org.dromara.common.mqtt.config;
import org.dromara.common.mqtt.listener.MqttClientConnectListener;
import org.dromara.common.mqtt.listener.MqttClientGlobalMessageListener;
import org.dromara.mica.mqtt.core.client.MqttClientCreator;
import org.dromara.mica.mqtt.core.client.MqttClientCustomizer;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.core.task.VirtualThreadTaskExecutor;
import org.tio.utils.thread.ThreadUtils;
import org.tio.utils.thread.pool.SynThreadPoolExecutor;
import org.tio.utils.thread.pool.TioCallerRunsPolicy;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* mqtt客户端配置初始化
@@ -31,4 +41,33 @@ public class MqttAutoConfiguration {
return new MqttClientGlobalMessageListener();
}
/**
* 客户端使用虚拟线程配置
*/
@Bean
public MqttClientCustomizer mqttClientCustomizer() {
return creator -> {
// 这个数不重要 已经使用虚拟线程 就是填一下防止报错
int corePoolSize = ThreadUtils.CORE_POOL_SIZE;
ThreadFactory factory = new VirtualThreadTaskExecutor("tio-worker-virtual").getVirtualThreadFactory();
SynThreadPoolExecutor tioExecutor = new SynThreadPoolExecutor(corePoolSize, corePoolSize,
0L, new LinkedBlockingQueue<>(), factory, new TioCallerRunsPolicy());
tioExecutor.prestartCoreThread();
creator.tioExecutor(tioExecutor);
ThreadFactory factory1 = new VirtualThreadTaskExecutor("tio-group-virtual").getVirtualThreadFactory();
ThreadPoolExecutor groupExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory1, new TioCallerRunsPolicy());
groupExecutor.prestartCoreThread();
creator.groupExecutor(groupExecutor);
ThreadFactory factory2 = new VirtualThreadTaskExecutor("biz-worker-virtual").getVirtualThreadFactory();
ThreadPoolExecutor mqttExecutor = new ThreadPoolExecutor(corePoolSize, corePoolSize,
0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), factory2, new TioCallerRunsPolicy());
mqttExecutor.prestartCoreThread();
creator.mqttExecutor(mqttExecutor);
};
}
}