Flink Client Layer Learning Guide
In the Flink source architecture, the Client Layer is responsible for job submission, graph translation, and cluster interaction. This guide lays out a step-by-step learning path from the basics to the more advanced pieces.
Job Submission Entry Point
Starting file: StreamExecutionEnvironment.java
// This line in FileSinkDemo.java is the entry point into the whole Client Layer
env.execute("Local FileSystem Debug");
// Learning path:
// 1. StreamExecutionEnvironment.execute()
// 2. StreamExecutionEnvironment.executeAsync()
// 3. StreamExecutionEnvironment.getStreamGraph()
// 4. PipelineExecutor.execute()
How to study:
// Create a debug demo
public class ClientLayerLearningDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(2);

        // A simple job
        DataStream<String> source = env.fromElements("a", "b", "c");
        source.map(x -> x.toUpperCase()).print();

        // Set a breakpoint here and start tracing the Client Layer execution flow
        env.execute("Client Layer Learning");
    }
}
Key methods:
- StreamExecutionEnvironment.execute()
- StreamExecutionEnvironment.executeAsync(StreamGraph)
- StreamExecutionEnvironment.getStreamGraph()
- PipelineExecutor.execute()
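execute() blocks until the job finishes, while executeAsync() submits the job and immediately returns a JobClient handle. A minimal sketch of the asynchronous path (the job name and pipeline here are illustrative, not part of the original demo):

public class ExecuteAsyncLearningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements("a", "b", "c").map(String::toUpperCase).print();

        // Submit without blocking and get a handle to the running job
        JobClient jobClient = env.executeAsync("Execute Async Demo");
        System.out.println("Job ID: " + jobClient.getJobID());
        System.out.println("Status: " + jobClient.getJobStatus().get());

        // Block until the job finishes (roughly what execute() does internally)
        jobClient.getJobExecutionResult().get();
    }
}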
Transformation → StreamGraph Translation
Core files:
- StreamGraph.java
- StreamGraphGenerator.java
Learning goals:
- Understand what a Transformation is (a logical operator)
- Understand what a StreamGraph is (the logical execution graph)
- Understand how a StreamGraph is generated from Transformations
Example:
public class StreamGraphLearningDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Every operator produces a Transformation
        DataStream<String> source = env.fromData("a", "b", "c");
        DataStream<String> mapped = source.map(String::toUpperCase);
        DataStream<String> filtered = mapped.filter(x -> !x.equals("B"));
        filtered.print();

        // Inspect the list of Transformations
        List<Transformation<?>> transformations = env.getTransformations();
        System.out.println("Transformations count: " + transformations.size());
        for (Transformation<?> t : transformations) {
            System.out.println(" - [" + t + "]");
        }

        // Get the StreamGraph (without clearing the transformations)
        StreamGraph streamGraph = env.getStreamGraph(false);

        // Print StreamGraph information
        System.out.println("streamGraph Job Name: " + streamGraph.getJobName());
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes());
        System.out.println("StreamEdges: ");
        streamGraph
                .getStreamNodes()
                .forEach(
                        node -> {
                            System.out.println(
                                    " - node: "
                                            + node.getOperatorName()
                                            + " StreamEdge: "
                                            + streamGraph.getStreamEdges(node.getId()));
                        });
    }
}
Transformations count: 3
- [OneInputTransformation{id=2, name='Map', outputType=String,
parallelism=12}]
- [OneInputTransformation{id=3, name='Filter', outputType=String,
parallelism=12}]
- [LegacySinkTransformation{id=4, name='Print to Std. Out',
outputType=String, parallelism=12}]
streamGraph Job Name: Flink Streaming Job
StreamNodes: [Source: Collection Source-1, Map-2, Filter-3,
Sink: Print to Std. Out-4]
StreamEdges:
- node: Source: Collection Source StreamEdge: [(Source: Collection Source-1 -> Map-2,
typeNumber=0, outputPartitioner=REBALANCE, exchangeMode=UNDEFINED,
bufferTimeout=100, outputTag=null, uniqueId=0)]
- node: Map StreamEdge: [(Map-2 -> Filter-3, typeNumber=0, outputPartitioner=FORWARD,
exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
- node: Filter StreamEdge: [(Filter-3 -> Sink: Print to Std. Out-4, typeNumber=0,
outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100,
outputTag=null, uniqueId=0)]
- node: Sink: Print to Std. Out StreamEdge: []
Key classes and methods:
// 1. The Transformation hierarchy (logical operators)
org.apache.flink.api.dag.Transformation
├── SourceTransformation      // source operators
├── OneInputTransformation    // single-input operators such as map and filter
├── TwoInputTransformation    // two-input operators such as join and coGroup
├── SinkTransformation        // sink operators
└── PartitionTransformation   // shuffle, rebalance, keyBy, etc.
// 2. StreamGraph generation
StreamGraphGenerator.generate()
└── iterates the collected transformations
    └── StreamGraphGenerator.transform(Transformation)
        └── creates StreamNodes and StreamEdges based on the Transformation type
Key methods:
- StreamGraphGenerator.generate() — entry point for building the StreamGraph
- StreamGraphGenerator.transform() — translates each Transformation
- StreamGraph.addNode() / StreamGraph.addEdge() — add nodes and edges
StreamGraph → JobGraph Translation
Core files:
- StreamingJobGraphGenerator.java
- JobGraph.java
Learning goals:
- Understand what a JobGraph is (the optimized logical graph containing operator chains)
- Understand the Operator Chain optimization
- Understand the StreamNode → JobVertex conversion
- Understand the StreamEdge → JobEdge conversion
Why is this translation needed?
StreamGraph and JobGraph represent two different levels of abstraction of a Flink job:
| Dimension | StreamGraph | JobGraph |
|---|---|---|
| Abstraction level | Logical execution graph | Optimized logical graph |
| Node unit | StreamNode (a single operator) | JobVertex (an operator chain) |
| Optimization | Unoptimized, maps 1:1 to user code | Optimized, contains operator chains |
| Purpose | Expresses user intent | Submitted to the cluster for execution |
Note: the real physical execution graph is the ExecutionGraph. It is built on the JobManager side by expanding each JobVertex into multiple ExecutionVertex instances according to its parallelism.
The main purpose of the translation is performance optimization, most visibly through the Operator Chain mechanism:
Before (StreamGraph):
Source → Map → Filter → (keyBy) → Reduce → Sink
  ↓       ↓      ↓                   ↓       ↓
    5 independent StreamNodes (keyBy only sets the edge partitioner)
After (JobGraph):
[Source → Map → Filter] → [Reduce → Sink]
          ↓                      ↓
  1 JobVertex (chain)      1 JobVertex (chain)
Note: keyBy introduces a shuffle, which breaks the chain
Benefits of Operator Chains (a sketch of how to control chaining follows this list):
- Fewer thread switches: operators in a chain run in the same thread
- Less serialization overhead: records are passed as objects within a chain, no serialization needed
- Less network traffic: no network I/O within a chain
- Fewer tasks: fewer tasks mean less scheduling overhead
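Chaining can also be controlled explicitly from the DataStream API, which makes its effect on the JobGraph easy to observe. A minimal sketch (the pipeline and the chaining hints are illustrative; try it with and without the hints and compare the JobVertex count):

public class ChainingControlDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // env.disableOperatorChaining();   // would disable chaining for the whole job

        env.fromData("a", "b", "c")
                .map(String::toUpperCase)
                .filter(x -> !x.isEmpty())
                .startNewChain()             // forces this operator to start a new chain
                .map(x -> x + "!")
                .disableChaining()           // excludes this operator from any chain
                .print();

        // Each chain becomes one JobVertex, so the count changes with the hints above
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        System.out.println("JobVertices: " + jobGraph.getNumberOfVertices());
    }
}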
What the translation actually does
The translation is performed by StreamingJobGraphGenerator:
1. Build operator chains — decide whether adjacent operators can be merged
2. StreamNode → JobVertex — chainable StreamNodes are merged into a single JobVertex, with parallelism and resource requirements set
3. StreamEdge → JobEdge — edges between chains become JobEdges, configured with the data distribution strategy
4. Serialize operator logic — user functions are serialized into a StreamConfig stored in the JobVertex
This two-level graph structure reflects a separation of concerns:
- StreamGraph focuses on expression: it faithfully mirrors the user's business logic and is easy to understand and debug
- JobGraph focuses on execution: it contains all optimizations and is the plan actually submitted to the cluster
Example:
public class JobGraphLearningDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Build a job on which the operator chains are easy to observe
        DataStream<String> source = env.fromData("a", "b", "c");

        // These operators will be chained together (as long as there is no shuffle)
        DataStream<String> result = source
                .map(String::toUpperCase)      // will be chained
                .filter(x -> !x.equals("B"))   // will be chained
                .keyBy(x -> x)                 // the chain breaks here (shuffle)
                .map(x -> x + "!");            // will be chained
        result.print();

        // Get the StreamGraph
        StreamGraph streamGraph = env.getStreamGraph();
        System.out.println("StreamNodes count: " + streamGraph.getStreamNodes().size());

        // Print StreamGraph information
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes());
        System.out.println("StreamEdges: ");
        streamGraph
                .getStreamNodes()
                .forEach(
                        node -> {
                            System.out.println(
                                    " - node: "
                                            + node.getOperatorName()
                                            + " StreamEdge: "
                                            + streamGraph.getStreamEdges(node.getId()));
                        });

        // Translate into a JobGraph
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);

        // Print JobGraph information
        System.out.println("\nJobGraph Info:");
        System.out.println("Job ID: " + jobGraph.getJobID());
        System.out.println("Job Name: " + jobGraph.getName());
        System.out.println("JobVertices count: " + jobGraph.getNumberOfVertices());

        // Inspect each JobVertex (the chained operators)
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println("\nJobVertex: " + vertex.getName());
            System.out.println("  ID: " + vertex.getID());
            System.out.println("  Parallelism: " + vertex.getParallelism());
            System.out.println("  Operator IDs: " + vertex.getOperatorIDs());
            System.out.println("  Inputs: " + vertex.getInputs().size());
        }
    }
}
StreamNodes count: 5
StreamNodes: [Source: Collection Source-1, Map-2, Filter-3, Map-5, Sink: Print to Std. Out-6]
StreamEdges:
- node: Source: Collection Source StreamEdge: [(Source: Collection Source-1 -> Map-2,
typeNumber=0, outputPartitioner=REBALANCE, exchangeMode=UNDEFINED,
bufferTimeout=100, outputTag=null, uniqueId=0)]
- node: Map StreamEdge: [(Map-2 -> Filter-3, typeNumber=0, outputPartitioner=FORWARD,
exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
- node: Filter StreamEdge: [(Filter-3 -> Map-5, typeNumber=0, outputPartitioner=HASH,
exchangeMode=UNDEFINED, bufferTimeout=100, outputTag=null, uniqueId=0)]
- node: Map StreamEdge: [(Map-5 -> Sink: Print to Std. Out-6, typeNumber=0,
outputPartitioner=FORWARD, exchangeMode=UNDEFINED, bufferTimeout=100,
outputTag=null, uniqueId=0)]
- node: Sink: Print to Std. Out StreamEdge: []
JobGraph Info:
Job ID: 1ad251732c79901f3f421c5d66b325e9
Job Name: Flink Streaming Job
JobVertices count: 3
JobVertex: Map -> Sink: Print to Std. Out
ID: b5c8d46f3e7b141acf271f12622e752b
Parallelism: 2
Operator IDs: [org.apache.flink.runtime.OperatorIDPair@2c35e847,
org.apache.flink.runtime.OperatorIDPair@7bd4937b]
Inputs: 1
JobVertex: Map -> Filter
ID: 20ba6b65f97481d5570070de90e4e791
Parallelism: 2
Operator IDs: [org.apache.flink.runtime.OperatorIDPair@21e360a,
org.apache.flink.runtime.OperatorIDPair@5ba3f27a]
Inputs: 1
JobVertex: Source: Collection Source
ID: bc764cd8ddf7a0cff126f51c16239658
Parallelism: 1
Operator IDs: [org.apache.flink.runtime.OperatorIDPair@58d75e99]
Inputs: 0
Looking at the output: the user code defines six operations, yet the StreamGraph contains only five StreamNodes. Why did the keyBy operator "disappear"?
The core reason: keyBy is not a real operator. It is only a logical partitioning operation, so it does not get a StreamNode of its own.
Looking at the source, the keyBy method simply does:
public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
    Preconditions.checkNotNull(key);
    return new KeyedStream<>(this, clean(key)); // only creates a KeyedStream
}
And the KeyedStream constructor does the following:
public KeyedStream(
        DataStream<T> dataStream,
        KeySelector<T, KEY> keySelector,
        TypeInformation<KEY> keyType) {
    this(
            dataStream,
            new PartitionTransformation<>( // creates a partition transformation
                    dataStream.getTransformation(),
                    new KeyGroupStreamPartitioner<>( // uses a hash partitioner
                            keySelector,
                            StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM)),
            keySelector,
            keyType);
}
Key points:
- keyBy does not create a new operator
- keyBy only creates a PartitionTransformation
- keyBy sets a KeyGroupStreamPartitioner (hash partitioner)
The effect of keyBy shows up in the outputPartitioner of the StreamEdge:
StreamEdges:
- node: Filter StreamEdge: [(Filter-3 -> Map-5,
typeNumber=0,
outputPartitioner=HASH, ⬅️ this is the effect of keyBy!
exchangeMode=UNDEFINED,
bufferTimeout=100,
outputTag=null,
uniqueId=0)]
Compared with the other edges:
Source -> Map:   outputPartitioner=REBALANCE (default partitioning)
Map -> Filter:   outputPartitioner=FORWARD   (chainable)
Filter -> Map:   outputPartitioner=HASH      ⬅️ set by keyBy!
Map -> Sink:     outputPartitioner=FORWARD   (chainable)
Why is it designed this way?
keyBy does not process data
// keyBy is not a data transformation
DataStream<String> stream = ...;
// map transforms data: String -> String
stream.map(String::toUpperCase)      // has real computation logic
// filter filters data: keep or drop each record
stream.filter(x -> !x.equals("B"))   // has real computation logic
// keyBy does not transform data; it only changes how the data is distributed
stream.keyBy(x -> x)                 // no computation logic, just a partitioning strategy
keyBy is a logical operation
// keyBy only tells Flink:
// "the downstream operators should process the data grouped by key"
stream
    .keyBy(x -> x)        // sets the partitioning strategy
    .map(x -> x + "!");   // this map is the real operator
// (keyed aggregations such as reduce or sum would likewise be real operators)
Avoiding unnecessary overhead: if keyBy produced a standalone operator, the graph would contain an extra node and an extra data hop:
❌ Bad design (if keyBy were an operator):
Source -> Map -> Filter -> KeyBy operator -> Map -> Sink
                                ↑
                      an unnecessary operator
✅ Good design (keyBy is just a partitioning strategy):
Source -> Map -> Filter --[HASH partition]--> Map -> Sink
                                ↑
                only changes how the data is distributed
How keyBy shows up at each stage
| Stage | Where keyBy appears | Notes |
|---|---|---|
| User code | .keyBy(x -> x) | The user calls the keyBy method |
| StreamGraph | StreamEdge.partitioner = KeyGroupStreamPartitioner | The edge's partitioner is set to HASH |
| JobGraph | Breaks the operator chain | The partitioning strategy changes, so the operators cannot be chained |
| ExecutionGraph | Creates IntermediateResultPartition | Intermediate result partitions are created |
| Physical execution | Data is shuffled over the network | Records are routed to tasks by the hash of their key |
An analogy
keyBy is like a lane divider sign on a road:
- It is not a toll booth (an operator)
- It only tells the vehicles (records) which lane (partition) to take
- The vehicles themselves do not change; only their route does
StreamGraph → JobGraph key concepts:
// Conditions for operator chaining (all of the following must hold):
1. The downstream operator has exactly one input
2. The upstream operator has exactly one output
3. Both operators have the same parallelism
4. There is no shuffle between them (i.e. a ForwardPartitioner)
5. Both operators are in the same SlotSharingGroup
6. The chaining strategy allows it (ChainingStrategy)
// Core methods of StreamingJobGraphGenerator:
createJobGraph(StreamGraph)
└── setChaining() / createChain()  // recursively builds operator chains
    ├── isChainable()              // decides whether two operators can be chained
    └── createJobVertex()          // creates a JobVertex for each chain head
Key methods:
- StreamingJobGraphGenerator.createJobGraph() — translation entry point
- StreamingJobGraphGenerator.createChain() — builds operator chains
- StreamingJobGraphGenerator.isChainable() — decides whether two operators can be chained
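The chaining conditions listed above can be pictured as a single boolean check. The following is a simplified, illustrative sketch only — the real logic lives in StreamingJobGraphGenerator.isChainable() and covers more cases:

// Illustrative only: a condensed version of the chaining conditions above
static boolean isChainableSketch(StreamEdge edge, StreamGraph graph) {
    StreamNode upstream = graph.getSourceVertex(edge);
    StreamNode downstream = graph.getTargetVertex(edge);

    return downstream.getInEdges().size() == 1                           // single input downstream
            && upstream.getParallelism() == downstream.getParallelism()  // same parallelism
            && edge.getPartitioner() instanceof ForwardPartitioner        // no shuffle on this edge
            && Objects.equals(upstream.getSlotSharingGroup(),
                    downstream.getSlotSharingGroup())                     // same slot sharing group
            && downstream.getOperatorFactory().getChainingStrategy()
                    != ChainingStrategy.NEVER;                            // chaining not disabled
}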
The PipelineExecutor Hierarchy
Core files:
- PipelineExecutor.java
- LocalExecutor.java
- RemoteExecutor.java
Learning goals:
- Understand the role of PipelineExecutor (the executor abstraction)
- Understand the different executor implementations (Local, Remote, Embedded)
- Understand how executors are loaded via the SPI mechanism
PipelineExecutor hierarchy:
PipelineExecutor
├── LocalExecutor
├── RemoteExecutor
├── AbstractJobClusterExecutor
│ ├── YarnJobClusterExecutor
│ └── KubernetesJobClusterExecutor
└── AbstractSessionClusterExecutor
├── YarnSessionClusterExecutor
└── KubernetesSessionClusterExecutor
Example:
public class PipelineExecutorLearningDemo {
    public static void main(String[] args) throws Exception {
        // 1. Local executor
        Configuration localConf = new Configuration();
        localConf.setString(DeploymentOptions.TARGET, "local");
        StreamExecutionEnvironment localEnv =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(localConf);
        // Inspect the executor in use
        // (note: getPipelineExecutor() is internal, not public API; this assumes you are
        //  debugging inside the Flink source tree or have exposed it for learning)
        PipelineExecutor localExecutor = localEnv.getPipelineExecutor();
        System.out.println("Local Executor: " + localExecutor.getClass().getName());

        // 2. Remote executor (connects to an existing cluster)
        Configuration remoteConf = new Configuration();
        remoteConf.setString(DeploymentOptions.TARGET, "remote");
        remoteConf.setString(JobManagerOptions.ADDRESS, "localhost");
        remoteConf.setInteger(JobManagerOptions.PORT, 8081);
        StreamExecutionEnvironment remoteEnv =
                StreamExecutionEnvironment.getExecutionEnvironment(remoteConf);
        PipelineExecutor remoteExecutor = remoteEnv.getPipelineExecutor();
        System.out.println("Remote Executor: " + remoteExecutor.getClass().getName());

        // 3. List all executor factories discoverable via the SPI mechanism
        PipelineExecutorServiceLoader serviceLoader = new DefaultExecutorServiceLoader();
        System.out.println("\nAvailable Executor Factories:");
        serviceLoader.getExecutorNames().forEach(name -> System.out.println(" - " + name));
    }
}
Key classes:
// 1. The executor interface
PipelineExecutor.execute(Pipeline, Configuration, ClassLoader)
└── returns CompletableFuture<JobClient>
// 2. LocalExecutor (local execution)
LocalExecutor.execute()
└── PerJobMiniClusterFactory.submitJob()
    └── MiniCluster.submitJob()
// 3. RemoteExecutor (remote execution)
RemoteExecutor.execute()
└── RestClusterClient.submitJob()
    └── submits to the remote cluster via the REST API
The ClusterClient Hierarchy
Core files:
- ClusterClient.java
- RestClusterClient.java
- MiniClusterClient.java
Learning goals:
- Understand the role of ClusterClient (the client for talking to a cluster)
- Understand job submission, cancellation, savepoints, and related operations
- Understand how the REST API is used
Example:
public class ClusterClientLearningDemo {
    public static void main(String[] args) throws Exception {
        // 1. Create a RestClusterClient (connects to a running cluster)
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost");
        conf.setInteger(RestOptions.PORT, 8081);
        try (RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(conf, StandaloneClusterId.getInstance())) {

            // 2. Get cluster information
            System.out.println("Cluster ID: " + client.getClusterId());

            // 3. List the running jobs
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            System.out.println("\nRunning Jobs:");
            for (JobStatusMessage job : jobs) {
                System.out.println(" - " + job.getJobName() +
                        " [" + job.getJobState() + "]");
            }

            // 4. Submit a job (requires a JobGraph)
            // JobGraph jobGraph = ...;
            // JobID jobId = client.submitJob(jobGraph).get();

            // 5. Cancel a job
            // client.cancel(jobId).get();

            // 6. Trigger a savepoint
            // String savepointPath = client.triggerSavepoint(jobId, null).get();
        }
    }
}
The CLI Command-Line Tool
Core files:
- CliFrontend.java
- CliFrontendParser.java
Learning goals:
- Understand how the flink run command is implemented
- Understand how command-line arguments are parsed
- Understand how the PackagedProgram is loaded
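Before diving into the source, it helps to see how a flink run invocation maps onto CliFrontend. The sketch below drives CliFrontend programmatically with the same arguments the shell script would pass (the JAR path and entry class are placeholders, and a valid Flink configuration directory — e.g. via the FLINK_CONF_DIR environment variable — must be available):

public class CliFrontendLearningDemo {
    public static void main(String[] args) throws Exception {
        // Equivalent to: flink run -c com.example.MyJob /path/to/my-job.jar --input /tmp/in
        // CliFrontend parses the "run" action, loads the JAR as a PackagedProgram,
        // and submits it through the configured executor / ClusterClient.
        CliFrontend.main(new String[] {
                "run",
                "-c", "com.example.MyJob",
                "/path/to/my-job.jar",
                "--input", "/tmp/in"
        });
    }
}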
How to study:
# 1. Look at how the flink run command is implemented
#    CliFrontend.run()
# 2. Understand argument parsing
#    CliFrontendParser.parse()
# 3. Understand JAR loading
#    PackagedProgram.newBuilder().build()
PackagedProgram (Loading User JARs)
Core files:
- PackagedProgram.java
- PackagedProgramUtils.java
Learning goals:
- Understand how a user JAR is loaded
- Understand how the user's main method is invoked
- Understand ClassLoader isolation
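A small sketch of loading a user JAR on the client side and compiling it into a JobGraph without executing it (the JAR path, entry class, and arguments are placeholders; builder options may differ slightly between Flink versions):

public class PackagedProgramLearningDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();

        // Load the user JAR; the entry class is resolved from the manifest if not given
        PackagedProgram program = PackagedProgram.newBuilder()
                .setJarFile(new File("/path/to/my-job.jar"))
                .setEntryPointClassName("com.example.MyJob")
                .setArguments("--input", "/tmp/in")
                .build();

        // The user code runs in its own ClassLoader, isolated from the client's classpath
        System.out.println("User ClassLoader: " + program.getUserCodeClassLoader());

        // Compile the user program into a JobGraph without executing it
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(
                program, config, /* defaultParallelism */ 2, /* suppressOutput */ false);
        System.out.println("JobVertices: " + jobGraph.getNumberOfVertices());
    }
}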
Application Mode
Core files:
- ApplicationDispatcherBootstrap.java
- EmbeddedExecutor.java
The Deployment Abstraction
Core files:
- ClusterDescriptor.java
- ClusterClientFactory.java
Learning Path Summary
FileSinkDemo.java (starting point)
↓
StreamExecutionEnvironment.execute()
↓
getStreamGraph() ─────────────────> StreamGraphGenerator
↓ (Transformation → StreamGraph)
executeAsync()
↓
StreamingJobGraphGenerator ────────> (StreamGraph → JobGraph)
↓
PipelineExecutor.execute()
├── LocalExecutor ────────────> MiniCluster.submitJob()
└── RemoteExecutor ────────────> RestClusterClient.submitJob()
Hands-On Debugging Steps
Modify FileSinkDemo to add debugging code
public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        DataStream<String> source = env.fromData("hello", "world", "flink");
        FileSink<String> sink = FileSink
                .forRowFormat(new Path("file:///tmp/flink-debug-output"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();
        source.sinkTo(sink);

        // 1. Inspect the Transformations
        System.out.println("\n=== Transformations ===");
        List<Transformation<?>> transformations = env.getTransformations();
        for (Transformation<?> t : transformations) {
            System.out.println(" - " + t.getName() +
                    " [" + t.getClass().getSimpleName() + "]");
        }

        // 2. Inspect the StreamGraph
        System.out.println("\n=== StreamGraph ===");
        StreamGraph streamGraph = env.getStreamGraph(false);
        System.out.println("Job Name: " + streamGraph.getJobName());
        System.out.println("StreamNodes: " + streamGraph.getStreamNodes().size());

        // 3. Inspect the JobGraph
        System.out.println("\n=== JobGraph ===");
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        System.out.println("JobVertices: " + jobGraph.getNumberOfVertices());
        for (JobVertex vertex : jobGraph.getVertices()) {
            System.out.println(" - " + vertex.getName() +
                    " (parallelism=" + vertex.getParallelism() + ")");
        }

        // 4. Inspect the PipelineExecutor (internal API, see the note in the PipelineExecutor section)
        System.out.println("\n=== PipelineExecutor ===");
        PipelineExecutor executor = env.getPipelineExecutor();
        System.out.println("Executor: " + executor.getClass().getName());

        // ===== Execute the job =====
        env.execute("Local FileSystem Debug");
    }
}
Set breakpoints and start debugging
Key methods:
- StreamExecutionEnvironment.execute()
- StreamExecutionEnvironment.getStreamGraph()
- StreamGraphGenerator.generate()
- StreamingJobGraphGenerator.createJobGraph()
- LocalExecutor.execute()
Read the source to understand the core flow
Read in the following order:
1. StreamExecutionEnvironment → StreamGraph → StreamGraphGenerator
2. StreamingJobGraphGenerator → JobGraph → Operator Chain
3. PipelineExecutor → LocalExecutor → MiniCluster
4. ClusterClient → RestClusterClient → REST API
5. CliFrontend → PackagedProgram → CLI tooling
Hands-On: Manually Start a Cluster and Submit a Job
After understanding the core flow of the Client Layer, starting a cluster by hand gives a deeper view of how the ClusterClient interacts with it.
Start a local standalone cluster
Start the JobManager and TaskManager programmatically to walk through the full cluster startup process:
public class StartClusterDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = createConfiguration();

        // Start the JobManager
        StandaloneSessionClusterEntrypoint jobManager = startJobManager(config);
        Thread.sleep(3000);

        // Start the TaskManager
        TaskManagerRunner taskManager = startTaskManager(config);
        Thread.sleep(3000);

        System.out.println("Access WebUI at: http://localhost:8081");
        Thread.currentThread().join();
    }

    private static Configuration createConfiguration() {
        Configuration config = new Configuration();
        config.set(JobManagerOptions.ADDRESS, "localhost");
        config.set(JobManagerOptions.PORT, 6123);
        config.set(RestOptions.PORT, 8081);
        config.set(TaskManagerOptions.NUM_TASK_SLOTS, 4);
        // Adjust resource settings for local execution
        TaskExecutorResourceUtils.adjustForLocalExecution(config);
        return config;
    }

    private static StandaloneSessionClusterEntrypoint startJobManager(Configuration config)
            throws Exception {
        StandaloneSessionClusterEntrypoint entrypoint =
                new StandaloneSessionClusterEntrypoint(config);
        Thread jmThread = new Thread(() -> {
            try {
                ClusterEntrypoint.runClusterEntrypoint(entrypoint);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }, "JobManager-Thread");
        jmThread.setDaemon(true);
        jmThread.start();
        return entrypoint;
    }

    private static TaskManagerRunner startTaskManager(Configuration config) throws Exception {
        TaskManagerRunner taskManagerRunner = new TaskManagerRunner(
                config,
                PluginUtils.createPluginManagerFromRootFolder(config),
                TaskManagerRunner::createTaskExecutorService);
        taskManagerRunner.start();
        return taskManagerRunner;
    }
}
Submit a job through RestClusterClient
Use RestClusterClient to connect to the cluster and submit a job; this is the key to understanding how the client talks to the JobManager:
public class SubmitJobToClusterDemo {
    public static void main(String[] args) throws Exception {
        Configuration config = new Configuration();
        config.set(RestOptions.PORT, 8081);
        config.set(RestOptions.ADDRESS, "localhost");

        try (ClusterClient<String> clusterClient =
                new RestClusterClient<>(config, "standalone-cluster")) {
            System.out.println("Connected: " + clusterClient.getWebInterfaceURL());

            // Build the JobGraph
            JobGraph jobGraph = createJobGraph(config);

            // Submit the job
            JobID jobId = clusterClient.submitJob(jobGraph).get();
            System.out.println("Submitted job: " + jobId);

            // Wait for the job to finish
            clusterClient.requestJobResult(jobId).get();
        }
    }

    private static JobGraph createJobGraph(Configuration config) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(2);
        DataStream<String> source = env.fromData("a", "b", "c", "d", "e");
        source.map(String::toUpperCase).print();

        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobName("Standalone Cluster Demo");
        return streamGraph.getJobGraph();
    }
}
Through this exercise you can clearly observe:
- The cluster startup process: the order in which the JobManager and TaskManager start, and how they depend on each other
- REST API interaction: how the ClusterClient communicates with the JobManager over the REST API
- The job submission flow: the complete path from StreamGraph → JobGraph → submission to the cluster
The Core Responsibilities of the Client Layer
The Client Layer has three core responsibilities:
1. Graph translation: Transformation → StreamGraph → JobGraph
2. Job submission: PipelineExecutor → ClusterClient → REST API
3. Cluster interaction: submitting jobs, cancelling jobs, triggering savepoints, querying status