Browse Source

feat(async): 添加异步任务配置和线程池管理

- 新增异步任务配置类 AsyncConfig,支持多线程池管理
- 配置核心业务线程池和 IO 密集型任务线程池
- 实现自定义线程工厂,支持线程优先级和异常处理
- 提供 DirectAsyncService 和 DirectCompletableFutureService 异步服务示例
- 支持通过 ThreadPoolProperties 配置线程池参数
- 添加异步任务异常处理机制和优雅关闭配置
wzq 2 weeks ago
parent
commit
b420f1be7a

+ 104 - 0
src/main/java/com/zsElectric/boot/common/async/AsyncConfig.java

@@ -0,0 +1,104 @@
+package com.zsElectric.boot.common.async;
+
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.annotation.AsyncConfigurer;
+import org.springframework.scheduling.annotation.EnableAsync;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * 异步任务配置类
+ * 功能:配置线程池参数、异常处理、优雅关闭等
+ */
+@Slf4j
+@Configuration
+@EnableAsync
+@EnableConfigurationProperties(ThreadPoolProperties.class)
+public class AsyncConfig implements AsyncConfigurer {
+
+    @Resource
+    private ThreadPoolProperties threadPoolProperties;
+
+    /**
+     * 核心业务线程池 - 用于处理重要异步任务
+     */
+    @Bean(name = "businessTaskExecutor")
+    public ThreadPoolTaskExecutor businessTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        
+        // 核心线程池配置
+        executor.setCorePoolSize(threadPoolProperties.getCorePoolSize());
+        executor.setMaxPoolSize(threadPoolProperties.getMaxPoolSize());
+        executor.setQueueCapacity(threadPoolProperties.getQueueCapacity());
+        executor.setKeepAliveSeconds(threadPoolProperties.getKeepAliveSeconds());
+        
+        // 线程配置
+        executor.setThreadNamePrefix(threadPoolProperties.getThreadNamePrefix());
+        executor.setAllowCoreThreadTimeOut(threadPoolProperties.isAllowCoreThreadTimeOut());
+        
+        // 拒绝策略配置
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        
+        // 优雅关闭配置
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(threadPoolProperties.getAwaitTerminationSeconds());
+        
+        // 线程工厂自定义(设置线程优先级、守护线程等)
+        executor.setThreadFactory(new CustomThreadFactory());
+        
+        executor.initialize();
+        return executor;
+    }
+
+    /**
+     * IO密集型任务线程池 - 适用于网络请求、文件操作等
+     */
+    @Bean(name = "ioIntensiveTaskExecutor")
+    public ThreadPoolTaskExecutor ioIntensiveTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+        
+        // IO密集型任务可以设置更大的线程数
+        int cpuCores = Runtime.getRuntime().availableProcessors();
+        executor.setCorePoolSize(cpuCores * 2);
+        executor.setMaxPoolSize(cpuCores * 4);
+        executor.setQueueCapacity(500);
+        executor.setKeepAliveSeconds(120);
+        executor.setThreadNamePrefix("io-task-");
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+        executor.setWaitForTasksToCompleteOnShutdown(true);
+        executor.setAwaitTerminationSeconds(60);
+        executor.initialize();
+        
+        return executor;
+    }
+
+    /**
+     * 默认异步执行器(覆盖Spring默认配置)
+     */
+    @Override
+    public Executor getAsyncExecutor() {
+        return businessTaskExecutor();
+    }
+
+    /**
+     * 异步任务异常处理
+     * 用于处理无返回值的@Async方法的异常
+     *  第一层:业务异常处理
+     */
+    @Override
+    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
+        return (throwable, method, params) -> {
+            // 记录异步任务执行异常
+            log.info("异步任务执行异常 - 方法: {}, 异常: {}", method.getName(), throwable.getMessage());
+            // 这里可以接入日志框架、告警系统等
+            throwable.printStackTrace();
+        };
+    }
+}

+ 46 - 0
src/main/java/com/zsElectric/boot/common/async/CustomThreadFactory.java

@@ -0,0 +1,46 @@
+package com.zsElectric.boot.common.async;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * 自定义线程工厂
+ * 用于设置线程优先级、守护线程状态等
+ */
+public class CustomThreadFactory implements ThreadFactory {
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+    private final ThreadGroup group;
+
+    public CustomThreadFactory() {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        namePrefix = "custom-async-thread-";
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
+        
+        // 设置线程为非守护线程
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        
+        // 设置线程优先级
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+            t.setPriority(Thread.NORM_PRIORITY);
+        }
+        
+        // 设置线程异常处理器
+        t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+            //当线程中发生未捕获的异常时,就会调用这个处理器。第二层:系统级异常处理
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                System.err.println("线程执行异常: " + t.getName() + ", 异常: " + e.getMessage());
+            }
+        });
+        
+        return t;
+    }
+}

+ 42 - 0
src/main/java/com/zsElectric/boot/common/async/DirectAsyncService.java

@@ -0,0 +1,42 @@
+package com.zsElectric.boot.common.async;
+
+import com.youlai.boot.common.result.Result;
+import org.apache.poi.ss.formula.functions.T;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * 使用@async 进行异步任务处理
+ */
+@Service
+public class DirectAsyncService {
+    
+    /**
+     * 使用默认线程池执行异步任务
+     */
+    @Async
+    public void processOrderAsync(String orderId) {
+        // 异步处理订单逻辑
+        System.out.println("订单处理线程: " + Thread.currentThread().getName());
+    }
+    
+    /**
+     * 使用指定的IO密集型线程池
+     */
+    @Async("ioIntensiveTaskExecutor")
+    public CompletableFuture<String> downloadFileAsync(String fileUrl) {
+        // 异步下载文件
+        return CompletableFuture.completedFuture("下载完成");
+    }
+    
+    /**
+     * 有返回值的异步任务
+     */
+    @Async("businessTaskExecutor")
+    public CompletableFuture<Result<T>> heavyCalculationAsync(String data) {
+        // 复杂的计算任务
+        return CompletableFuture.completedFuture(Result.success("计算完成"));
+    }
+}

+ 48 - 0
src/main/java/com/zsElectric/boot/common/async/DirectCompletableFutureService.java

@@ -0,0 +1,48 @@
+package com.zsElectric.boot.common.async;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.stereotype.Service;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * 使用 CompletableFuture 进行异步任务处理
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class DirectCompletableFutureService {
+
+    @Autowired
+    @Qualifier("businessTaskExecutor")
+    private ThreadPoolTaskExecutor businessTaskExecutor;
+
+
+    /**
+     * 执行有返回值的异步任务
+     */
+    public CompletableFuture<String> executeTaskWithResult(String taskData) {
+        // 使用 supplyAsync 并指定自定义线程池
+        return CompletableFuture.supplyAsync(() -> {
+            // 这里是你的异步任务逻辑
+            System.out.println("执行线程: " + Thread.currentThread().getName());
+            return "处理结果: " + taskData;
+        }, businessTaskExecutor); // 关键:传入线程池实例
+    }
+
+    /**
+     * 执行无返回值的异步任务
+     */
+    public CompletableFuture<Void> executeTaskWithoutResult() {
+        // 使用 runAsync 并指定自定义线程池
+        return CompletableFuture.runAsync(() -> {
+            // 异步任务逻辑
+            System.out.println("执行线程: " + Thread.currentThread().getName());
+            System.out.println("执行无返回值任务");
+        }, businessTaskExecutor);
+    }
+}

+ 36 - 0
src/main/java/com/zsElectric/boot/common/async/ThreadPoolProperties.java

@@ -0,0 +1,36 @@
+package com.zsElectric.boot.common.async;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+/**
+ * 线程池配置属性类
+ * 支持在application.yml中动态配置
+ */
+@Component
+@Data
+@ConfigurationProperties(prefix = "async.thread-pool")
+public class ThreadPoolProperties {
+
+    // 核心线程数
+    private int corePoolSize = 10;
+    
+    // 最大线程数
+    private int maxPoolSize = 50;
+    
+    // 队列容量
+    private int queueCapacity = 1000;
+    
+    // 线程空闲存活时间(秒)
+    private int keepAliveSeconds = 60;
+    
+    // 线程名称前缀
+    private String threadNamePrefix = "async-business-";
+    
+    // 是否允许核心线程超时
+    private boolean allowCoreThreadTimeOut = false;
+    
+    // 优雅关闭等待时间(秒)
+    private int awaitTerminationSeconds = 30;
+}