A Guide to Studying the Flink Source Code

2025/12/04

Tags: Flink

Start from Flink's overall architecture to build a global picture, then dive into the source-code details.

The Three-Layer Architecture

Flink uses a classic master-worker architecture, organized in three layers:

Client Layer

Master Layer

Worker Layer

Job Execution Flow

Take a simple FileSinkDemo as an example (the conf and fileSink definitions are elided here; a complete version follows the snippet):

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
DataStream<String> source = env.fromData("hello", "world", "flink");
source.sinkTo(fileSink);
env.execute();
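
For reference, a complete runnable version of the demo, as a minimal sketch assuming Flink 1.19+ with flink-connector-files on the classpath (the output path is illustrative):

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FileSinkDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);

        // Row-format file sink writing plain strings; the path is just an example
        FileSink<String> fileSink = FileSink
                .forRowFormat(new Path("/tmp/flink-out"), new SimpleStringEncoder<String>())
                .build();

        DataStream<String> source = env.fromData("hello", "world", "flink");
        source.sinkTo(fileSink);
        env.execute("FileSinkDemo");
    }
}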

The end-to-end flow:

User code ---> StreamGraph ---> JobGraph ---> ExecutionGraph ---> Physical execution
    |               |              |                |                     |
  Client         Client         Client          JobMaster            TaskManager

Diving into the Source Code

Stage 1: StreamGraph Generation

Entry point: StreamExecutionEnvironment

When env.fromData() is called, this is what actually runs:

// StreamExecutionEnvironment.java
public <OUT> DataStreamSource<OUT> fromData(OUT... data) {
    return fromData(Arrays.asList(data));
}

public <OUT> DataStreamSource<OUT> fromData(Collection<OUT> data) {
    // 1. Infer the element type
    TypeInformation<OUT> typeInfo = TypeExtractor.getForObject(data.iterator().next());
    // 2. Create the source function
    return fromCollection(data, typeInfo);
}

The key method: addSource()

(Note: in recent Flink releases fromData() is wired to the new unified Source API rather than this path; the legacy fromCollection()/addSource() route shown here is simpler to trace and illustrates the same idea, namely that every source ends up as a Transformation.)

// StreamExecutionEnvironment.java
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
    // Core: add a LegacySourceTransformation to the transformations list
    TypeInformation<OUT> outTypeInfo = extractTypeInfo(function);
    StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
    return new DataStreamSource<>(this, outTypeInfo, sourceOperator, isParallel, sourceName);
}

Key data structure: transformations is a List<Transformation<?>> that stores all operator transformations.

Chained Calls on DataStream

Each call to map(), filter(), sinkTo(), and so on creates a new Transformation and appends it to that list:

// DataStream.java
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(...);
    // Create a OneInputTransformation
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}

protected <R> SingleOutputStreamOperator<R> transform(String operatorName, 
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {
    // 1. Create the Transformation
    OneInputTransformation<T, R> transformation = new OneInputTransformation<>(
            this.transformation,  // the upstream transformation
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());
    // 2. Append it to the transformations list
    getExecutionEnvironment().addOperator(transformation);
    return new SingleOutputStreamOperator<>(environment, transformation);
}

The Transformation Class Hierarchy

Transformation<T>
├── SourceTransformation          # source operators
├── SinkTransformation            # sink operators
├── OneInputTransformation        # single-input operators (map, filter)
├── TwoInputTransformation        # two-input operators (join, connect)
├── PartitionTransformation       # partitioning operators (keyBy, rebalance)
└── UnionTransformation           # union
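
For example, a pipeline such as env.fromData(...).map(...).keyBy(...).sinkTo(...) is represented by a source transformation, a OneInputTransformation for the map, a PartitionTransformation for the keyBy, and a SinkTransformation, each referencing its upstream as input.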

execute() Triggers StreamGraph Generation

// StreamExecutionEnvironment.java
public JobExecutionResult execute(String jobName) throws Exception {
    // 1. Build the StreamGraph
    StreamGraph streamGraph = getStreamGraph();
    // 2. Submit it for execution
    return execute(streamGraph);
}

public StreamGraph getStreamGraph() {
    // Core: StreamGraphGenerator walks the transformations list and builds the StreamGraph
    return getStreamGraphGenerator(transformations).generate();
}

Core Logic of StreamGraphGenerator

// StreamGraphGenerator.java
public StreamGraph generate() {
    streamGraph = new StreamGraph(executionConfig, checkpointConfig, ...);
    // Walk every transformation and turn it into StreamNodes
    for (Transformation<?> transformation : transformations) {
        transform(transformation);
    }
    return streamGraph;
}

private Collection<Integer> transform(Transformation<?> transform) {
    // Dispatch on the concrete Transformation type
    if (transform instanceof OneInputTransformation) {
        return transformOneInputTransform((OneInputTransformation<?, ?>) transform);
    } else if (transform instanceof SourceTransformation) {
        return transformSource((SourceTransformation<?>) transform);
    }
    // ... other types
}

Core Data Structures of StreamGraph

// StreamGraph.java
public class StreamGraph {
    // Nodes: one per operator
    private Map<Integer, StreamNode> streamNodes = new HashMap<>();
    // Edges: the data flow between nodes
    private Set<StreamEdge> edges = new HashSet<>();
    // IDs of the source nodes
    private Set<Integer> sources = new HashSet<>();
    // IDs of the sink nodes
    private Set<Integer> sinks = new HashSet<>();
}

// StreamNode.java
public class StreamNode {
    private final int id;                           // node ID
    private String operatorName;                    // operator name
    private StreamOperatorFactory<?> operatorFactory;  // operator factory
    private List<StreamEdge> inEdges;               // incoming edges
    private List<StreamEdge> outEdges;              // outgoing edges
    private int parallelism;                        // parallelism
}
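
You can watch this machinery from user code: getExecutionPlan() runs the same StreamGraph generation path and dumps the result as JSON. A minimal sketch using standard API calls:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PlanDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData("hello", "world", "flink")
           .map(String::toUpperCase)
           .print();
        // Builds the StreamGraph from the transformations list and serializes it as JSON.
        // Paste the output into https://flink.apache.org/visualizer/ to render the DAG.
        System.out.println(env.getExecutionPlan());
    }
}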

Stage 2: JobGraph Generation

Once the StreamGraph is built, it is converted into a JobGraph for submission:

// StreamGraph.java
public JobGraph getJobGraph() {
    return StreamingJobGraphGenerator.createJobGraph(this);
}

The Core Optimization: Operator Chaining

While the JobGraph is generated, chainable operators are merged so that they execute in the same Task:

// StreamingJobGraphGenerator.java
private void createChain(...) {
    // Can the two operators be chained?
    if (isChainable(currentOperator, downstreamOperator)) {
        // Merge into the same JobVertex
        chainedOperators.add(downstreamOperator);
    } else {
        // Start a new JobVertex
        createJobVertex(currentOperator);
    }
}

// The conditions for chaining (simplified)
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    // 1. The downstream operator has exactly one input
    // 2. Upstream and downstream are in the same slot sharing group
    // 3. Upstream and downstream have the same parallelism
    // 4. The edge uses a ForwardPartitioner
    // 5. The upstream chaining strategy is not NEVER
    // 6. The downstream chaining strategy is ALWAYS (a HEAD or NEVER operator cannot be chained after its input)
    return downStreamOperator.getInEdges().size() == 1
        && upStreamOperator.getSlotSharingGroup().equals(downStreamOperator.getSlotSharingGroup())
        && upStreamOperator.getParallelism() == downStreamOperator.getParallelism()
        && edge.getPartitioner() instanceof ForwardPartitioner
        && upStreamOperator.getChainingStrategy() != ChainingStrategy.NEVER
        && downStreamOperator.getChainingStrategy() == ChainingStrategy.ALWAYS;
}
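
Chaining can also be influenced from user code, which is handy when experimenting with isChainable(). A minimal sketch using the standard DataStream API switches (the pipeline itself is only for illustration):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ChainingDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // env.disableOperatorChaining();   // option 1: disable chaining for the whole job

        env.fromData(1, 2, 3)
           .map(v -> v * 2)
           .startNewChain()                 // option 2: the map above starts a new chain
           .filter(v -> v > 2)
           .disableChaining()               // option 3: the filter is never chained
           .print();

        env.execute("ChainingDemo");
    }
}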

Core Data Structures of JobGraph

// JobGraph.java
public class JobGraph {
    private final JobID jobID;
    // Vertices: one JobVertex corresponds to one or more chained operators
    private final Map<JobVertexID, JobVertex> taskVertices = new LinkedHashMap<>();
}

// JobVertex.java
public class JobVertex {
    private final JobVertexID id;
    private final List<IntermediateDataSet> results;      // produced intermediate data sets
    private final List<JobEdge> inputs;                   // input edges
    private int parallelism;                              // parallelism
    private String invokableClassName;                    // class name of the Task to run
}

Stage 3: Job Submission and Scheduling

The Dispatcher Receives the Job

// Dispatcher.java
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph) {
    // 1. Persist the JobGraph
    jobGraphStore.putJobGraph(jobGraph);
    // 2. Create a JobMaster to run it
    runJob(jobGraph);
    return CompletableFuture.completedFuture(Acknowledge.get());
}

private void runJob(JobGraph jobGraph) {
    // Create a JobManagerRunner, which wraps the JobMaster
    JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph);
    jobManagerRunner.start();
}

The JobMaster Builds the ExecutionGraph

// JobMaster.java
public JobMaster(..., JobGraph jobGraph, ...) {
    // Core: build the ExecutionGraph from the JobGraph
    this.executionGraph = createAndRestoreExecutionGraph(jobGraph);
}

private ExecutionGraph createExecutionGraph(JobGraph jobGraph) {
    // ExecutionGraphBuilder assembles the ExecutionGraph
    return ExecutionGraphBuilder.buildGraph(
        jobGraph,
        configuration,
        slotProvider,
        ...);
}

Core Data Structures of ExecutionGraph

// ExecutionGraph.java
public class ExecutionGraph {
    // Mapping from JobVertexID to ExecutionJobVertex
    private final Map<JobVertexID, ExecutionJobVertex> tasks = new HashMap<>();
    // All Executions (task attempt instances)
    private final Map<ExecutionAttemptID, Execution> currentExecutions = new HashMap<>();
}

// ExecutionJobVertex.java (corresponds to one JobVertex)
public class ExecutionJobVertex {
    private final ExecutionVertex[] taskVertices;  // one ExecutionVertex per parallel subtask
}

// ExecutionVertex.java (one parallel instance)
public class ExecutionVertex {
    private final ExecutionJobVertex jobVertex;
    private final int subTaskIndex;      // subtask index
    private Execution currentExecution;  // the current execution attempt
}

// Execution.java (one execution attempt)
public class Execution {
    private final ExecutionVertex vertex;
    private final ExecutionAttemptID attemptId;
    // CREATED -> SCHEDULED -> DEPLOYING -> RUNNING -> FINISHED
    private ExecutionState state;
    private LogicalSlot assignedResource;  // the slot assigned to this attempt
}

How the Graphs Map to Each Other

StreamGraph          JobGraph              ExecutionGraph
-----------          --------              --------------
StreamNode    --->   JobVertex      --->   ExecutionJobVertex
(one per operator)   (chained operators)   (one ExecutionVertex per subtask)
                                                    |
                                                    v
                                              Execution (the actual run)
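
A quick worked example: suppose the JobGraph has two JobVertices, a chained Source -> Map with parallelism 2 and a Sink with parallelism 2. The ExecutionGraph then holds 2 ExecutionJobVertices, each expanding into 2 ExecutionVertices, so the scheduler ultimately deploys 4 Executions, one per subtask.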

The Scheduler Deploys Tasks

// DefaultScheduler.java
public void startScheduling() {
    // 1. Allocate slots
    allocateSlots();
    // 2. Deploy the tasks
    deployTasks();
}

private void deployTasks(List<ExecutionVertex> vertices) {
    for (ExecutionVertex vertex : vertices) {
        // Fetch the slot assigned to this vertex
        LogicalSlot slot = vertex.getCurrentExecution().getAssignedResource();
        // Build the TaskDeploymentDescriptor
        TaskDeploymentDescriptor tdd = createTaskDeploymentDescriptor(vertex);
        // Submit the task to the TaskManager
        slot.getTaskManagerGateway().submitTask(tdd);
    }
}

Stage 4: Execution on the TaskManager

Task Startup

// TaskExecutor.java (the TaskManager's RPC endpoint)
public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd) {
    // 1. Create the Task object
    Task task = new Task(tdd, ...);
    // 2. Register it in the TaskSlot table
    taskSlotTable.addTask(task);
    // 3. Start the task thread
    task.startTaskThread();
    return CompletableFuture.completedFuture(Acknowledge.get());
}

// Task.java
public void run() {
    // 1. Load and instantiate the user code entry class
    invokable = loadAndInstantiateInvokable(userCodeClassLoader, nameOfInvokableClass);
    // 2. Run the task
    invokable.invoke();
}

The StreamTask Execution Flow

StreamTask is the base class of all streaming tasks:

// StreamTask.java
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> {
    
    public final void invoke() throws Exception {
        // 1. Initialization
        beforeInvoke();
        // 2. Run (the core loop)
        runMailboxLoop();
        // 3. Cleanup
        afterInvoke();
    }

    // Initialize the operator chain
    protected void beforeInvoke() {
        // Create the OperatorChain
        operatorChain = new OperatorChain<>(this, recordWriter);
        // Initialize state and open all operators
        operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
    }

    // The core processing loop
    public void runMailboxLoop() {
        while (true) {
            // Process mail (checkpoints, timers, ...)
            processMail();
            // Process the next input record
        }
    }
}
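
The mailbox model is worth internalizing: everything that touches operator state (records, checkpoints, timers) runs on a single task thread, so operators need no locking. A self-contained toy sketch of the idea (not Flink code; class and method names are made up):

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MailboxLoopSketch {
    private final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // Other threads (e.g., the RPC thread) enqueue control actions as "mail".
    public void sendMail(Runnable mail) {
        mailbox.offer(mail);
    }

    public void runMailboxLoop() {
        while (running) {
            // 1. Drain pending mail first (checkpoint triggers, timers, cancellation)
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            // 2. Then process one unit of input on the same thread
            processOneRecord();
        }
    }

    private void processOneRecord() {
        // placeholder for: deserialize + operator.processElement(record)
    }

    public void stop() {
        sendMail(() -> running = false);
    }

    public static void main(String[] args) throws Exception {
        MailboxLoopSketch task = new MailboxLoopSketch();
        new Thread(task::runMailboxLoop).start();
        task.sendMail(() -> System.out.println("barrier handled on the task thread"));
        Thread.sleep(100);
        task.stop();
    }
}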

The Data Processing Path

// StreamInputProcessor.java
public InputStatus processInput() {
    // 1. Read the next buffer/event from the InputGate
    BufferOrEvent bufferOrEvent = inputGate.getNext();
    // 2. Deserialize it into a StreamRecord
    StreamRecord<IN> record = deserializer.getNextRecord();
    // 3. Hand the record to the operator
    operator.processElement(record);
}

// StreamMap.java (the map operator as an example)
public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> {
    
    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        // 1. Invoke the user function
        OUT result = userFunction.map(element.getValue());
        // 2. Emit the result downstream
        output.collect(element.replace(result));
    }
}
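
Here userFunction is exactly the MapFunction (or lambda) that the user passed to map(): a call such as stream.map(s -> s.toUpperCase()) ultimately executes inside this processElement().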

Stage 5: The Checkpoint Mechanism

Triggering a Checkpoint

// CheckpointCoordinator.java
public void triggerCheckpoint(long timestamp) {
    // 1. Generate a new checkpoint ID
    long checkpointId = checkpointIdCounter.getAndIncrement();
    // 2. Send a barrier to every source task
    for (ExecutionVertex sourceTask : sourceTasks) {
        sourceTask.getCurrentExecution().triggerCheckpoint(checkpointId, timestamp);
    }
}
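
Periodic triggering only happens when checkpointing is enabled in the user program. A minimal configuration sketch with the standard CheckpointConfig API (the interval and timeout values are arbitrary):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Have the CheckpointCoordinator trigger a checkpoint every 10 seconds
        env.enableCheckpointing(10_000);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // At least 500 ms between the end of one checkpoint and the start of the next
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Abort a checkpoint that takes longer than one minute
        env.getCheckpointConfig().setCheckpointTimeout(60_000);

        env.fromData("a", "b", "c").print();
        env.execute("CheckpointConfigDemo");
    }
}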

Barrier Propagation and Alignment

// CheckpointBarrierHandler.java
public void processBarrier(CheckpointBarrier barrier, int channelIndex) {
    // 1. Record the barrier received on this channel
    barrierCount[channelIndex]++;
    // 2. Have all channels delivered their barrier?
    if (barrierCount.allReceived()) {
        // 3. Trigger the local checkpoint
        triggerCheckpoint(barrier.getId());
        // 4. Forward the barrier downstream
        output.emitBarrier(barrier);
    }
}

State Snapshots

// StreamTask.java
public void triggerCheckpoint(long checkpointId) {
    // 1. Snapshot the state of every operator in the chain
    for (StreamOperator<?> operator : operatorChain.getAllOperators()) {
        operator.snapshotState(checkpointId);
    }
    // 2. Report checkpoint completion
    reportCheckpointComplete(checkpointId);
}
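
From the user's point of view, what gets snapshotted is whatever state the functions registered with Flink. A minimal sketch with keyed ValueState (standard state API; the counting logic is just an example):

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

// Counts records per key; the counter is included in every checkpoint snapshot.
// Must run on a keyed stream, e.g. stream.keyBy(v -> v).map(new CountingMap())
public class CountingMap extends RichMapFunction<String, String> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        count = getRuntimeContext().getState(
                new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public String map(String value) throws Exception {
        long next = (count.value() == null ? 0L : count.value()) + 1;
        count.update(next);
        return value + " #" + next;
    }
}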

Quick Reference: Key Classes

Stage    Key class                    Responsibility
------   --------------------------   --------------------------------------------------
Client   StreamExecutionEnvironment   Entry point of the user program; manages transformations
Client   StreamGraphGenerator         Generates the StreamGraph
Client   StreamingJobGraphGenerator   Generates the JobGraph; operator chaining
Master   Dispatcher                   Receives jobs; creates the JobMaster
Master   JobMaster                    Coordinates one job; builds the ExecutionGraph
Master   DefaultScheduler             Schedules tasks onto TaskManagers
Master   CheckpointCoordinator        Triggers and coordinates checkpoints
Worker   TaskExecutor                 The TaskManager's RPC endpoint
Worker   Task                         Execution unit; wraps a StreamTask
Worker   StreamTask                   Base class of streaming tasks; runs the core loop
Worker   OperatorChain                Manages the chained operators

Debugging Tips

Key Breakpoint Locations

  1. StreamGraph generation: StreamGraphGenerator.transform()
  2. JobGraph generation: StreamingJobGraphGenerator.createChain()
  3. Job submission: Dispatcher.submitJob()
  4. Task deployment: TaskExecutor.submitTask()
  5. Data processing: StreamTask.processInput()
  6. Checkpointing: CheckpointCoordinator.triggerCheckpoint()

Logging Configuration

Add the following to log4j.properties:

# Trace StreamGraph generation
logger.graph.name = org.apache.flink.streaming.api.graph
logger.graph.level = DEBUG

# Trace the scheduling process
logger.scheduler.name = org.apache.flink.runtime.scheduler
logger.scheduler.level = DEBUG

# Trace checkpoints
logger.checkpoint.name = org.apache.flink.runtime.checkpoint
logger.checkpoint.level = DEBUG

What to Watch in the Web UI

Visit http://localhost:8081

  1. Running Jobs → inspect the job's DAG
  2. Task Managers → inspect slot assignments
  3. Checkpoints → inspect checkpoint history and durations
  4. Back Pressure → inspect backpressure status