Flink源码-CompletableFuture异步编程

2025/12/25

Tags: Flink

1. 基础概念

1.1 什么是 CompletableFuture

CompletableFuture 是 Java 8 引入的异步编程工具,它代表一个可能尚未完成的异步计算结果。与传统的 Future 相比,它支持:

1.2 同步 vs 异步对比

// ==================== 同步方式 - 阻塞等待 ====================
public String processSync() {
    String result1 = step1();           // 阻塞 2 秒
    String result2 = step2(result1);    // 阻塞 2 秒
    String result3 = step3(result2);    // 阻塞 2 秒
    return result3;                     // 总耗时: 6 秒
}

// ==================== 异步方式 - 非阻塞 ====================
public CompletableFuture<String> processAsync() {
    return CompletableFuture
        .supplyAsync(() -> step1())              // 异步执行
        .thenApplyAsync(result1 -> step2(result1))  // 链式处理
        .thenApplyAsync(result2 -> step3(result2)); // 继续链式
    // 主线程立即返回,不阻塞
}

1.3 核心接口关系

                    ┌─────────────────┐
                    │     Future      │
                    └────────┬────────┘
                             │
              ┌──────────────┴─────────────┐
              │                            │
    ┌─────────┴─────────┐       ┌──────────┴──────────┐
    │  CompletionStage  │       │   CompletableFuture │
    │   (接口)           │◄------│   (实现类)           │
    └───────────────────┘       └─────────────────────┘

2. 创建 CompletableFuture

2.1 已完成的 Future

// 创建一个已经完成的 Future(成功)
CompletableFuture<String> completed = CompletableFuture.completedFuture("result");
System.out.println(completed.isDone());  // true
System.out.println(completed.get());     // "result"

// 创建一个已经完成的 Future(失败)- Java 9+
CompletableFuture<String> failed = CompletableFuture.failedFuture(
    new RuntimeException("error"));

// Java 8 兼容写法
CompletableFuture<String> failedJava8 = new CompletableFuture<>();
failedJava8.completeExceptionally(new RuntimeException("error"));

2.2 异步执行有返回值的任务 - supplyAsync

// 使用默认线程池(ForkJoinPool.commonPool())
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("执行线程: " + Thread.currentThread().getName());
    // 模拟耗时操作
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    return "async result";
});

// 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(4);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "custom executor result";
}, executor);

2.3 异步执行无返回值的任务 - runAsync

// 无返回值的异步任务
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    System.out.println("执行异步任务,线程: " + Thread.currentThread().getName());
    // 执行一些操作,没有返回值
    doSomething();
});

// 等待完成
future.join();

2.4 手动创建并控制完成

// 手动创建 CompletableFuture
CompletableFuture<String> manualFuture = new CompletableFuture<>();

// 在另一个线程中完成它
new Thread(() -> {
    try {
        Thread.sleep(2000);
        // 正常完成
        manualFuture.complete("manual result");
        
        // 或者异常完成
        // manualFuture.completeExceptionally(new RuntimeException("error"));
    } catch (InterruptedException e) {
        manualFuture.completeExceptionally(e);
    }
}).start();

// 主线程可以等待结果
String result = manualFuture.get();  // 阻塞等待

3. 链式处理

3.1 thenApply - 转换结果(同步)

// thenApply: 对结果进行同步转换
// Function<T, U>: 接收 T 类型,返回 U 类型
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> 42)                          // Integer
    .thenApply(num -> num * 2)                      // Integer -> Integer
    .thenApply(num -> "Result: " + num);            // Integer -> String

System.out.println(future.get());  // "Result: 84"

3.2 thenApplyAsync - 转换结果(异步)

// thenApplyAsync: 在异步线程中执行转换
ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        System.out.println("Step 1: " + Thread.currentThread().getName());
        return 42;
    })
    .thenApplyAsync(num -> {
        System.out.println("Step 2: " + Thread.currentThread().getName());
        return num * 2;
    }, executor)  // 使用自定义线程池
    .thenApplyAsync(num -> {
        System.out.println("Step 3: " + Thread.currentThread().getName());
        return "Result: " + num;
    });  // 使用默认线程池

executor.shutdown();

3.3 thenAccept - 消费结果(无返回值)

// thenAccept: 消费结果,不返回新值
// Consumer<T>: 接收 T 类型,无返回值
CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> "Hello")
    .thenApply(s -> s + " World")
    .thenAccept(result -> {
        System.out.println("最终结果: " + result);
        // 可以在这里保存到数据库、发送通知等
    });

future.join();  // 等待完成

3.4 thenRun - 执行后续操作(不关心结果)

// thenRun: 不关心前一步的结果,只是在完成后执行某些操作
// Runnable: 无参数,无返回值
CompletableFuture<Void> future = CompletableFuture
    .supplyAsync(() -> {
        System.out.println("执行主要任务");
        return "result";
    })
    .thenRun(() -> {
        System.out.println("任务完成,执行清理操作");
        // 清理资源、记录日志等
    });

3.5 thenCompose - 扁平化嵌套 Future(重要!)

// 问题:thenApply 会产生嵌套的 CompletableFuture
CompletableFuture<CompletableFuture<String>> nested = CompletableFuture
    .supplyAsync(() -> "input")
    .thenApply(input -> asyncProcess(input));  // 返回 CompletableFuture<String>

// 解决:使用 thenCompose 扁平化
CompletableFuture<String> flattened = CompletableFuture
    .supplyAsync(() -> "input")
    .thenCompose(input -> asyncProcess(input));  // 自动扁平化

// 辅助方法
private CompletableFuture<String> asyncProcess(String input) {
    return CompletableFuture.supplyAsync(() -> "processed: " + input);
}

3.6 thenCompose vs thenApply

// ==================== thenApply ====================
// 用于同步转换,lambda 返回普通值
CompletableFuture<String> f1 = CompletableFuture
    .supplyAsync(() -> 1)
    .thenApply(n -> "Number: " + n);  // 返回 String

// ==================== thenCompose ====================
// 用于异步转换,lambda 返回 CompletableFuture
CompletableFuture<String> f2 = CompletableFuture
    .supplyAsync(() -> 1)
    .thenCompose(n -> CompletableFuture.supplyAsync(() -> "Number: " + n));

// 类比 Stream API:
// thenApply  ≈ map()
// thenCompose ≈ flatMap()

3.7 Function.identity() 模式

// 这是 Flink 中常见的模式
// 当 handleAsync 返回 CompletableFuture 时,需要扁平化

CompletableFuture<String> result = CompletableFuture
    .supplyAsync(() -> "input")
    .handleAsync((value, throwable) -> {
        if (throwable != null) {
            return CompletableFuture.completedFuture("error fallback");
        }
        return asyncProcess(value);  // 返回 CompletableFuture<String>
    })
    // handleAsync 返回 CompletableFuture<CompletableFuture<String>>
    // 使用 thenCompose(Function.identity()) 扁平化
    .thenCompose(Function.identity());  // 等价于 .thenCompose(f -> f)

4. 异常处理

4.1 exceptionally - 异常恢复

// exceptionally: 只处理异常情况,正常情况直接传递
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Random error");
        }
        return "success";
    })
    .exceptionally(throwable -> {
        System.err.println("发生异常: " + throwable.getMessage());
        return "default value";  // 返回默认值
    });

System.out.println(future.get());  // "success" 或 "default value"

4.2 handle - 统一处理结果和异常

// handle: 同时处理正常结果和异常
// BiFunction<T, Throwable, U>: 接收结果和异常,返回新值
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        if (Math.random() > 0.5) {
            throw new RuntimeException("Random error");
        }
        return "success";
    })
    .handle((result, throwable) -> {
        if (throwable != null) {
            // 异常情况
            System.err.println("异常: " + throwable.getMessage());
            return "error: " + throwable.getMessage();
        }
        // 正常情况
        return "result: " + result;
    });

4.3 handleAsync - 异步处理结果和异常

ExecutorService executor = Executors.newFixedThreadPool(2);

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        System.out.println("主任务线程: " + Thread.currentThread().getName());
        return "data";
    })
    .handleAsync((result, throwable) -> {
        System.out.println("处理线程: " + Thread.currentThread().getName());
        if (throwable != null) {
            return "error";
        }
        return "processed: " + result;
    }, executor);  // 在指定线程池中执行

executor.shutdown();

4.4 whenComplete - 观察结果(不改变结果)

// whenComplete: 观察结果和异常,但不改变它们
// BiConsumer<T, Throwable>: 接收结果和异常,无返回值
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "result")
    .whenComplete((result, throwable) -> {
        if (throwable != null) {
            System.err.println("任务失败: " + throwable.getMessage());
        } else {
            System.out.println("任务成功: " + result);
        }
        // 注意:这里不能改变结果
    });

// whenComplete 返回的 Future 包含原始结果
System.out.println(future.get());  // "result"

4.5 异常处理链

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("Step 1 error");
    })
    .thenApply(result -> {
        System.out.println("Step 2");  // 不会执行
        return result + " step2";
    })
    .thenApply(result -> {
        System.out.println("Step 3");  // 不会执行
        return result + " step3";
    })
    .exceptionally(throwable -> {
        // 捕获之前所有步骤的异常
        System.err.println("捕获异常: " + throwable.getMessage());
        return "recovered";
    })
    .thenApply(result -> {
        System.out.println("Step 4: " + result);  // 会执行
        return result + " step4";
    });

System.out.println(future.get());  // "recovered step4"

4.6 异常类型处理

CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        throw new IllegalArgumentException("Invalid argument");
    })
    .handle((result, throwable) -> {
        if (throwable != null) {
            // 获取真正的异常(去除 CompletionException 包装)
            Throwable cause = throwable.getCause();
            if (cause == null) {
                cause = throwable;
            }
            
            if (cause instanceof IllegalArgumentException) {
                return "参数错误: " + cause.getMessage();
            } else if (cause instanceof IOException) {
                return "IO错误: " + cause.getMessage();
            } else {
                return "未知错误: " + cause.getMessage();
            }
        }
        return result;
    });

5. 组合多个异步操作

5.1 thenCombine - 合并两个 Future 的结果

// thenCombine: 等待两个 Future 都完成,然后合并结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "Hello";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1500);
    return "World";
});

CompletableFuture<String> combined = future1.thenCombine(future2, (s1, s2) -> {
    return s1 + " " + s2;
});

System.out.println(combined.get());  // "Hello World"(约 1.5 秒后)

5.2 thenAcceptBoth - 消费两个 Future 的结果

// thenAcceptBoth: 类似 thenCombine,但没有返回值
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "User");
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 25);

CompletableFuture<Void> result = future1.thenAcceptBoth(future2, (name, age) -> {
    System.out.println(name + " is " + age + " years old");
});

result.join();

5.3 runAfterBoth - 两个都完成后执行

// runAfterBoth: 两个 Future 都完成后执行操作,不关心结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "task1");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "task2");

CompletableFuture<Void> result = future1.runAfterBoth(future2, () -> {
    System.out.println("两个任务都完成了");
});

5.4 applyToEither - 任一完成即处理

// applyToEither: 哪个先完成就用哪个的结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "slow result";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "fast result";
});

CompletableFuture<String> result = future1.applyToEither(future2, s -> {
    return "Winner: " + s;
});

System.out.println(result.get());  // "Winner: fast result"

5.5 acceptEither - 任一完成即消费

// acceptEither: 哪个先完成就消费哪个的结果
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "slow";
});

CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "fast";
});

future1.acceptEither(future2, result -> {
    System.out.println("First completed: " + result);
});

5.6 allOf - 等待所有完成

// allOf: 等待所有 Future 完成
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "Result1");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "Result2");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "Result3");

// allOf 返回 CompletableFuture<Void>
CompletableFuture<Void> allFuture = CompletableFuture.allOf(f1, f2, f3);

// 等待所有完成后,收集结果
CompletableFuture<List<String>> resultsFuture = allFuture.thenApply(v -> {
    return Stream.of(f1, f2, f3)
        .map(CompletableFuture::join)
        .collect(Collectors.toList());
});

List<String> results = resultsFuture.get();
System.out.println(results);  // [Result1, Result2, Result3]

5.7 anyOf - 任一完成即返回

// anyOf: 任一 Future 完成就返回
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    sleep(3000);
    return "slow";
});

CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    sleep(1000);
    return "fast";
});

CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    sleep(2000);
    return "medium";
});

// anyOf 返回 CompletableFuture<Object>
CompletableFuture<Object> anyFuture = CompletableFuture.anyOf(f1, f2, f3);

System.out.println(anyFuture.get());  // "fast"

5.8 批量处理模式

// 批量处理多个异步任务
public <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
    return CompletableFuture
        .allOf(futures.toArray(new CompletableFuture[0]))
        .thenApply(v -> futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList()));
}

// 使用示例
List<CompletableFuture<String>> futures = Arrays.asList(
    CompletableFuture.supplyAsync(() -> "A"),
    CompletableFuture.supplyAsync(() -> "B"),
    CompletableFuture.supplyAsync(() -> "C")
);

CompletableFuture<List<String>> resultFuture = sequence(futures);
List<String> results = resultFuture.get();  // [A, B, C]

6. 结合线程池

6.1 默认线程池

// 默认使用 ForkJoinPool.commonPool()
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程: " + Thread.currentThread().getName());
    // 输出类似: ForkJoinPool.commonPool-worker-1
    return "result";
});

6.2 自定义线程池

// 创建自定义线程池
ExecutorService customExecutor = Executors.newFixedThreadPool(4, r -> {
    Thread t = new Thread(r);
    t.setName("custom-thread-" + t.getId());
    t.setDaemon(true);
    return t;
});

// 使用自定义线程池
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("线程: " + Thread.currentThread().getName());
    return "result";
}, customExecutor);

// 链式操作也可以指定线程池
CompletableFuture<String> result = future
    .thenApplyAsync(s -> s.toUpperCase(), customExecutor)
    .thenApplyAsync(s -> s + "!", customExecutor);

// 记得关闭线程池
customExecutor.shutdown();

6.3 不同类型的线程池

// 1. 固定大小线程池 - 适合 CPU 密集型任务
ExecutorService fixedPool = Executors.newFixedThreadPool(
    Runtime.getRuntime().availableProcessors());

// 2. 缓存线程池 - 适合短期异步任务
ExecutorService cachedPool = Executors.newCachedThreadPool();

// 3. 单线程池 - 保证顺序执行
ExecutorService singlePool = Executors.newSingleThreadExecutor();

// 4. 调度线程池 - 支持延迟和周期性任务
ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);

// 5. 自定义 ThreadPoolExecutor
ThreadPoolExecutor customPool = new ThreadPoolExecutor(
    4,                      // 核心线程数
    8,                      // 最大线程数
    60L,                    // 空闲线程存活时间
    TimeUnit.SECONDS,       // 时间单位
    new LinkedBlockingQueue<>(100),  // 工作队列
    new ThreadPoolExecutor.CallerRunsPolicy()  // 拒绝策略
);
public class FlinkStyleExecutor {
    
    // 主线程执行器 - 保证线程安全
    private final Executor mainThreadExecutor;
    
    // IO 执行器 - 用于 IO 密集型操作
    private final ExecutorService ioExecutor;
    
    // 计算执行器 - 用于 CPU 密集型操作
    private final ExecutorService computeExecutor;
    
    public FlinkStyleExecutor() {
        // 主线程执行器(单线程,保证顺序)
        this.mainThreadExecutor = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "main-thread"));
        
        // IO 执行器(多线程,处理阻塞 IO)
        this.ioExecutor = Executors.newCachedThreadPool(
            r -> new Thread(r, "io-thread-" + System.currentTimeMillis()));
        
        // 计算执行器(固定大小,处理 CPU 密集任务)
        this.computeExecutor = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors(),
            r -> new Thread(r, "compute-thread-" + System.currentTimeMillis()));
    }
    
    public CompletableFuture<String> processData(String input) {
        return CompletableFuture
            // IO 操作:读取数据
            .supplyAsync(() -> {
                System.out.println("IO 读取, 线程: " + Thread.currentThread().getName());
                return readFromDatabase(input);
            }, ioExecutor)
            // 计算操作:处理数据
            .thenApplyAsync(data -> {
                System.out.println("计算处理, 线程: " + Thread.currentThread().getName());
                return processData(data);
            }, computeExecutor)
            // 主线程:更新状态
            .thenApplyAsync(result -> {
                System.out.println("更新状态, 线程: " + Thread.currentThread().getName());
                updateState(result);
                return result;
            }, mainThreadExecutor);
    }
    
    private String readFromDatabase(String input) { return "data-" + input; }
    private String processData(String data) { return "processed-" + data; }
    private void updateState(String result) { /* 更新状态 */ }
    
    public void shutdown() {
        ((ExecutorService) mainThreadExecutor).shutdown();
        ioExecutor.shutdown();
        computeExecutor.shutdown();
    }
}

6.5 线程池选择策略

// 根据任务类型选择合适的线程池
public class ExecutorSelector {
    
    private final ExecutorService cpuBoundExecutor;
    private final ExecutorService ioBoundExecutor;
    
    public ExecutorSelector() {
        // CPU 密集型:线程数 = CPU 核心数
        int cpuCores = Runtime.getRuntime().availableProcessors();
        this.cpuBoundExecutor = Executors.newFixedThreadPool(cpuCores);
        
        // IO 密集型:线程数 = CPU 核心数 * 2(或更多)
        this.ioBoundExecutor = Executors.newFixedThreadPool(cpuCores * 2);
    }
    
    // CPU 密集型任务
    public <T> CompletableFuture<T> submitCpuTask(Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, cpuBoundExecutor);
    }
    
    // IO 密集型任务
    public <T> CompletableFuture<T> submitIoTask(Supplier<T> task) {
        return CompletableFuture.supplyAsync(task, ioBoundExecutor);
    }
}

7. 结合 Runnable 和 Callable

7.1 Runnable 转 CompletableFuture

// Runnable: 无返回值
Runnable task = () -> {
    System.out.println("执行任务");
    // 执行一些操作
};

// 方式 1: 使用 runAsync
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);

// 方式 2: 使用自定义线程池
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task, executor);

// 方式 3: 手动包装
CompletableFuture<Void> future3 = new CompletableFuture<>();
executor.submit(() -> {
    try {
        task.run();
        future3.complete(null);
    } catch (Exception e) {
        future3.completeExceptionally(e);
    }
});

7.2 Callable 转 CompletableFuture

// Callable: 有返回值,可抛出异常
Callable<String> callable = () -> {
    Thread.sleep(1000);
    return "callable result";
};

// 方式 1: 使用 supplyAsync(推荐)
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    try {
        return callable.call();
    } catch (Exception e) {
        throw new CompletionException(e);
    }
});

// 方式 2: 手动包装
CompletableFuture<String> future2 = new CompletableFuture<>();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
    try {
        String result = callable.call();
        future2.complete(result);
    } catch (Exception e) {
        future2.completeExceptionally(e);
    }
});

// 方式 3: 工具方法
public static <T> CompletableFuture<T> fromCallable(
        Callable<T> callable, Executor executor) {
    CompletableFuture<T> future = new CompletableFuture<>();
    executor.execute(() -> {
        try {
            future.complete(callable.call());
        } catch (Throwable t) {
            future.completeExceptionally(t);
        }
    });
    return future;
}

7.3 Future 转 CompletableFuture

// 传统 Future
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<String> legacyFuture = executor.submit(() -> {
    Thread.sleep(1000);
    return "legacy result";
});

// 转换为 CompletableFuture(需要轮询或阻塞)
// 方式 1: 阻塞转换(不推荐,会阻塞线程)
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
    try {
        return legacyFuture.get();
    } catch (Exception e) {
        throw new CompletionException(e);
    }
});

// 方式 2: 轮询转换
public static <T> CompletableFuture<T> fromFuture(
        Future<T> future, ScheduledExecutorService scheduler) {
    CompletableFuture<T> cf = new CompletableFuture<>();
    pollFuture(future, cf, scheduler);
    return cf;
}

private static <T> void pollFuture(
        Future<T> future, CompletableFuture<T> cf, 
        ScheduledExecutorService scheduler) {
    if (future.isDone()) {
        try {
            cf.complete(future.get());
        } catch (Exception e) {
            cf.completeExceptionally(e);
        }
    } else {
        scheduler.schedule(() -> pollFuture(future, cf, scheduler), 
            10, TimeUnit.MILLISECONDS);
    }
}

7.4 CompletableFuture 与 Runnable 链式组合

// 在异步链中插入 Runnable
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> "step1")
    .thenApply(s -> {
        System.out.println("处理: " + s);
        return s + "-step2";
    })
    // 插入一个不关心结果的操作
    .whenComplete((result, error) -> {
        // 这里可以执行 Runnable 风格的操作
        Runnable logTask = () -> System.out.println("日志记录: " + result);
        logTask.run();
    })
    .thenApply(s -> s + "-step3");

7.5 批量执行 Runnable

// 批量执行多个 Runnable 并等待全部完成
public CompletableFuture<Void> runAll(List<Runnable> tasks, Executor executor) {
    CompletableFuture<?>[] futures = tasks.stream()
        .map(task -> CompletableFuture.runAsync(task, executor))
        .toArray(CompletableFuture[]::new);
    
    return CompletableFuture.allOf(futures);
}

// 使用示例
List<Runnable> tasks = Arrays.asList(
    () -> System.out.println("Task 1"),
    () -> System.out.println("Task 2"),
    () -> System.out.println("Task 3")
);

ExecutorService executor = Executors.newFixedThreadPool(3);
runAll(tasks, executor).join();
executor.shutdown();

8. 高级模式

8.1 超时处理

// Java 9+ 方式
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        sleep(5000);  // 模拟长时间操作
        return "result";
    })
    .orTimeout(2, TimeUnit.SECONDS)  // 2 秒超时
    .exceptionally(throwable -> {
        if (throwable.getCause() instanceof TimeoutException) {
            return "timeout fallback";
        }
        return "error fallback";
    });

// Java 8 兼容方式
public static <T> CompletableFuture<T> withTimeout(
        CompletableFuture<T> future, long timeout, TimeUnit unit) {
    
    CompletableFuture<T> timeoutFuture = new CompletableFuture<>();
    
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    scheduler.schedule(() -> {
        timeoutFuture.completeExceptionally(
            new TimeoutException("Operation timed out after " + timeout + " " + unit));
    }, timeout, unit);
    
    return future.applyToEither(timeoutFuture, Function.identity());
}

// 使用示例
CompletableFuture<String> original = CompletableFuture.supplyAsync(() -> {
    sleep(5000);
    return "result";
});

CompletableFuture<String> withTimeout = withTimeout(original, 2, TimeUnit.SECONDS)
    .exceptionally(e -> "timeout fallback");

8.2 重试机制

// 带重试的异步操作
public static <T> CompletableFuture<T> retryAsync(
        Supplier<CompletableFuture<T>> supplier,
        int maxRetries,
        long delayMs,
        Predicate<Throwable> retryOn) {
    
    return supplier.get().handle((result, throwable) -> {
        if (throwable == null) {
            return CompletableFuture.completedFuture(result);
        }
        
        if (maxRetries > 0 && retryOn.test(throwable)) {
            System.out.println("重试中... 剩余次数: " + maxRetries);
            sleep(delayMs);
            return retryAsync(supplier, maxRetries - 1, delayMs, retryOn);
        }
        
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(throwable);
        return failed;
    }).thenCompose(Function.identity());
}

// 使用示例
AtomicInteger attempts = new AtomicInteger(0);

CompletableFuture<String> result = retryAsync(
    () -> CompletableFuture.supplyAsync(() -> {
        int attempt = attempts.incrementAndGet();
        System.out.println("尝试 #" + attempt);
        if (attempt < 3) {
            throw new RuntimeException("模拟失败");
        }
        return "成功";
    }),
    5,      // 最大重试次数
    1000,   // 重试间隔(毫秒)
    e -> e instanceof RuntimeException  // 重试条件
);

System.out.println(result.get());  // "成功"

8.3 指数退避重试

public static <T> CompletableFuture<T> retryWithBackoff(
        Supplier<CompletableFuture<T>> supplier,
        int maxRetries,
        long initialDelayMs,
        double multiplier) {
    
    return retryWithBackoffInternal(supplier, maxRetries, initialDelayMs, multiplier, 0);
}

private static <T> CompletableFuture<T> retryWithBackoffInternal(
        Supplier<CompletableFuture<T>> supplier,
        int maxRetries,
        long currentDelayMs,
        double multiplier,
        int currentAttempt) {
    
    return supplier.get().handle((result, throwable) -> {
        if (throwable == null) {
            return CompletableFuture.completedFuture(result);
        }
        
        if (currentAttempt < maxRetries) {
            System.out.printf("重试 #%d,等待 %dms%n", currentAttempt + 1, currentDelayMs);
            sleep(currentDelayMs);
            
            long nextDelay = (long) (currentDelayMs * multiplier);
            return retryWithBackoffInternal(
                supplier, maxRetries, nextDelay, multiplier, currentAttempt + 1);
        }
        
        CompletableFuture<T> failed = new CompletableFuture<>();
        failed.completeExceptionally(throwable);
        return failed;
    }).thenCompose(Function.identity());
}

// 使用示例
CompletableFuture<String> result = retryWithBackoff(
    () -> CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("总是失败");
    }),
    3,       // 最大重试 3 次
    100,     // 初始延迟 100ms
    2.0      // 每次延迟翻倍: 100ms -> 200ms -> 400ms
);

8.4 断路器模式

public class CircuitBreaker {
    
    private enum State { CLOSED, OPEN, HALF_OPEN }
    
    private final int failureThreshold;
    private final long resetTimeoutMs;
    
    private State state = State.CLOSED;
    private int failureCount = 0;
    private long lastFailureTime = 0;
    
    public CircuitBreaker(int failureThreshold, long resetTimeoutMs) {
        this.failureThreshold = failureThreshold;
        this.resetTimeoutMs = resetTimeoutMs;
    }
    
    public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> supplier) {
        if (state == State.OPEN) {
            if (System.currentTimeMillis() - lastFailureTime > resetTimeoutMs) {
                state = State.HALF_OPEN;
            } else {
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(
                    new RuntimeException("Circuit breaker is OPEN"));
                return failed;
            }
        }
        
        return supplier.get()
            .whenComplete((result, throwable) -> {
                if (throwable != null) {
                    recordFailure();
                } else {
                    recordSuccess();
                }
            });
    }
    
    private synchronized void recordFailure() {
        failureCount++;
        lastFailureTime = System.currentTimeMillis();
        if (failureCount >= failureThreshold) {
            state = State.OPEN;
            System.out.println("断路器打开!");
        }
    }
    
    private synchronized void recordSuccess() {
        failureCount = 0;
        state = State.CLOSED;
    }
}

// 使用示例
CircuitBreaker breaker = new CircuitBreaker(3, 5000);

for (int i = 0; i < 10; i++) {
    breaker.execute(() -> CompletableFuture.supplyAsync(() -> {
        throw new RuntimeException("服务不可用");
    })).exceptionally(e -> {
        System.out.println("错误: " + e.getMessage());
        return null;
    }).join();
}

8.5 限流器

public class RateLimiter {
    
    private final Semaphore semaphore;
    private final ExecutorService executor;
    
    public RateLimiter(int maxConcurrent) {
        this.semaphore = new Semaphore(maxConcurrent);
        this.executor = Executors.newCachedThreadPool();
    }
    
    public <T> CompletableFuture<T> execute(Supplier<T> task) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                semaphore.acquire();
                try {
                    return task.get();
                } finally {
                    semaphore.release();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        }, executor);
    }
    
    public void shutdown() {
        executor.shutdown();
    }
}

// 使用示例
RateLimiter limiter = new RateLimiter(3);  // 最多 3 个并发

List<CompletableFuture<String>> futures = IntStream.range(0, 10)
    .mapToObj(i -> limiter.execute(() -> {
        System.out.println("执行任务 " + i + ",线程: " + 
            Thread.currentThread().getName());
        sleep(1000);
        return "Result " + i;
    }))
    .collect(Collectors.toList());

futures.forEach(f -> System.out.println(f.join()));
limiter.shutdown();

9.1 JobManagerRunner 结果处理模式

// Flink 源码中的典型模式
final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
    jobManagerRunner
        .getResultFuture()  // 获取作业结果 Future
        .handleAsync(       // 异步处理结果和异常
            (jobManagerRunnerResult, throwable) -> {
                // 1. 验证状态一致性
                Preconditions.checkState(
                    jobManagerRunnerRegistry.isRegistered(jobId) &&
                    jobManagerRunnerRegistry.get(jobId) == jobManagerRunner,
                    "Job entry must be bound to JobManagerRunner lifetime.");

                // 2. 根据结果或异常进行不同处理
                if (jobManagerRunnerResult != null) {
                    // 正常完成,返回另一个 CompletableFuture
                    return handleJobManagerRunnerResult(
                        jobManagerRunnerResult, executionType);
                } else {
                    // 异常情况,返回已完成的 Future
                    return CompletableFuture.completedFuture(
                        jobManagerRunnerFailed(jobId, JobStatus.FAILED, throwable));
                }
            },
            getMainThreadExecutor())  // 在主线程执行器中执行
        .thenCompose(Function.identity());  // 扁平化嵌套的 Future
public class FlinkJobSubmissionDemo {
    
    private final ExecutorService executorService = 
        Executors.newFixedThreadPool(4);
    
    // 模拟作业提交
    public CompletableFuture<JobID> submitJob(JobGraph jobGraph) {
        // 步骤 1: 异步序列化 JobGraph
        CompletableFuture<Path> jobGraphFileFuture = 
            CompletableFuture.supplyAsync(() -> {
                System.out.println("序列化 JobGraph,线程: " + 
                    Thread.currentThread().getName());
                return serializeJobGraph(jobGraph);
            }, executorService);
        
        // 步骤 2: 准备请求体
        CompletableFuture<JobSubmitRequest> requestFuture = 
            jobGraphFileFuture.thenApply(jobGraphFile -> {
                System.out.println("准备请求体,线程: " + 
                    Thread.currentThread().getName());
                return prepareRequest(jobGraphFile, jobGraph);
            });
        
        // 步骤 3: 发送请求
        CompletableFuture<JobSubmitResponse> submissionFuture = 
            requestFuture.thenCompose(request -> {
                System.out.println("发送请求,线程: " + 
                    Thread.currentThread().getName());
                return sendRequest(request);
            });
        
        // 步骤 4: 提取 JobID
        return submissionFuture
            .thenApply(response -> {
                System.out.println("提交成功: " + response.getJobId());
                return response.getJobId();
            })
            .whenComplete((jobId, error) -> {
                if (error != null) {
                    System.err.println("提交失败: " + error.getMessage());
                }
            });
    }
    
    // 模拟方法
    private Path serializeJobGraph(JobGraph jobGraph) {
        sleep(500);
        return Path.of("/tmp/jobgraph-" + jobGraph.getJobID() + ".bin");
    }
    
    private JobSubmitRequest prepareRequest(Path jobGraphFile, JobGraph jobGraph) {
        return new JobSubmitRequest(jobGraphFile, jobGraph.getJobID());
    }
    
    private CompletableFuture<JobSubmitResponse> sendRequest(JobSubmitRequest request) {
        return CompletableFuture.supplyAsync(() -> {
            sleep(1000);  // 模拟网络延迟
            return new JobSubmitResponse(request.getJobId());
        }, executorService);
    }
    
    // 内部类
    static class JobGraph {
        private final JobID jobId = new JobID();
        public JobID getJobID() { return jobId; }
    }
    
    static class JobID {
        private final String id = UUID.randomUUID().toString();
        @Override public String toString() { return id; }
    }
    
    static class JobSubmitRequest {
        private final Path jobGraphFile;
        private final JobID jobId;
        
        JobSubmitRequest(Path jobGraphFile, JobID jobId) {
            this.jobGraphFile = jobGraphFile;
            this.jobId = jobId;
        }
        
        public JobID getJobId() { return jobId; }
    }
    
    static class JobSubmitResponse {
        private final JobID jobId;
        
        JobSubmitResponse(JobID jobId) { this.jobId = jobId; }
        public JobID getJobId() { return jobId; }
    }
    
    public void shutdown() {
        executorService.shutdown();
    }
}

9.3 带重试的请求发送

public class RetriableRequestSender {
    
    private final RestClient restClient;
    private final int maxRetries;
    private final Predicate<Throwable> retryPredicate;
    
    public RetriableRequestSender(RestClient restClient, int maxRetries) {
        this.restClient = restClient;
        this.maxRetries = maxRetries;
        this.retryPredicate = throwable -> 
            throwable instanceof IOException ||
            throwable instanceof TimeoutException;
    }
    
    public <T> CompletableFuture<T> sendRetriableRequest(
            String url, Object request, Class<T> responseType) {
        
        return retry(
            () -> restClient.sendRequest(url, request, responseType),
            maxRetries,
            retryPredicate);
    }
    
    private <T> CompletableFuture<T> retry(
            Supplier<CompletableFuture<T>> operation,
            int retriesLeft,
            Predicate<Throwable> shouldRetry) {
        
        return operation.get()
            .handle((result, throwable) -> {
                if (throwable == null) {
                    return CompletableFuture.completedFuture(result);
                }
                
                Throwable cause = throwable.getCause() != null ? 
                    throwable.getCause() : throwable;
                
                if (retriesLeft > 0 && shouldRetry.test(cause)) {
                    System.out.println("请求失败,重试中... 剩余次数: " + retriesLeft);
                    return retry(operation, retriesLeft - 1, shouldRetry);
                }
                
                CompletableFuture<T> failed = new CompletableFuture<>();
                failed.completeExceptionally(throwable);
                return failed;
            })
            .thenCompose(Function.identity());
    }
    
    // 模拟 RestClient
    static class RestClient {
        public <T> CompletableFuture<T> sendRequest(
                String url, Object request, Class<T> responseType) {
            return CompletableFuture.supplyAsync(() -> {
                // 模拟请求
                return null;
            });
        }
    }
}

10. 最佳实践

10.1 避免阻塞

// ❌ 错误:在异步链中阻塞
CompletableFuture<String> bad = CompletableFuture
    .supplyAsync(() -> "step1")
    .thenApply(s -> {
        // 不要在这里调用 get() 或 join()
        String other = anotherFuture.get();  // 阻塞!
        return s + other;
    });

// ✅ 正确:使用 thenCompose 组合
CompletableFuture<String> good = CompletableFuture
    .supplyAsync(() -> "step1")
    .thenCompose(s -> anotherFuture.thenApply(other -> s + other));

10.2 正确处理异常

// ❌ 错误:忽略异常
CompletableFuture<String> bad = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("error");
    });
// 异常被吞掉,没有任何处理

// ✅ 正确:始终处理异常
CompletableFuture<String> good = CompletableFuture
    .supplyAsync(() -> {
        throw new RuntimeException("error");
    })
    .exceptionally(e -> {
        log.error("操作失败", e);
        return "fallback";
    });

10.3 使用合适的线程池

// ❌ 错误:所有操作使用默认线程池
CompletableFuture.supplyAsync(() -> blockingIoOperation());  // 可能阻塞公共线程池

// ✅ 正确:IO 操作使用专用线程池
ExecutorService ioExecutor = Executors.newCachedThreadPool();
CompletableFuture.supplyAsync(() -> blockingIoOperation(), ioExecutor);

10.4 避免过长的链式调用

// ❌ 难以阅读和调试
CompletableFuture<String> bad = cf
    .thenApply(...)
    .thenCompose(...)
    .thenApply(...)
    .thenCompose(...)
    .thenApply(...)
    .thenCompose(...)
    .thenApply(...);

// ✅ 拆分成有意义的方法
CompletableFuture<String> good = cf
    .thenCompose(this::validateInput)
    .thenCompose(this::processData)
    .thenCompose(this::saveResult);

10.5 资源清理

// ✅ 使用 whenComplete 确保资源清理
CompletableFuture<String> future = CompletableFuture
    .supplyAsync(() -> {
        Resource resource = acquireResource();
        try {
            return process(resource);
        } finally {
            // 这里的清理可能不会执行(如果是异步的)
        }
    })
    .whenComplete((result, error) -> {
        // 无论成功还是失败都会执行
        releaseResource();
    });

附录:常用方法速查表

方法 描述 返回类型
supplyAsync(Supplier) 异步执行有返回值的任务 CompletableFuture<T>
runAsync(Runnable) 异步执行无返回值的任务 CompletableFuture<Void>
thenApply(Function) 同步转换结果 CompletableFuture<U>
thenApplyAsync(Function) 异步转换结果 CompletableFuture<U>
thenCompose(Function) 扁平化嵌套 Future CompletableFuture<U>
thenAccept(Consumer) 消费结果 CompletableFuture<Void>
thenRun(Runnable) 执行后续操作 CompletableFuture<Void>
handle(BiFunction) 处理结果和异常 CompletableFuture<U>
exceptionally(Function) 异常恢复 CompletableFuture<T>
whenComplete(BiConsumer) 观察完成(不改变结果) CompletableFuture<T>
thenCombine(CF, BiFunction) 合并两个 Future CompletableFuture<V>
allOf(CF...) 等待所有完成 CompletableFuture<Void>
anyOf(CF...) 任一完成 CompletableFuture<Object>

参考资料


本文档基于 Flink 1.19 源码分析编写