diff --git a/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/asyncTask/AsyncLatchUtils.java b/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/asyncTask/AsyncLatchUtils.java new file mode 100644 index 00000000..6705c3e7 --- /dev/null +++ b/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/asyncTask/AsyncLatchUtils.java @@ -0,0 +1,133 @@ +package com.zt.plat.framework.common.util.asyncTask; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.*; + +/** + * 异步任务同步处理工具类 + * 多次提交,一次等待 + */ +public class AsyncLatchUtils { + private static final ThreadLocal> THREADLOCAL = ThreadLocal.withInitial(LinkedList::new); + + /** + * 提交一个异步任务 + * @param executor 指定执行此任务的线程池 + * @param runnable 需要异步执行的具体业务逻辑 + */ + public static void submitTask(Executor executor, Runnable runnable) { + THREADLOCAL.get().add(new TaskInfo(executor, runnable)); + } + + /** + * 获取当前线程已提交的任务列表,并自动清理当前线程的已提交任务列表。 + * @return + */ + private static List popTask() { + List taskInfos = THREADLOCAL.get(); + THREADLOCAL.remove(); + return taskInfos; + } + + /** + * 触发所有已提交任务的执行,并同步等待它们全部完成。 + * @param timeout 最长等待时间 + * @param timeUnit 等待时间单位 + * @return true: 如果所有任务在指定时间内成功完成。false: 如果等待超时。 + * 该方法在执行后会自动清理当前线程提交的任务列表,因此可以重复使用。 + */ + public static boolean waitFor(long timeout, TimeUnit timeUnit) { + List taskInfos = popTask(); + if (taskInfos.isEmpty()) { + return true; + } + CountDownLatch latch = new CountDownLatch(taskInfos.size()); + for (TaskInfo taskInfo : taskInfos) { + Executor executor = taskInfo.executor; + Runnable runnable = taskInfo.runnable; + executor.execute(() -> { + try { + runnable.run(); + } finally { + latch.countDown(); + } + }); + } + boolean await = false; + try { + await = latch.await(timeout, timeUnit); + } catch (Exception ignored) {} + return await; + } + + private static final class TaskInfo { + private final Executor executor; + private final Runnable runnable; + + public TaskInfo(Executor executor, Runnable runnable) { + this.executor = executor; + this.runnable = runnable; + } + } + + /** + * 调用样例 + * @param args + */ + public static void main(String[] args) { + // 1. 准备一个线程池 + ExecutorService executorService = Executors.newFixedThreadPool(3); + + System.out.println("主流程开始,准备分发异步任务..."); + + // 2. 提交多个异步任务 + // 任务一:获取用户信息 + AsyncLatchUtils.submitTask(executorService, () -> { + try { + System.out.println("开始获取用户信息..."); + Thread.sleep(1000); // 模拟耗时 + System.out.println("获取用户信息成功!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // 任务二:获取订单信息 + AsyncLatchUtils.submitTask(executorService, () -> { + try { + System.out.println("开始获取订单信息..."); + Thread.sleep(1500); // 模拟耗时 + System.out.println("获取订单信息成功!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // 任务三:获取商品信息 + AsyncLatchUtils.submitTask(executorService, () -> { + try { + System.out.println("开始获取商品信息..."); + Thread.sleep(500); // 模拟耗时 + System.out.println("获取商品信息成功!"); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + System.out.println("所有异步任务已提交,主线程开始等待..."); + + // 3. 等待所有任务完成,最长等待5秒 + boolean allTasksCompleted = AsyncLatchUtils.waitFor(5, TimeUnit.SECONDS); + + // 4. 根据等待结果继续主流程 + if (allTasksCompleted) { + System.out.println("所有异步任务执行成功,主流程继续..."); + } else { + System.err.println("有任务执行超时,主流程中断!"); + } + + // 5. 关闭线程池 + executorService.shutdown(); + } +}