We start from Flink's overall architecture to build a big-picture view, then dive into the source code details.
Flink Architecture Overview
The Three-Layer Architecture Model
Flink uses a classic master-worker architecture, divided into three layers:
Client Layer
- Client: entry point of the user program
- JobGraph Generation: translates user code into a JobGraph
- Job Submission: submits the job to the cluster
Master Layer
- JobManager: the cluster master, containing the following components:
- Dispatcher: accepts job submissions and manages job lifecycles
- ResourceManager: manages TaskManager resources and allocates slots
- JobMaster: coordinator for a single job, responsible for scheduling and checkpoints
- Checkpoint Coordinator: orchestrates the checkpoint process
- Scheduler: the task scheduler
Worker Layer
- TaskManager: a worker node, containing the following components:
- Task Slot: the container in which tasks execute
- Network Stack: handles data exchange
- State Backend: manages operator state
Job Execution Flow
Take a simple FileSinkDemo as an example:
// conf and fileSink are assumed to be constructed elsewhere in the demo
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStream<String> source = env.fromData("hello", "world", "flink");
source.sinkTo(fileSink);
env.execute();
The complete execution flow:
User code ---> StreamGraph ---> JobGraph ---> ExecutionGraph ---> Physical execution
 (Client)       (Client)        (Client)       (JobMaster)        (TaskManager)
Deep Dive into the Source Code
Phase 1: StreamGraph Generation
Entry point: StreamExecutionEnvironment
When env.fromData() is called, what actually runs is:
// StreamExecutionEnvironment.java
public <OUT> DataStreamSource<OUT> fromData(OUT... data) {
return fromData(Arrays.asList(data));
}
public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data) {
// 1. Infer the data type
TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
// 2. Create the source function
return fromCollection(data, typeInfo);
}
The key method addSource():
// StreamExecutionEnvironment.java
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
// Core: adds a LegacySourceTransformation to the transformations list
TypeInformation<OUT> outTypeInfo = extractTypeInfo(function);
StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(this, outTypeInfo, sourceOperator, isParallel, sourceName);
}
Key data structure: transformations is a List<Transformation<?>> that stores every operator transformation.
Chained Calls on DataStream
Each call to map(), filter(), sinkTo(), and the like creates a new Transformation and appends it to the list:
// DataStream.java
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(...);
// Create a OneInputTransformation
return transform("Map", outType, new StreamMap<>(clean(mapper)));
}
protected <R> SingleOutputStreamOperator<R> transform(String operatorName,
TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
// 1. Create the Transformation
OneInputTransformation<T, R> transformation = new OneInputTransformation<>(
this.transformation, // the upstream transformation
operatorName,
operator,
outTypeInfo,
environment.getParallelism());
// 2. Append it to the transformations list
getExecutionEnvironment().addOperator(transformation);
return new SingleOutputStreamOperator<>(environment, transformation);
}
The Transformation class hierarchy:
Transformation<T>
├── SourceTransformation      # source operators
├── SinkTransformation        # sink operators
├── OneInputTransformation    # single-input operators (map, filter)
├── TwoInputTransformation    # two-input operators (join, connect)
├── PartitionTransformation   # partitioning operators (keyBy, rebalance)
└── UnionTransformation       # union operator
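This append-as-you-build pattern can be sketched with a toy model. The Env, Stream, and Node names below are illustrative stand-ins, not Flink's API; the point is only that each fluent call records a new node, with a pointer to its upstream, in a list owned by the environment:

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the transformations list (hypothetical names, not Flink's API).
public class TransformationListSketch {
    record Node(String name, Node upstream) {}

    static class Env {
        final List<Node> transformations = new ArrayList<>();
        Stream fromData() {
            Node src = new Node("Source", null);
            transformations.add(src);
            return new Stream(this, src);
        }
    }

    static class Stream {
        final Env env;
        final Node node;
        Stream(Env env, Node node) { this.env = env; this.node = node; }
        Stream map(String name) {
            Node n = new Node(name, this.node); // remember the upstream node
            env.transformations.add(n);         // append to the shared list
            return new Stream(env, n);
        }
    }

    static List<String> build() {
        Env env = new Env();
        env.fromData().map("Map-1").map("Map-2");
        List<String> names = new ArrayList<>();
        for (Node n : env.transformations) names.add(n.name());
        return names;
    }

    public static void main(String[] args) {
        System.out.println(build()); // [Source, Map-1, Map-2]
    }
}
```

Nothing executes at this point; the list is pure metadata that a later generator walks.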
execute() Triggers StreamGraph Generation
// StreamExecutionEnvironment.java
public JobExecutionResult execute(String jobName) throws Exception {
// 1. Build the StreamGraph
StreamGraph streamGraph = getStreamGraph();
// 2. Submit for execution
return execute(streamGraph);
}
public StreamGraph getStreamGraph() {
// Core: StreamGraphGenerator walks the transformations and builds the StreamGraph
return getStreamGraphGenerator(transformations).generate();
}
Core logic of StreamGraphGenerator:
// StreamGraphGenerator.java
public StreamGraph generate() {
streamGraph = new StreamGraph(executionConfig, checkpointConfig, ...);
// Walk every transformation and convert it into StreamNodes
for (Transformation<?> transformation : transformations) {
transform(transformation);
}
return streamGraph;
}
private Collection<Integer> transform(Transformation<?> transform) {
// Dispatch to a translation method based on the Transformation subtype
if (transform instanceof OneInputTransformation) {
return transformOneInputTransform((OneInputTransformation<?, ?>) transform);
} else if (transform instanceof SourceTransformation) {
return transformSource((SourceTransformation<?>) transform);
}
// ... other types
}
Core data structures of StreamGraph:
// StreamGraph.java
public class StreamGraph {
// Nodes: operators
private Map<Integer, StreamNode> streamNodes = new HashMap<>();
// Edges: data flow between operators
private Set<StreamEdge> edges = new HashSet<>();
// IDs of the source nodes
private Set<Integer> sources = new HashSet<>();
// IDs of the sink nodes
private Set<Integer> sinks = new HashSet<>();
}
// StreamNode.java
public class StreamNode {
private final int id; // node ID
private String operatorName; // operator name
private StreamOperatorFactory<?> operatorFactory; // operator factory
private List<StreamEdge> inEdges; // input edges
private List<StreamEdge> outEdges; // output edges
private int parallelism; // parallelism
}
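The node-and-edge bookkeeping can be reduced to a few lines. This is a deliberately minimal sketch (StreamGraphSketch and its fields are hypothetical, not Flink's classes): nodes keyed by an auto-incremented id, edges as (from, to) pairs, and any node without an input edge recorded as a source:

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the StreamGraph bookkeeping (hypothetical classes).
public class StreamGraphSketch {
    record Edge(int from, int to) {}

    final Map<Integer, String> streamNodes = new LinkedHashMap<>(); // id -> operator name
    final Set<Edge> edges = new LinkedHashSet<>();
    final Set<Integer> sources = new LinkedHashSet<>();
    private int nextId = 1;

    int addNode(String name, Integer upstreamId) {
        int id = nextId++;
        streamNodes.put(id, name);
        if (upstreamId == null) {
            sources.add(id);                 // no input edge: this is a source
        } else {
            edges.add(new Edge(upstreamId, id));
        }
        return id;
    }

    static StreamGraphSketch demo() {
        StreamGraphSketch g = new StreamGraphSketch();
        int src = g.addNode("Source", null);
        int map = g.addNode("Map", src);
        g.addNode("Sink", map);
        return g;
    }

    public static void main(String[] args) {
        StreamGraphSketch g = demo();
        System.out.println(g.streamNodes.size() + " nodes, " + g.edges.size()
                + " edges, sources=" + g.sources);
    }
}
```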
Phase 2: JobGraph Generation
Once the StreamGraph is built, it is converted into a JobGraph for submission:
// StreamGraph.java
public JobGraph getJobGraph() {
return StreamingJobGraphGenerator.createJobGraph(this);
}
Key Optimization: Operator Chaining
During JobGraph generation, chainable operators are merged so that they execute inside the same Task:
// StreamingJobGraphGenerator.java
private void createChain(...) {
// Can the two operators be chained?
if (isChainable(currentOperator, downstreamOperator)) {
// Merge into the same JobVertex
chainedOperators.add(downstreamOperator);
} else {
// Start a new JobVertex
createJobVertex(currentOperator);
}
}
// Conditions for chaining
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
// 1. The downstream operator has only one input
// 2. Upstream and downstream are in the same slot sharing group
// 3. Upstream and downstream have the same parallelism
// 4. The chaining strategy is ALWAYS or HEAD
// 5. The partitioner is a ForwardPartitioner
// 6. Both upstream and downstream operators allow chaining
return downStreamOperator.getInEdges().size() == 1
&& upStreamOperator.getSlotSharingGroup().equals(downStreamOperator.getSlotSharingGroup())
&& upStreamOperator.getParallelism() == downStreamOperator.getParallelism()
&& edge.getPartitioner() instanceof ForwardPartitioner
&& upStreamOperator.getChainingStrategy() != ChainingStrategy.NEVER
&& downStreamOperator.getChainingStrategy() != ChainingStrategy.HEAD;
}
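The net effect of createChain() can be simulated with a toy pipeline. This sketch (Op, buildChains, and the reduced three-condition predicate are all simplifications of my own, not Flink code) walks a linear pipeline and starts a new chain whenever the conditions fail:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified operator-chaining simulation (hypothetical model).
public class ChainingSketch {
    record Op(String name, int parallelism, boolean forward, boolean allowsChaining) {}

    // A reduced version of the chainability check: forward partitioner,
    // equal parallelism, and both operators permitting chaining.
    static boolean isChainable(Op up, Op down) {
        return down.forward()
            && up.parallelism() == down.parallelism()
            && up.allowsChaining() && down.allowsChaining();
    }

    static List<List<String>> buildChains(List<Op> pipeline) {
        List<List<String>> chains = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (int i = 0; i < pipeline.size(); i++) {
            if (i > 0 && !isChainable(pipeline.get(i - 1), pipeline.get(i))) {
                chains.add(current);               // chain break: seal the chain
                current = new ArrayList<>();
            }
            current.add(pipeline.get(i).name());
        }
        chains.add(current);
        return chains;
    }

    public static void main(String[] args) {
        List<Op> pipeline = List.of(
            new Op("Source", 4, true, true),
            new Op("Map", 4, true, true),       // chains with Source
            new Op("KeyedAgg", 4, false, true)  // keyBy repartitions: chain breaks
        );
        System.out.println(buildChains(pipeline)); // [[Source, Map], [KeyedAgg]]
    }
}
```

Each resulting sub-list corresponds to one JobVertex in the sketch.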
Core data structures of JobGraph:
// JobGraph.java
public class JobGraph {
private final JobID jobID;
// Vertices: one JobVertex corresponds to one or more chained operators
private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<>();
}
// JobVertex.java
public class JobVertex {
private final JobVertexID id;
private final List<IntermediateDataSet> results; // produced data sets
private final List<JobEdge> inputs; // input edges
private int parallelism; // parallelism
private String invokableClassName; // class name of the invokable Task
}
Phase 3: Job Submission and Scheduling
The Dispatcher Accepts the Job
// Dispatcher.java
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph) {
// 1. Persist the JobGraph
jobGraphStore.putJobGraph(jobGraph);
// 2. Create the JobMaster
runJob(jobGraph);
}
private void runJob(JobGraph jobGraph) {
// Create a JobManagerRunner, which wraps the JobMaster
JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
jobManagerRunner.start();
}
The JobMaster Builds the ExecutionGraph
// JobMaster.java
public JobMaster(..., JobGraph jobGraph, ...) {
// Core: build the ExecutionGraph from the JobGraph
this.executionGraph = createAndRestoreExecutionGraph(jobGraph);
}
private ExecutionGraph createExecutionGraph(JobGraph jobGraph) {
// ExecutionGraphBuilder assembles the ExecutionGraph
return ExecutionGraphBuilder.buildGraph(
jobGraph,
configuration,
slotProvider,
...);
}
Core data structures of ExecutionGraph:
// ExecutionGraph.java
public class ExecutionGraph {
// Mapping from JobVertex to ExecutionJobVertex
private final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
// All Executions (task instances)
private final Map<ExecutionAttemptID, Execution> currentExecutions = new HashMap<>();
}
// ExecutionJobVertex.java (corresponds to a JobVertex)
public class ExecutionJobVertex {
private final ExecutionVertex[] taskVertices; // one ExecutionVertex per parallel subtask
}
// ExecutionVertex.java (one parallel instance)
public class ExecutionVertex {
private final ExecutionJobVertex jobVertex;
private final int subTaskIndex; // subtask index
private Execution currentExecution; // current execution attempt
}
// Execution.java (a single execution attempt)
public class Execution {
private final ExecutionVertex vertex;
private final ExecutionAttemptID attemptId;
// CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
private ExecutionState state;
private LogicalSlot assignedResource; // the assigned slot
}
How the graphs map onto one another:
StreamGraph            JobGraph               ExecutionGraph
-----------            --------               --------------
StreamNode     --->    JobVertex      --->    ExecutionJobVertex
(one per operator)     (chained operators)    (parallelism x ExecutionVertex)
                                                      |
                                                      v
                                              Execution (the actual run)
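The fan-out step of that mapping can be shown in a few lines. In this sketch (JobVertexSpec and expand are hypothetical names, not Flink's), each job vertex expands into `parallelism` subtask descriptors, one per future ExecutionVertex:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the JobGraph -> ExecutionGraph expansion (hypothetical model).
public class ExecutionGraphExpansion {
    record JobVertexSpec(String name, int parallelism) {}

    static List<String> expand(List<JobVertexSpec> jobVertices) {
        List<String> subtasks = new ArrayList<>();
        for (JobVertexSpec v : jobVertices) {
            for (int i = 0; i < v.parallelism(); i++) {
                // One ExecutionVertex per (vertex, subtask index) pair
                subtasks.add(v.name() + " (" + (i + 1) + "/" + v.parallelism() + ")");
            }
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<String> tasks = expand(List.of(
            new JobVertexSpec("Source -> Map", 2),
            new JobVertexSpec("Sink", 1)));
        System.out.println(tasks); // [Source -> Map (1/2), Source -> Map (2/2), Sink (1/1)]
    }
}
```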
The Scheduler Deploys Tasks
// DefaultScheduler.java
public void startScheduling() {
// 1. Allocate slots
allocateSlots();
// 2. Deploy the tasks
deployTasks();
}
private void deployTasks(List<ExecutionVertex> vertices) {
for (ExecutionVertex vertex : vertices) {
// Fetch the assigned slot
LogicalSlot slot = vertex.getCurrentExecution().getAssignedResource();
// Build the TaskDeploymentDescriptor
TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(vertex);
// Submit the task to the TaskManager
slot.getTaskManagerGateway().submitTask(tdd);
}
}
Phase 4: Execution on the TaskManager
Task Startup
// TaskExecutor.java (the TaskManager's RPC endpoint)
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd) {
// 1. Create the Task object
Task task = new Task(tdd, ...);
// 2. Register it in a task slot
taskSlotTable.addTask(task);
// 3. Start the task thread
task.startTaskThread();
}
// Task.java
public void run() {
// 1. Load the user code
invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
// 2. Run the task
invokable.invoke();
}
The StreamTask Execution Flow
StreamTask is the base class for all streaming tasks:
// StreamTask.java
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> {
public final void invoke() throws Exception {
// 1. Initialize
beforeInvoke();
// 2. Run (the core loop)
runMailboxLoop();
// 3. Clean up
afterInvoke();
}
// Initialize the operator chain
protected void beforeInvoke() {
// Build the OperatorChain
operatorChain = new OperatorChain<>(this, recordWriter);
// Initialize all operators
operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
}
// The core processing loop
public void runMailboxLoop() {
while (true) {
// Process mail (checkpoints, timers, etc.)
processMail();
// Process input records
inputProcessor.processInput();
}
}
}
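A toy, single-threaded rendition of this mailbox pattern (illustrative only; Flink's actual MailboxProcessor is considerably more involved): pending control actions are drained before each input record, which is how checkpoints and timers interleave safely with data processing on one thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy mailbox loop (hypothetical model, single-threaded).
public class MailboxLoopSketch {
    static List<String> run() {
        Queue<Runnable> mailbox = new ArrayDeque<>();
        Queue<String> input = new ArrayDeque<>(List.of("a", "b"));
        List<String> log = new ArrayList<>();

        mailbox.add(() -> log.add("checkpoint")); // a pending control action

        while (!input.isEmpty() || !mailbox.isEmpty()) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();                        // drain the mailbox first
            }
            String record = input.poll();
            if (record != null) {
                log.add("process " + record);      // then handle one input record
            }
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [checkpoint, process a, process b]
    }
}
```

Because mail and records run on the same thread, no lock is needed around operator state.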
The Data Processing Path
// StreamInputProcessor.java
public InputStatus processInput() {
// 1. Read from the InputGate
BufferOrEvent bufferOrEvent = inputGate.getNext();
// 2. Deserialize into a StreamRecord
StreamRecord<IN> record = deserializer.getNextRecord();
// 3. Hand the record to the operator
operator.processElement(record);
}
// StreamMap.java (using the map operator as an example)
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> {
@Override
public void processElement(StreamRecord<IN> element) throws Exception {
// 1. Invoke the user function
OUT result = userFunction.map(element.getValue());
// 2. Emit to the downstream operator
output.collect(element.replace(result));
}
}
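Within a chain, that `output.collect` is just a direct method call into the next operator, with no serialization or network hop in between. A minimal sketch of this hand-off (the Collector interface and chain helper here are my own simplifications, not Flink's classes):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Sketch of chained operators as direct method calls (hypothetical model).
public class OperatorChainSketch {
    interface Collector<T> { void collect(T value); }

    // Wrap a function so its result is pushed straight into the downstream collector.
    static <I, O> Collector<I> chain(Function<I, O> fn, Collector<O> downstream) {
        return value -> downstream.collect(fn.apply(value)); // direct call, no copy
    }

    static List<String> run() {
        List<String> out = new ArrayList<>();
        Collector<String> sink = out::add;
        // Records flow: append "!" -> toUpperCase -> sink (built back to front)
        Collector<String> head = chain(s -> s + "!", chain(String::toUpperCase, sink));
        head.collect("hello");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [HELLO!]
    }
}
```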
Phase 5: The Checkpoint Mechanism
Triggering a Checkpoint
// CheckpointCoordinator.java
public void triggerCheckpoint(long timestamp) {
// 1. Generate the checkpoint ID
long checkpointId = checkpointIdCounter.getAndIncrement();
// 2. Send a barrier to every source task
for (ExecutionVertex sourceTask : sourceTasks) {
sourceTask.getCurrentExecution().triggerCheckpoint(checkpointId, timestamp);
}
}
Barrier Propagation and Alignment
// CheckpointBarrierHandler.java
public void processBarrier(CheckpointBarrier barrier, int channelIndex) {
// 1. Record the barrier received on this channel
numBarriersReceived++;
// 2. Check whether every channel has delivered its barrier
if (numBarriersReceived == totalNumberOfInputChannels) {
// 3. Trigger the local checkpoint
triggerCheckpoint(barrier.getId());
// 4. Forward the barrier downstream
output.emitBarrier(barrier);
}
}
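The alignment rule above can be exercised in isolation. This toy handler (hypothetical class, not Flink's CheckpointBarrierHandler) fires the local checkpoint only once the barrier has arrived on every input channel, then resets for the next checkpoint:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy barrier alignment (hypothetical model).
public class BarrierAlignmentSketch {
    private final boolean[] barrierSeen;
    private int seenCount = 0;
    final List<Long> triggered = new ArrayList<>(); // checkpoint ids fired locally

    BarrierAlignmentSketch(int numChannels) {
        this.barrierSeen = new boolean[numChannels];
    }

    void processBarrier(long checkpointId, int channelIndex) {
        if (!barrierSeen[channelIndex]) {
            barrierSeen[channelIndex] = true;
            seenCount++;
        }
        if (seenCount == barrierSeen.length) {  // all channels aligned
            triggered.add(checkpointId);        // trigger the local checkpoint
            Arrays.fill(barrierSeen, false);    // reset for the next checkpoint
            seenCount = 0;
        }
    }

    public static void main(String[] args) {
        BarrierAlignmentSketch handler = new BarrierAlignmentSketch(2);
        handler.processBarrier(7L, 0);          // only channel 0 so far: not aligned
        System.out.println(handler.triggered);  // []
        handler.processBarrier(7L, 1);          // channel 1 arrives: aligned
        System.out.println(handler.triggered);  // [7]
    }
}
```

A real handler would also buffer records arriving on already-aligned channels; that part is omitted here.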
State Snapshots
// StreamTask.java
public void triggerCheckpoint(long checkpointId) {
// 1. Snapshot the state of every operator
for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
operator.snapshotState(checkpointId);
}
// 2. Report checkpoint completion
reportCheckpointComplete(checkpointId);
}
Key Classes at a Glance
| Phase | Key Class | Responsibility |
|---|---|---|
| Client | StreamExecutionEnvironment | Entry point of the user program; manages the transformations list |
| Client | StreamGraphGenerator | Builds the StreamGraph |
| Client | StreamingJobGraphGenerator | Builds the JobGraph; operator-chaining optimization |
| Master | Dispatcher | Accepts jobs; creates JobMasters |
| Master | JobMaster | Coordinates a job; builds the ExecutionGraph |
| Master | DefaultScheduler | Schedules tasks onto TaskManagers |
| Master | CheckpointCoordinator | Triggers and coordinates checkpoints |
| Worker | TaskExecutor | The TaskManager's RPC endpoint |
| Worker | Task | The execution unit; wraps the StreamTask |
| Worker | StreamTask | Base class for streaming tasks; core processing loop |
| Worker | OperatorChain | Manages the chained operators |
Debugging Tips
Useful Breakpoint Locations
- StreamGraph generation: StreamGraphGenerator.transform()
- JobGraph generation: StreamingJobGraphGenerator.createChain()
- Job submission: Dispatcher.submitJob()
- Task deployment: TaskExecutor.submitTask()
- Record processing: StreamInputProcessor.processInput()
- Checkpoint: CheckpointCoordinator.triggerCheckpoint()
Logging Configuration
Add the following to log4j.properties:
# Trace StreamGraph generation
logger.graph.name = org.apache.flink.streaming.api.graph
logger.graph.level = DEBUG
# Trace scheduling
logger.scheduler.name = org.apache.flink.runtime.scheduler
logger.scheduler.level = DEBUG
# Trace checkpoints
logger.checkpoint.name = org.apache.flink.runtime.checkpoint
logger.checkpoint.level = DEBUG
What to Watch in the Web UI
Open http://localhost:8081:
- Running Jobs → inspect the job's DAG
- Task Managers → inspect slot allocation
- Checkpoints → inspect checkpoint history and durations
- Back Pressure → inspect backpressure