Flink Source Code: The Flink Client

2025/12/04

Tags: Flink

In Flink's source architecture, the Client Layer is responsible for job submission, graph translation, and cluster interaction. This article lays out a step-by-step learning path, from easy to hard.


Job Submission Entry Point

Starting file: StreamExecutionEnvironment.java

// This line in FileSinkDemo.java is the entry point of the entire Client Layer
env.execute("Local FileSystem Debug");

// Learning path:
// 1. StreamExecutionEnvironment.execute()
// 2. StreamExecutionEnvironment.executeAsync()
// 3. StreamExecutionEnvironment.getStreamGraph()
// 4. PipelineExecutor.execute()

How to study:

// Create a debugging demo
public class ClientLayerLearningDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        
        env.setParallelism(2);
        
        // A simple job
        DataStream<String> source = env.fromElements("a", "b", "c");
        source.map(x -> x.toUpperCase()).print();
        
        // Set a breakpoint here and start tracing the Client Layer execution flow
        env.execute("Client Layer Learning");
    }
}

Key methods:

  1. StreamExecutionEnvironment.execute()
  2. StreamExecutionEnvironment.executeAsync(StreamGraph)
  3. StreamExecutionEnvironment.getStreamGraph()
  4. PipelineExecutor.execute()
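
Chaining these four methods together, the internal flow of execute() looks roughly like this. This is a simplified sketch, not the actual source; details vary by Flink version:

// Simplified sketch of StreamExecutionEnvironment.execute(String)
public JobExecutionResult execute(String jobName) throws Exception {
    // 1. Turn the accumulated Transformation list into a StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    streamGraph.setJobName(jobName);
    // 2. executeAsync(): pick a PipelineExecutor via SPI, submit, get a JobClient
    JobClient jobClient = executeAsync(streamGraph);
    // 3. Wait synchronously for the job result
    return jobClient.getJobExecutionResult().get();
}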

Transformation → StreamGraph Translation

Core files:

  - org.apache.flink.api.dag.Transformation
  - org.apache.flink.streaming.api.graph.StreamGraphGenerator
  - org.apache.flink.streaming.api.graph.StreamGraph

Learning goal: understand how each DataStream API call produces a Transformation, and how StreamGraphGenerator walks the Transformation list to build a StreamGraph.

Example:

public class StreamGraphLearningDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every operator produces a Transformation
        DataStream<String> source = env.fromData("a", "b", "c");
        DataStream<String> mapped = source.map(String::toUpperCase);
        DataStream<String> filtered = mapped.filter(x -> !x.equals("B"));
        filtered.print();

        // Inspect the Transformation list
        List<Transformation<?>> transformations = env.getTransformations();
        System.out.println("Transformations count: " + transformations.size());
        for (Transformation<?> t : transformations) {
            System.out.println("  - " + " [" + t + "]");
        }

        // Get the StreamGraph (without clearing the transformations)
        StreamGraph streamGraph = env.getStreamGraph(false);

        // Print StreamGraph info
        System.out.println("streamGraph Job Name: " + streamGraph.getJobName());
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes());

        System.out.println("StreamEdges: ");
        streamGraph
                .getStreamNodes()
                .forEach(
                        node -> {
                            System.out.println(
                                    " - node: "
                                            + node.getOperatorName()
                                            + " StreamEdge: "
                                            + streamGraph.getStreamEdges(node.getId()));
                        });
    }
}



Sample output:

Transformations count: 3
  -  [OneInputTransformation{id=2, name='Map', outputType=String, 
      parallelism=12}]
  -  [OneInputTransformation{id=3, name='Filter', outputType=String, 
      parallelism=12}]
  -  [LegacySinkTransformation{id=4, name='Print to Std. Out', 
      outputType=String, parallelism=12}]

streamGraph Job Name: Flink Streaming Job
StreamNodes: [Source: Collection Source-1, Map-2, Filter-3, 
             Sink: Print to Std. Out-4]

StreamEdges: 
 - node: Source: Collection Source StreamEdge: [(Source: Collection Source-1 -> Map-2, 
   typeNumber=0, outputPartitioner=REBALANCE, exchangeMode=UNDEFINED, 
   bufferTimeout=100, outputTag=null, uniqueId=0)]
 - node: Map StreamEdge: [(Map-2 -> Filter-3, typeNumber=0, outputPartitioner=FORWARD, 
   exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
 - node: Filter StreamEdge: [(Filter-3 -> Sink: Print to Std. Out-4, typeNumber=0, 
   outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100, 
   outputTag=null, uniqueId=0)]
 - node: Sink: Print to Std. Out StreamEdge: []

Key classes and methods:

// 1. The Transformation hierarchy (logical operators)
org.apache.flink.api.dag.Transformation
  ├── SourceTransformation          // source operators
  ├── OneInputTransformation        // single-input operators: map, filter, ...
  ├── TwoInputTransformation        // two-input operators: join, coGroup, ...
  ├── SinkTransformation            // sink operators
  └── PartitionTransformation       // shuffle, rebalance, ...

// 2. StreamGraph generation
StreamGraphGenerator.generate()
  └── iterates over the transformations list
      └── StreamGraphGenerator.transform(Transformation)
          └── creates StreamNodes and StreamEdges based on the Transformation type
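
A small runnable snippet makes this mapping visible (fromData assumes the Flink 1.18+ API):

public class TransformationInspectDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData(1, 2, 3)            // creates a SourceTransformation (not added to the list)
                .map(i -> i * 2)         // OneInputTransformation
                .print();                // LegacySinkTransformation
        // Matches the output above: only Map and Sink are in the list;
        // the source shows up as their input
        env.getTransformations().forEach(t ->
                System.out.println(t.getClass().getSimpleName()
                        + " <- inputs: " + t.getInputs()));
    }
}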

Key methods:

  1. StreamGraphGenerator.generate()
  2. StreamGraphGenerator.transform(Transformation)
  3. StreamGraph.addOperator() / StreamGraph.addEdge()

StreamGraph → JobGraph Translation

Core files:

  - org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator
  - org.apache.flink.runtime.jobgraph.JobGraph / JobVertex / JobEdge

Learning goal: understand the conditions for building operator chains, and how the StreamGraph is optimized into the JobGraph that gets submitted to the cluster.

Why is the translation needed?

StreamGraph and JobGraph represent a Flink job at two different levels of abstraction:

Dimension     | StreamGraph                      | JobGraph
Abstraction   | logical execution graph          | optimized logical graph
Node unit     | StreamNode (a single operator)   | JobVertex (an operator chain)
Optimization  | unoptimized, 1:1 with user code  | optimized, operators chained
Purpose       | expresses user intent            | submitted to the cluster for execution

Note: the real physical execution graph is the ExecutionGraph. It is built on the JobManager side, where each JobVertex is expanded by parallelism into multiple ExecutionVertex instances.

The core purpose of the translation is performance optimization, embodied in the Operator Chain mechanism:

Before (StreamGraph):
Source → Map → Filter → (keyBy) → Reduce → Sink
5 independent StreamNodes (keyBy only sets a partitioning strategy and creates no node; see below)

After (JobGraph):
[Source → Map → Filter] → [Reduce → Sink]
         ↓                       ↓
   1 JobVertex (chain)     1 JobVertex (chain)

Note: keyBy causes a shuffle and breaks the chain

Benefits of operator chaining:

  1. Fewer thread switches: operators in a chain run in the same thread
  2. Less serialization overhead: within a chain, data is passed as objects, no serialization needed
  3. No network I/O inside a chain
  4. Fewer tasks: fewer tasks mean less scheduling overhead
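
Chaining can also be controlled explicitly via the API, which makes the topology changes easy to compare in the WebUI. A sketch fragment (source reuses the data stream from the demos above):

// Controlling operator chaining by hand (methods on SingleOutputStreamOperator)
source.map(String::toUpperCase)
        .startNewChain()            // start a new chain at this map
        .filter(x -> !x.equals("B"))
        .disableChaining()          // this filter chains with neither neighbor
        .print();
// env.disableOperatorChaining() disables chaining globally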

What the translation actually does

The translation is performed by StreamingJobGraphGenerator:

1. Build operator chains: decide whether adjacent operators can be merged

2. StreamNode → JobVertex: chainable StreamNodes are merged into a single JobVertex, with parallelism and resource requirements set

3. StreamEdge → JobEdge: edges between chains become JobEdges, configured with a data distribution strategy

4. Serialize operator logic: UDFs are serialized into a StreamConfig and stored in the JobVertex

This two-level graph structure embodies separation of concerns: the StreamGraph faithfully expresses the user's intent, while the JobGraph is optimized for execution.

Example:

public class JobGraphLearningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        // Build a job whose operator chains we can observe
        DataStream<String> source = env.fromData("a", "b", "c");

        // These operators will be chained together (no shuffle in between)
        DataStream<String> result = source
                .map(String::toUpperCase)       // chains with upstream
                .filter(x -> !x.equals("B"))    // chains with upstream
                .keyBy(x -> x)                  // the chain breaks here (shuffle)
                .map(x -> x + "!");             // chains with the sink downstream

        result.print();
        // Get the StreamGraph
        StreamGraph streamGraph = env.getStreamGraph();
        System.out.println("StreamNodes count: " + streamGraph.getStreamNodes().size());

        // Print StreamGraph info
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes());

        System.out.println("StreamEdges: ");
        streamGraph
                .getStreamNodes()
                .forEach(
                        node -> {
                            System.out.println(
                                    " - node: "
                                            + node.getOperatorName()
                                            + " StreamEdge: "
                                            + streamGraph.getStreamEdges(node.getId()));
                        });

        // Translate to a JobGraph
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

        // Print JobGraph info
        System.out.println("\nJobGraph Info:");
        System.out.println("Job ID: " + jobGraph.getJobID());
        System.out.println("Job Name: " + jobGraph.getName());
        System.out.println("JobVertices count: " + jobGraph.getNumberOfVertices());

        // Inspect each JobVertex (the operators after chaining)
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println("\nJobVertex: " + vertex.getName());
            System.out.println("  ID: " + vertex.getID());
            System.out.println("  Parallelism: " + vertex.getParallelism());
            System.out.println("  Operator IDs: " + vertex.getOperatorIDs());
            System.out.println("  Inputs: " + vertex.getInputs().size());
        }
    }
}

Sample output:

StreamNodes count: 5
StreamNodes: [Source: Collection Source-1, Map-2, Filter-3, Map-5, Sink: Print to Std. Out-6]
StreamEdges: 
 - node: Source: Collection Source StreamEdge: [(Source: Collection Source-1 -> Map-2, 
   typeNumber=0, outputPartitioner=REBALANCE, exchangeMode=UNDEFINED, 
   bufferTimeout=100, outputTag=null, uniqueId=0)]
 - node: Map StreamEdge: [(Map-2 -> Filter-3, typeNumber=0, outputPartitioner=FORWARD, 
   exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
 - node: Filter StreamEdge: [(Filter-3 -> Map-5, typeNumber=0, outputPartitioner=HASH, 
   exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
 - node: Map StreamEdge: [(Map-5 -> Sink: Print to Std. Out-6, typeNumber=0, 
   outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100, 
   outputTag=null, uniqueId=0)]
 - node: Sink: Print to Std. Out StreamEdge: []

JobGraph Info:
Job ID: 1ad251732c79901f3f421c5d66b325e9
Job Name: Flink Streaming Job
JobVertices count: 3

JobVertex: Map -> Sink: Print to Std. Out
  ID: b5c8d46f3e7b141acf271f12622e752b
  Parallelism: 2
  Operator IDs: [org.apache.flink.runtime.OperatorIDPair@2c35e847, 
                 org.apache.flink.runtime.OperatorIDPair@7bd4937b]
  Inputs: 1

JobVertex: Map -> Filter
  ID: 20ba6b65f97481d5570070de90e4e791
  Parallelism: 2
  Operator IDs: [org.apache.flink.runtime.OperatorIDPair@21e360a, 
                 org.apache.flink.runtime.OperatorIDPair@5ba3f27a]
  Inputs: 1

JobVertex: Source: Collection Source
  ID: bc764cd8ddf7a0cff126f51c16239658
  Parallelism: 1
  Operator IDs: [org.apache.flink.runtime.OperatorIDPair@58d75e99]
  Inputs: 0

Looking at the code, we built the DataStream with six API calls (source, map, filter, keyBy, map, print), yet the result shows only 5 StreamNodes. Why did the keyBy operator "disappear"?

Root cause: keyBy is not a real operator; it is only a logical partitioning operation, so it does not generate a StreamNode of its own.

As the source shows, the keyBy method does nothing more than:

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
    Preconditions.checkNotNull(key);
    return new KeyedStream<>(this, clean(key));  // just creates a KeyedStream
}

The KeyedStream constructor then does:

public KeyedStream(
        DataStream<T> dataStream,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
    this(
        dataStream,
        new PartitionTransformation<>(  // creates a PartitionTransformation
            dataStream.getTransformation(),
            new KeyGroupStreamPartitioner<>(  // uses a hash partitioner
                keySelector,
                StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
        keySelector,
        keyType);
}
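
This is easy to verify with a few lines of code: the PartitionTransformation created by keyBy never enters env.getTransformations(); it appears as the input of the downstream operator instead (a sketch):

public class KeyByTransformationDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData("a", "b", "c")
                .keyBy(x -> x)            // only creates a PartitionTransformation
                .map(x -> x + "!")        // the real operator
                .print();
        // The input of the Map transformation is the PartitionTransformation
        env.getTransformations().forEach(t ->
                System.out.println(t.getName() + " <- " + t.getInputs()));
    }
}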

Key point:

The effect of keyBy shows up in the outputPartitioner of the StreamEdge!

StreamEdges: 
 - node: Filter StreamEdge: [(Filter-3 -> Map-5, 
     typeNumber=0, 
     outputPartitioner=HASH,  ⬅️ this is what keyBy does!
     exchangeMode=UNDEFINED, 
     bufferTimeout=100, 
     outputTag=null, 
     uniqueId=0)]

Compare with the other edges:

Source -> Map:    outputPartitioner=REBALANCE  (default partitioning)
Map -> Filter:    outputPartitioner=FORWARD    (chainable)
Filter -> Map:    outputPartitioner=HASH       ⬅️ set by keyBy!
Map -> Sink:      outputPartitioner=FORWARD    (chainable)

Why this design?

keyBy does not process data

// keyBy is not a data transformation
DataStream<String> stream = ...;

// map transforms data: String -> String
stream.map(String::toUpperCase)  // has real computation logic

// filter filters data: keep or drop
stream.filter(x -> !x.equals("B"))  // has real computation logic

// keyBy transforms nothing; it only changes how data is distributed
stream.keyBy(x -> x)  // no computation logic, just sets the partitioning strategy

keyBy is a logical operation

// keyBy only tells Flink:
// "the operators that follow should process data grouped by key"

stream
    .keyBy(x -> x)           // sets the partitioning strategy
    .map(x -> x + "!")       // this map is a real operator
    .sum(1)                  // this sum is a real operator

It also avoids unnecessary operator overhead. If keyBy generated a standalone operator, the topology would contain a node with no computation logic:

❌ Bad design (if keyBy were an operator):
Source -> Map -> Filter -> KeyBy operator -> Map -> Sink
                               ↑
                       an unnecessary operator

✅ Good design (keyBy is just a partitioning strategy):
Source -> Map -> Filter --[HASH partition]--> Map -> Sink
                               ↑
                only changes how data is distributed

How keyBy manifests at each stage

Stage          | How keyBy shows up                                 | Notes
User code      | .keyBy(x -> x)                                     | the user calls keyBy
StreamGraph    | StreamEdge.partitioner = KeyGroupStreamPartitioner | the edge's partitioner is set to HASH
JobGraph       | breaks the operator chain                          | the partitioning changes, so no chaining
ExecutionGraph | creates IntermediateResultPartition                | intermediate result partitions are created
Physical run   | data is shuffled over the network                  | records are routed to tasks by key hash

An analogy

keyBy is like a "lane sign" on a road:
- it is not a toll booth (an operator)
- it only tells vehicles (data) which road (partition) to take
- the vehicles themselves do not change; only their route does

StreamGraph → JobGraph key concepts:

// Conditions for operator chaining (ALL must hold):
1. the downstream operator has exactly one input
2. the upstream operator has exactly one output
3. both operators have the same parallelism
4. no shuffle in between (i.e. ForwardPartitioner)
5. both operators are in the same SlotSharingGroup
6. the ChainingStrategy allows chaining

// Core methods of StreamingJobGraphGenerator:
createJobGraph(StreamGraph)
  └── setChaining()                  // entry point for building operator chains
      └── createChain()              // recursively builds chains
          ├── isChainable()          // decides whether two operators can chain
          └── createJobVertex()      // creates a JobVertex per chain
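
The logic of isChainable() can be summarized by the following simplified sketch (not the actual source; the real implementation also checks ChainingStrategy, SlotSharingGroup, exchange mode, etc.):

// Simplified sketch of the isChainable check
static boolean isChainableSketch(StreamEdge edge, StreamGraph graph) {
    StreamNode upstream = graph.getSourceVertex(edge);
    StreamNode downstream = graph.getTargetVertex(edge);
    return downstream.getInEdges().size() == 1                           // one input downstream
            && upstream.getParallelism() == downstream.getParallelism()  // same parallelism
            && edge.getPartitioner() instanceof ForwardPartitioner       // forward partitioning
            && graph.isChainingEnabled();                                // chaining not disabled
}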

Key methods:

  1. StreamingJobGraphGenerator.createJobGraph()
  2. StreamingJobGraphGenerator.createChain()
  3. StreamingJobGraphGenerator.isChainable()


The PipelineExecutor Hierarchy

Core files:

  - org.apache.flink.core.execution.PipelineExecutor
  - org.apache.flink.client.deployment.executors.LocalExecutor / RemoteExecutor

Learning goal: understand how different deployment targets (execution.target) map to different executor implementations.

The PipelineExecutor hierarchy:

PipelineExecutor
  ├── LocalExecutor
  ├── RemoteExecutor
  ├── AbstractJobClusterExecutor
  │     ├── YarnJobClusterExecutor
  │     └── KubernetesJobClusterExecutor
  └── AbstractSessionClusterExecutor
        ├── YarnSessionClusterExecutor
        └── KubernetesSessionClusterExecutor

Example:

public class PipelineExecutorLearningDemo {
    public static void main(String[] args) throws Exception {
        // StreamExecutionEnvironment exposes no getter for its executor;
        // look executors up via SPI (DefaultExecutorServiceLoader) by execution.target
        PipelineExecutorServiceLoader serviceLoader = new DefaultExecutorServiceLoader();

        // 1. Local executor: target = "local"
        Configuration localConf = new Configuration();
        localConf.set(DeploymentOptions.TARGET, "local");
        PipelineExecutor localExecutor =
                serviceLoader.getExecutorFactory(localConf).getExecutor(localConf);
        System.out.println("Local Executor: " + localExecutor.getClass().getName());

        // 2. Remote executor (connects to an existing cluster): target = "remote"
        Configuration remoteConf = new Configuration();
        remoteConf.set(DeploymentOptions.TARGET, "remote");
        remoteConf.set(JobManagerOptions.ADDRESS, "localhost");
        remoteConf.set(RestOptions.PORT, 8081);  // REST port; RemoteExecutor submits via REST
        PipelineExecutor remoteExecutor =
                serviceLoader.getExecutorFactory(remoteConf).getExecutor(remoteConf);
        System.out.println("Remote Executor: " + remoteExecutor.getClass().getName());

        // 3. List all executors registered via SPI ("local", "remote", "yarn-session", ...)
        System.out.println("\nAvailable Executors:");
        serviceLoader.getExecutorNames().forEach(name -> System.out.println("  - " + name));
    }
}

Key classes:

// 1. The executor interface
PipelineExecutor.execute(Pipeline, Configuration, ClassLoader)
  └── returns CompletableFuture<JobClient>

// 2. LocalExecutor (local execution)
LocalExecutor.execute()
  └── PerJobMiniClusterFactory.submitJob()
      └── MiniCluster.submitJob()

// 3. RemoteExecutor (remote execution)
RemoteExecutor.execute()
  └── RestClusterClient.submitJob()
      └── submits to the remote cluster via the REST API
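
Taking LocalExecutor as an example, the core of execute() looks roughly like this (a simplified sketch, not the actual source; method signatures such as PipelineExecutorUtils.getJobGraph vary by version):

// Simplified sketch of LocalExecutor.execute()
CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration conf, ClassLoader cl)
        throws Exception {
    // 1. Pipeline (here: the StreamGraph) → JobGraph
    JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, conf, cl);
    // 2. Spin up a MiniCluster and submit the JobGraph, returning a JobClient
    return PerJobMiniClusterFactory
            .createWithFactory(conf, MiniCluster::new)
            .submitJob(jobGraph, cl);
}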

The ClusterClient Hierarchy

Core files:

  - org.apache.flink.client.program.ClusterClient
  - org.apache.flink.client.program.rest.RestClusterClient

Learning goal: understand how the client talks to the cluster over the REST API: submitting and cancelling jobs, triggering savepoints, querying status.

Example:

public class ClusterClientLearningDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create a RestClusterClient (connects to a running cluster)
        Configuration conf = new Configuration();
        conf.set(RestOptions.ADDRESS, "localhost");
        conf.set(RestOptions.PORT, 8081);
        
        try (RestClusterClient<StandaloneClusterId> client = 
                new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {
            
            // 2. Fetch cluster information
            System.out.println("Cluster ID: " + client.getClusterId());
            
            // 3. List running jobs
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            System.out.println("\nRunning Jobs:");
            for (JobStatusMessage job : jobs) {
                System.out.println("  - " + job.getJobName() + 
                                 " [" + job.getJobState() + "]");
            }
            
            // 4. Submit a job (requires a JobGraph)
            // JobGraph jobGraph = ...;
            // JobID jobId = client.submitJob(jobGraph).get();
            
            // 5. Cancel a job
            // client.cancel(jobId).get();
            
            // 6. Trigger a savepoint
            // String savepointPath = client.triggerSavepoint(jobId, null).get();
        }
    }
}

The CLI Command-Line Tool

Core files:

  - org.apache.flink.client.cli.CliFrontend
  - org.apache.flink.client.cli.CliFrontendParser

Learning goal: understand how commands like flink run parse arguments, load the user JAR, and submit the job.

How to study:

# 1. See how the flink run command is implemented
# CliFrontend.run()

# 2. Understand argument parsing
# CliFrontendParser.parse()

# 3. Understand JAR loading
# PackagedProgram.newBuilder().build()
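
Putting these three steps together, the core path of flink run can be expressed with the following simplified sketch (not the actual source; runJar is a made-up method name for illustration, and the API varies by version):

// Simplified sketch of how CliFrontend handles "flink run"
public static void runJar(File jarFile, Configuration flinkConf) throws Exception {
    // 1. Load the user JAR and locate the entry class (PackagedProgram, next section)
    PackagedProgram program = PackagedProgram.newBuilder()
            .setJarFile(jarFile)
            .build();
    // 2. Invoke the user's main() reflectively: env.execute() in user code
    //    picks a PipelineExecutor via SPI and follows the same submission path
    ClientUtils.executeProgram(
            new DefaultExecutorServiceLoader(), flinkConf, program, false, false);
}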

PackagedProgram (JAR Loading)

Core files:

  - org.apache.flink.client.program.PackagedProgram

Learning goal: understand how the user JAR is loaded and how the entry class is invoked via reflection.
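
PackagedProgram already appeared in the CLI sketch above; here is its typical construction (paths and class names are made-up example values):

// Building a PackagedProgram (builder pattern)
PackagedProgram program = PackagedProgram.newBuilder()
        .setJarFile(new File("/path/to/user-job.jar"))   // user JAR (example path)
        .setEntryPointClassName("com.example.MyJob")     // entry class (made-up name)
        .setArguments("--input", "/tmp/in")              // arguments passed to main()
        .build();
// program.getUserCodeClassLoader() returns the ClassLoader for user code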


Application Mode

Core files:

  - org.apache.flink.client.deployment.application.ApplicationClusterEntryPoint
  - org.apache.flink.client.deployment.application.ApplicationDispatcherBootstrap


The Deployment Abstraction

Core files:

  - org.apache.flink.client.deployment.ClusterDescriptor
  - org.apache.flink.client.deployment.ClusterClientFactory


Learning Path Summary

FileSinkDemo.java (starting point)
    ↓
StreamExecutionEnvironment.execute()
    ↓
getStreamGraph() ─────────────────> StreamGraphGenerator
    ↓                               (Transformation → StreamGraph)
executeAsync()
    ↓
StreamingJobGraphGenerator ────────> (StreamGraph → JobGraph)
    ↓
PipelineExecutor.execute()
    ├── LocalExecutor  ────────────> MiniCluster.submitJob()
    └── RemoteExecutor ────────────> RestClusterClient.submitJob()

Hands-On Debugging Steps

Modify FileSinkDemo to add debugging code

public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        
        env.setParallelism(1);
        env.enableCheckpointing(5000);
        
        DataStream<String> source = env.fromData("hello", "world", "flink");
        
        FileSink<String> sink = FileSink
            .forRowFormat(new Path("file:///tmp/flink-debug-output"), 
                         new SimpleStringEncoder<String>("UTF-8"))
            .build();
        
        source.sinkTo(sink);
        
        // 1. Inspect the Transformations
        System.out.println("\n=== Transformations ===");
        List<Transformation<?>> transformations = env.getTransformations();
        for (Transformation<?> t : transformations) {
            System.out.println("  - " + t.getName() + 
                             " [" + t.getClass().getSimpleName() + "]");
        }
        
        // 2. Inspect the StreamGraph
        System.out.println("\n=== StreamGraph ===");
        StreamGraph streamGraph = env.getStreamGraph(false);
        System.out.println("Job Name: " + streamGraph.getJobName());
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes().size());
        
        // 3. Inspect the JobGraph
        System.out.println("\n=== JobGraph ===");
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        System.out.println("JobVertices: " + jobGraph.getNumberOfVertices());
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println("  - " + vertex.getName() + 
                             " (parallelism=" + vertex.getParallelism() + ")");
        }
        
        // 4. Inspect the PipelineExecutor (StreamExecutionEnvironment has no public
        //    getPipelineExecutor(); look it up via SPI by execution.target)
        System.out.println("\n=== PipelineExecutor ===");
        Configuration executorConf = new Configuration(conf);
        executorConf.set(DeploymentOptions.TARGET, "local");
        PipelineExecutor executor = new DefaultExecutorServiceLoader()
                .getExecutorFactory(executorConf)
                .getExecutor(executorConf);
        System.out.println("Executor: " + executor.getClass().getName());
        
        // ===== Run the job =====
        env.execute("Local FileSystem Debug");
    }
}

Set breakpoints and start debugging

Key methods:

  1. StreamExecutionEnvironment.execute()
  2. StreamExecutionEnvironment.getStreamGraph()
  3. StreamGraphGenerator.generate()
  4. StreamingJobGraphGenerator.createJobGraph()
  5. LocalExecutor.execute()

Read the source to understand the core flow

Read in the following order:

  1. StreamExecutionEnvironment → StreamGraph → StreamGraphGenerator
  2. StreamingJobGraphGenerator → JobGraph → Operator Chain
  3. PipelineExecutor → LocalExecutor → MiniCluster
  4. ClusterClient → RestClusterClient → REST API
  5. CliFrontend → PackagedProgram → the CLI tool

Hands-On: Start a Cluster Manually and Submit a Job

Having understood the core flow of the Client Layer, we can now start a cluster by hand to dig deeper into how the ClusterClient interacts with it.

Start a local standalone cluster

Start the JobManager and TaskManager programmatically to walk through the full cluster startup flow:

public class StartClusterDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = createConfiguration();
        
        // Start the JobManager
        StandaloneSessionClusterEntrypoint jobManager = startJobManager(config);
        Thread.sleep(3000);
        
        // Start the TaskManager
        TaskManagerRunner taskManager = startTaskManager(config);
        Thread.sleep(3000);
        
        System.out.println("Access WebUI at: http://localhost:8081");
        Thread.currentThread().join();
    }
    
    private static Configuration createConfiguration() {
        Configuration config = new Configuration();
        config.set(JobManagerOptions.ADDRESS, "localhost");
        config.set(JobManagerOptions.PORT, 6123);
        config.set(RestOptions.PORT, 8081);
        config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
        
        // Adjust the resource configuration for local execution
        TaskExecutorResourceUtils.adjustForLocalExecution(config);
        return config;
    }
    
    private static StandaloneSessionClusterEntrypoint startJobManager(Configuration config) 
            throws Exception {
        StandaloneSessionClusterEntrypoint entrypoint = 
            new StandaloneSessionClusterEntrypoint(config);
        
        Thread jmThread = new Thread(() -> {
            try {
                ClusterEntrypoint.runClusterEntrypoint(entrypoint);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "JobManager-Thread");
        jmThread.setDaemon(true);
        jmThread.start();
        
        return entrypoint;
    }
    
    private static TaskManagerRunner startTaskManager(Configuration config) throws Exception {
        TaskManagerRunner taskManagerRunner = new TaskManagerRunner(
            config,
            PluginUtils.createPluginManagerFromRootFolder(config),
            TaskManagerRunner::createTaskExecutorService);
        taskManagerRunner.start();
        return taskManagerRunner;
    }
}

Submit a job via RestClusterClient

Use a RestClusterClient to connect to the cluster and submit a job; this is the key to understanding the interaction between the client and the JobManager:

public class SubmitJobToClusterDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RestOptions.PORT, 8081);
        config.set(RestOptions.ADDRESS, "localhost");
        
        try (ClusterClient<String> clusterClient = 
                new RestClusterClient<>(config, "standalone-cluster")) {
            
            System.out.println("Connected: " + clusterClient.getWebInterfaceURL());
            
            // Build the JobGraph
            JobGraph jobGraph = createJobGraph(config);
            
            // Submit the job
            JobID jobId = clusterClient.submitJob(jobGraph).get();
            System.out.println("Submitted job: " + jobId);
            
            // Wait for the job result
            clusterClient.requestJobResult(jobId).get();
        }
    }
    
    private static JobGraph createJobGraph(Configuration config) {
        StreamExecutionEnvironment env = 
            StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(2);
        
        DataStream<String> source = env.fromData("a", "b", "c", "d", "e");
        source.map(String::toUpperCase).print();
        
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("Standalone Cluster Demo");
        return streamGraph.getJobGraph();
    }
}

This exercise makes the following clearly observable:

  1. Cluster startup: the startup order of, and dependencies between, the JobManager and the TaskManager
  2. REST API interaction: how the ClusterClient communicates with the JobManager over the REST API
  3. Job submission flow: the complete chain from StreamGraph → JobGraph → submission to the cluster

Core Responsibilities of the Client Layer

The Client Layer has three core responsibilities:

  1. Graph translation: Transformation → StreamGraph → JobGraph

  2. Job submission: PipelineExecutor → ClusterClient → REST API

  3. Cluster interaction: submit jobs, cancel jobs, trigger savepoints, query job status