Flink Source Code: JobManager Job Submission and Scheduling

2025/12/26

Tags: Flink

A detailed walkthrough of the complete flow in Flink from job submission by the Client to task execution on the TaskManager.

1. End-to-End Flow Overview

Client submits the JobGraph
       ↓
REST API (JobSubmitHandler)    -- receives the HTTP request, parses the JobGraph
       ↓
Dispatcher.submitJob()         -- job submission entry point; creates the JobManagerRunner
       ↓
JobManagerRunner.start()       -- starts leader election
       ↓
grantLeadership()              -- leadership granted
       ↓
JobMasterServiceProcess        -- creates the JobMaster
       ↓
JobMaster.onStart()            -- JobMaster starts and triggers scheduling
       ↓
SchedulerBase.startScheduling  -- builds the ExecutionGraph, starts scheduling
       ↓
SchedulingStrategy             -- scheduling strategy; decides the task scheduling order
       ↓
ExecutionDeployer              -- allocates slots, deploys tasks
       ↓
TaskManagerGateway             -- RPC call submitting tasks to the TaskManager
       ↓
TaskExecutor.submitTask()      -- creates the Task and starts its execution thread
       ↓
Task.run()                     -- runs the user code
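
Nearly every hand-off in this chain is asynchronous and stitched together with CompletableFuture. The sketch below models the pipeline shape with only the JDK; the stage names (parseJobGraph, submitToDispatcher, deploy) are hypothetical stand-ins, not Flink APIs:

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch: the submission pipeline as a CompletableFuture chain,
// mirroring how each stage hands its result to the next one asynchronously.
public class SubmissionPipeline {

    // Each stage is modeled as an async step returning a future.
    static CompletableFuture<String> parseJobGraph(String request) {
        return CompletableFuture.supplyAsync(() -> request + " -> JobGraph");
    }

    static CompletableFuture<String> submitToDispatcher(String jobGraph) {
        return CompletableFuture.supplyAsync(() -> jobGraph + " -> Dispatcher");
    }

    static CompletableFuture<String> deploy(String job) {
        return CompletableFuture.supplyAsync(() -> job + " -> TaskManager");
    }

    // thenCompose flattens each CompletableFuture<CompletableFuture<T>> into
    // a CompletableFuture<T>, keeping the whole chain non-blocking.
    public static String run(String request) {
        return parseJobGraph(request)
                .thenCompose(SubmissionPipeline::submitToDispatcher)
                .thenCompose(SubmissionPipeline::deploy)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run("POST /jobs"));
    }
}
```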

2. Core Classes and File Locations

2.1 Job Submission

2.2 Scheduling Core Classes

2.3 ExecutionGraph Classes

2.4 Scheduling Strategy and Deployment Classes

2.5 TaskManager Classes


3. Phase 1: Client Submits the Job

3.1 REST Request Routing

When the Client submits a job, the HTTP request is routed to the Dispatcher as follows:

HTTP POST /jobs
    ↓
Netty ChannelPipeline   -- the Flink REST server is built on Netty
    ↓
RouterHandler           -- matches the URL to the corresponding handler
    ↓
LeaderRetrievalHandler  -- retrieves the current leader (DispatcherGateway)
    ↓
AbstractRestHandler     -- common REST handling logic
    ↓
JobSubmitHandler        -- handles the job submission itself
    ↓
gateway.submitJob()     -- RPC call on the DispatcherGateway
    ↓
Dispatcher.submitJob()  -- Dispatcher implements the DispatcherGateway interface

Key class hierarchy:

JobSubmitHandler
    extends AbstractRestHandler<DispatcherGateway, ...>
        extends AbstractHandler<T, R, M>
            extends LeaderRetrievalHandler<T>

The core logic in LeaderRetrievalHandler for obtaining the Dispatcher:

// LeaderRetrievalHandler.java
@Override
protected void channelRead0(ChannelHandlerContext ctx, RoutedRequest routedRequest) {
    // Obtain the current leader (the Dispatcher) through the GatewayRetriever
    OptionalConsumer<? extends T> optLeaderConsumer =
            OptionalConsumer.of(leaderRetriever.getNow());

    optLeaderConsumer.ifPresent(gateway -> {
        // gateway is the DispatcherGateway pointing at the current leader Dispatcher
        respondAsLeader(ctx, routedRequest, gateway);
    });
}

Role of the GatewayRetriever: it tracks the gateway of the current leader and hands it out to handlers, so each handler always talks to the active Dispatcher without running leader lookup itself.

3.2 REST API Entry Point - JobSubmitHandler

// JobSubmitHandler.java
@Override
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        HandlerRequest<JobSubmitRequestBody> request,
        DispatcherGateway gateway) {  // gateway is supplied by LeaderRetrievalHandler

    // 1. Collect the uploaded files
    final Collection<File> uploadedFiles = request.getUploadedFiles();

    // 2. Load (deserialize) the JobGraph
    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    // 3. Upload JAR files to the BlobServer
    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    // 4. Submit the job to the Dispatcher (RPC call)
    CompletableFuture<Acknowledge> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(
                    jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(
            jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

3.3 Dispatcher Receives the Job

// Dispatcher.java
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    final JobID jobID = jobGraph.getJobID();

    log.info("Received JobGraph submission '{}' ({}).", jobGraph.getName(), jobID);

    // Reject the submission if the job already exists
    if (jobManagerRunnerRegistry.isRegistered(jobID)) {
        return FutureUtils.completedExceptionally(
                DuplicateJobSubmissionException.of(jobID));
    }

    // Hand off to the internal submission path
    return internalSubmitJob(jobGraph);
}

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job '{}' ({}).", jobGraph.getName(), jobGraph.getJobID());

    // Persist the JobGraph, then run the job
    return waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
            .handle((ignored, throwable) -> handleTermination(jobGraph.getJobID(), throwable))
            .thenCompose(Function.identity());
}
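
The handle(...).thenCompose(Function.identity()) combination above is a recurring idiom in this code path: handle() maps both the success and the failure outcome to a new future, which yields a nested CompletableFuture<CompletableFuture<T>>, and Function.identity() passed to thenCompose flattens it back. A self-contained sketch (FlattenIdiom and recover are illustrative names, not Flink code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Sketch of the handle(...).thenCompose(Function.identity()) idiom:
// handle() turns a possibly-failed future into a future-of-future,
// and thenCompose(Function.identity()) flattens it again.
public class FlattenIdiom {

    // Maps both outcomes (value or throwable) to a follow-up future.
    static CompletableFuture<String> recover(String ignored, Throwable t) {
        return t == null
                ? CompletableFuture.completedFuture("ok")
                : CompletableFuture.completedFuture("recovered: " + t.getMessage());
    }

    public static String run(CompletableFuture<String> job) {
        return job
                .handle(FlattenIdiom::recover)     // CompletableFuture<CompletableFuture<String>>
                .thenCompose(Function.identity())  // back to CompletableFuture<String>
                .join();
    }

    public static void main(String[] args) {
        System.out.println(run(CompletableFuture.completedFuture("done")));
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("boom"));
        System.out.println(run(failed));
    }
}
```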

3.4 Starting the JobManagerRunner

// Dispatcher.java
private void runJob(JobManagerRunner jobManagerRunner, ExecutionType executionType) {
    final JobID jobId = jobManagerRunner.getJobID();

    // 1. Start the JobManagerRunner
    jobManagerRunner.start();
    jobManagerRunnerRegistry.register(jobManagerRunner);

    // 2. Listen asynchronously for job completion
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
        jobManagerRunner.getResultFuture()
            .handleAsync((result, throwable) -> {
                if (result != null) {
                    return handleJobManagerRunnerResult(result, executionType);
                } else {
                    return CompletableFuture.completedFuture(
                        jobManagerRunnerFailed(jobId, JobStatus.FAILED, throwable));
                }
            }, getMainThreadExecutor())
            .thenCompose(Function.identity());

    // 3. Clean up once the job terminates
    cleanupJobStateFuture.thenCompose(
        cleanupJobState -> removeJob(jobId, cleanupJobState));
}

4. Phase 2: Leader Election and JobMaster Creation

4.1 Starting the JobMasterServiceLeadershipRunner

// JobMasterServiceLeadershipRunner.java
@Override
public void start() throws Exception {
    LOG.debug("Start leadership runner for job {}.", getJobID());
    // Start leader election
    leaderElection.startLeaderElection(this);
}

4.2 Leadership Granted

// JobMasterServiceLeadershipRunner.java
@Override
public void grantLeadership(UUID leaderSessionID) {
    runIfStateRunning(
        () -> startJobMasterServiceProcessAsync(leaderSessionID),
        "starting a new JobMasterServiceProcess");
}

private void createNewJobMasterServiceProcess(UUID leaderSessionId) {
    LOG.info("Creating new JobMasterServiceProcess for job {}", getJobID());

    // Create the JobMasterServiceProcess (which creates the JobMaster internally)
    jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
}
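
The contract between the election service and the runner is a callback interface: the service decides who leads and pushes a session ID into the winner. The sketch below uses simplified stand-in types (LeaderContender, StandaloneLeaderElection), not Flink's actual leader-election API:

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the leader-election contract: the election service
// hands a session ID to whichever contender currently holds leadership.
public class LeaderElectionSketch {

    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    // A trivial single-node "election": the only contender wins immediately.
    static class StandaloneLeaderElection {
        void startLeaderElection(LeaderContender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    public static UUID elect() {
        AtomicReference<UUID> session = new AtomicReference<>();
        LeaderContender runner = new LeaderContender() {
            @Override public void grantLeadership(UUID id) { session.set(id); }
            @Override public void revokeLeadership() { session.set(null); }
        };
        new StandaloneLeaderElection().startLeaderElection(runner);
        return session.get();
    }

    public static void main(String[] args) {
        System.out.println("leader session: " + elect());
    }
}
```

In a high-availability setup the election service would be backed by ZooKeeper or Kubernetes, and grantLeadership/revokeLeadership may fire repeatedly over the process lifetime.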

4.3 Creating the JobMaster

// DefaultJobMasterServiceProcess.java
public DefaultJobMasterServiceProcess(...) {
    // Create the JobMasterService (i.e. the JobMaster) asynchronously
    this.jobMasterServiceFuture =
        jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

    jobMasterServiceFuture.whenComplete((jobMasterService, throwable) -> {
        if (throwable != null) {
            resultFuture.complete(
                JobManagerRunnerResult.forInitializationFailure(...));
        } else {
            registerJobMasterServiceFutures(jobMasterService);
        }
    });
}

5. Phase 3: JobMaster Startup and Scheduler Initialization

5.1 JobMaster Constructor

// JobMaster.java
public JobMaster(...) throws Exception {
    // ... initialize the various services ...

    // Create the scheduler (the core piece!)
    this.schedulerNG = createScheduler(
            slotPoolServiceSchedulerFactory,
            executionDeploymentTracker,
            jobManagerJobMetricGroup,
            jobStatusListener);
}

5.2 JobMaster Startup

// JobMaster.java
@Override
protected void onStart() throws Exception {
    try {
        startJobExecution();
    } catch (Exception e) {
        handleJobMasterError(e);
    }
}

private void startJobExecution() throws Exception {
    // 1. Make sure we are on the JobMaster main thread
    validateRunsInMainThread();

    // 2. Start the JobMaster services
    startJobMasterServices();

    // 3. (Re)connect to the ResourceManager
    reconnectToResourceManager(
            new FlinkException("Starting JobMaster component."));

    // 4. Start scheduling!
    startScheduling();
}

6. Phase 4: Translating the JobGraph into an ExecutionGraph

6.1 When the Translation Happens

The JobGraph is translated into an ExecutionGraph in the SchedulerBase constructor:

// SchedulerBase.java
public SchedulerBase(...) throws Exception {
    // Create and restore the ExecutionGraph
    this.executionGraph = createAndRestoreExecutionGraph(
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            initializationTimestamp,
            mainThreadExecutor,
            jobStatusListener,
            vertexParallelismStore);
}

6.2 Building the ExecutionGraph

// DefaultExecutionGraphBuilder.java
public static DefaultExecutionGraph buildGraph(
        JobGraph jobGraph, ...) {

    // 1. Create the JobInformation
    final JobInformation jobInformation = new JobInformation(...);

    // 2. Create the ExecutionGraph instance
    final DefaultExecutionGraph executionGraph = new DefaultExecutionGraph(...);

    // 3. Initialize each JobVertex
    for (JobVertex vertex : jobGraph.getVertices()) {
        vertex.initializeOnMaster(...);
    }

    // 4. Attach the JobGraph vertices to the ExecutionGraph
    executionGraph.attachJobGraph(sortedTopology);

    return executionGraph;
}

6.3 Data Structure Mapping

JobGraph (user-submitted)      ExecutionGraph (runtime)
─────────────────              ────────────────────

JobVertex                ──►   ExecutionJobVertex
  │                            │
  ├── parallelism = 4          ├── ExecutionVertex[0] → Execution
  │                            ├── ExecutionVertex[1] → Execution
  │                            ├── ExecutionVertex[2] → Execution
  │                            └── ExecutionVertex[3] → Execution
  │
JobEdge (data exchange)  ──►   IntermediateResult
                              └── IntermediateResultPartition[]
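
The fan-out from one logical JobVertex to parallelism-many ExecutionVertex instances can be sketched with simplified stand-in types (illustrative names only, not Flink's classes):

```java
// Sketch of the JobVertex -> ExecutionVertex expansion: one logical vertex
// with parallelism N becomes N parallel subtask entries (simplified types).
public class GraphExpansion {

    static class ExecutionVertex {
        final String taskName;
        final int subtaskIndex;

        ExecutionVertex(String taskName, int subtaskIndex) {
            this.taskName = taskName;
            this.subtaskIndex = subtaskIndex;
        }
    }

    public static ExecutionVertex[] expand(String jobVertexName, int parallelism) {
        ExecutionVertex[] vertices = new ExecutionVertex[parallelism];
        for (int i = 0; i < parallelism; i++) {
            vertices[i] = new ExecutionVertex(jobVertexName, i);
        }
        return vertices;
    }

    public static void main(String[] args) {
        for (ExecutionVertex v : expand("map", 4)) {
            System.out.println(v.taskName + "[" + v.subtaskIndex + "/4]");
        }
    }
}
```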

7. Phase 5: The Scheduler Starts Scheduling

7.1 Triggering Scheduling

// JobMaster.java
private void startScheduling() {
    schedulerNG.startScheduling();
}

7.2 SchedulerBase.startScheduling()

// SchedulerBase.java
@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();

    // 1. Register job metrics
    registerJobMetrics(...);

    // 2. Start all operator coordinators
    operatorCoordinatorHandler.startAllOperatorCoordinators();

    // 3. Delegate to the subclass's scheduling method
    startSchedulingInternal();
}

7.3 DefaultScheduler.startSchedulingInternal()

// DefaultScheduler.java
@Override
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]",
            schedulingStrategy.getClass().getName());

    // 1. Transition the ExecutionGraph to RUNNING
    transitionToRunning();

    // 2. Hand over to the scheduling strategy
    schedulingStrategy.startScheduling();
}

8. Phase 6: The Scheduling Strategy Decides the Order

8.1 PipelinedRegionSchedulingStrategy

This is Flink's default scheduling strategy; it schedules at the granularity of pipelined regions.

// PipelinedRegionSchedulingStrategy.java
@Override
public void startScheduling() {
    // 1. Find all source regions
    final Set<SchedulingPipelinedRegion> sourceRegions =
            IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
                    .filter(this::isSourceRegion)
                    .collect(Collectors.toSet());

    // 2. Schedule the source regions
    maybeScheduleRegions(sourceRegions);
}

8.2 scheduleRegion()

// PipelinedRegionSchedulingStrategy.java
private void scheduleRegion(final SchedulingPipelinedRegion region) {
    // 1. Check that every vertex in the region is still in CREATED state
    checkState(areRegionVerticesAllInCreatedState(region));

    // 2. Mark the region as scheduled
    scheduledRegions.add(region);

    // 3. Ask the SchedulerOperations to allocate slots and deploy
    schedulerOperations.allocateSlotsAndDeploy(
            regionVerticesSorted.get(region));
}
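
The intuition behind the source-region filter can be sketched with a simplified model in which each region only records the regions it consumes from; a region with no upstream regions is a source region. The data model here is hypothetical (Flink's real check inspects the region's input result partitions):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified model of source-region detection: a pipelined region is a
// source region if it consumes data from no other region.
public class SourceRegions {

    // upstreamOf maps each region to the set of regions it consumes from.
    public static Set<String> findSourceRegions(Map<String, Set<String>> upstreamOf) {
        return upstreamOf.entrySet().stream()
                .filter(e -> e.getValue().isEmpty())   // no external inputs
                .map(Map.Entry::getKey)
                .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        Map<String, Set<String>> topology = new HashMap<>();
        topology.put("regionA", Set.of());           // no inputs: a source region
        topology.put("regionB", Set.of("regionA"));  // consumes regionA
        System.out.println(findSourceRegions(topology));
    }
}
```

Downstream regions like regionB only become schedulable once their upstream regions have been scheduled and their inputs become consumable.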

9. Phase 7: Slot Allocation and Task Deployment

9.1 DefaultScheduler.allocateSlotsAndDeploy()

// DefaultScheduler.java
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexID> verticesToDeploy) {
    // 1. Record the vertex versions
    final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
            executionVertexVersioner.recordVertexModifications(verticesToDeploy);

    // 2. Collect the Executions to deploy
    final List<Execution> executionsToDeploy =
            verticesToDeploy.stream()
                    .map(this::getCurrentExecutionOfVertex)
                    .collect(Collectors.toList());

    // 3. Delegate deployment to the ExecutionDeployer
    executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
}

9.2 DefaultExecutionDeployer.allocateSlotsAndDeploy()

// DefaultExecutionDeployer.java
@Override
public void allocateSlotsAndDeploy(
        final List<Execution> executionsToDeploy,
        final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex) {

    // 1. Validate the execution states
    validateExecutionStates(executionsToDeploy);

    // 2. State transition: CREATED -> SCHEDULED
    transitionToScheduled(executionsToDeploy);

    // 3. Allocate a slot for each Execution
    final Map<ExecutionAttemptID, ExecutionSlotAssignment> executionSlotAssignmentMap =
            allocateSlotsFor(executionsToDeploy);

    // 4. Create the deployment handles
    final List<ExecutionDeploymentHandle> deploymentHandles =
            createDeploymentHandles(...);

    // 5. Wait for all slot allocations to complete, then deploy
    waitForAllSlotsAndDeploy(deploymentHandles);
}

9.3 Execution.deploy() - The Core Deployment Method

// Execution.java
public void deploy() throws JobException {
    // 1. Get the allocated slot
    final LogicalSlot slot = assignedResource;

    // 2. State transition: SCHEDULED -> DEPLOYING
    if (!transitionState(previous, DEPLOYING)) {
        throw new IllegalStateException("Cannot deploy task");
    }

    // 3. Create the TaskDeploymentDescriptor
    final TaskDeploymentDescriptor deployment =
            vertex.getExecutionGraphAccessor()
                    .getTaskDeploymentDescriptorFactory()
                    .createDeploymentDescriptor(...);

    // 4. Get the TaskManagerGateway
    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

    // 5. Submit the task to the TaskManager asynchronously
    CompletableFuture.supplyAsync(
            () -> taskManagerGateway.submitTask(deployment, rpcTimeout),
            executor)
        .thenCompose(Function.identity())
        .whenCompleteAsync((ack, failure) -> {
            if (failure == null) {
                vertex.notifyCompletedDeployment(this);
            } else {
                markFailed(failure);
            }
        }, jobMasterMainThreadExecutor);
}

10. Phase 8: The TaskManager Runs the Task

10.0 RPC Call Chain: TaskManagerGateway to TaskExecutor

Execution.deploy() calls taskManagerGateway.submitTask(); the RPC call chain behind it looks like this:

taskManagerGateway.submitTask(deployment, rpcTimeout)
    ↓
RpcTaskManagerGateway.submitTask() -- implementation of TaskManagerGateway
    ↓
taskExecutorGateway.submitTask()   -- TaskExecutorGateway RPC proxy
    ↓
[Akka/Pekko RPC network transport] -- cross-process RPC call
    ↓
TaskExecutor.submitTask()          -- TaskExecutor implements TaskExecutorGateway

Key class relationships:

TaskManagerGateway (interface)      -- the JobMaster-side abstraction
    ↓ implemented by
RpcTaskManagerGateway               -- holds a TaskExecutorGateway reference
    ↓ delegates to
TaskExecutorGateway (RPC interface) -- the remote interface defined by Flink RPC
    ↓ implemented by
TaskExecutor                        -- the main TaskManager class, implements TaskExecutorGateway

Core code of RpcTaskManagerGateway:

// RpcTaskManagerGateway.java
public class RpcTaskManagerGateway implements TaskManagerGateway {

    private final TaskExecutorGateway taskExecutorGateway;  // RPC proxy
    private final JobMasterId jobMasterId;

    @Override
    public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
        // Delegate to the TaskExecutorGateway; this is an RPC call
        return taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
    }
}
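
The "RPC proxy" side of this chain is built on dynamic proxies: a plain interface call is intercepted and turned into a message that can be shipped to the remote endpoint. The sketch below shows the general technique with java.lang.reflect.Proxy; the types are simplified stand-ins, not Flink's actual RPC implementation:

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

// Sketch of an RPC gateway as a dynamic proxy: method calls are intercepted
// and could be marshalled over the network instead of executed locally.
public class GatewayProxySketch {

    interface TaskExecutorGateway {
        String submitTask(String descriptor);
    }

    public static TaskExecutorGateway createProxy(StringBuilder transportLog) {
        InvocationHandler handler = (proxy, method, args) -> {
            // A real RPC layer would serialize the call here and send it out.
            transportLog.append(method.getName()).append('(').append(args[0]).append(')');
            return "ACK";
        };
        return (TaskExecutorGateway) Proxy.newProxyInstance(
                TaskExecutorGateway.class.getClassLoader(),
                new Class<?>[]{TaskExecutorGateway.class},
                handler);
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        TaskExecutorGateway gateway = createProxy(log);
        System.out.println(gateway.submitTask("tdd-1") + " / " + log);
    }
}
```

The caller sees an ordinary interface; whether the call executes locally or crosses a process boundary is hidden behind the proxy.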

10.1 TaskExecutor.submitTask()

// TaskExecutor.java
@Override
public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {

    // 1. Create the Task object
    final Task task = new Task(
            tdd,
            memoryManager,
            ioManager,
            networkEnvironment,
            ...);

    // 2. Register the Task
    taskSlotTable.addTask(task);

    // 3. Start the Task's execution thread
    task.startTaskThread();

    return CompletableFuture.completedFuture(Acknowledge.get());
}

10.2 Task.run()

// Task.java
@Override
public void run() {
    try {
        // 1. State transition: DEPLOYING -> RUNNING
        transitionState(ExecutionState.DEPLOYING, ExecutionState.RUNNING);

        // 2. Instantiate the task's entry class
        invokable = loadAndInstantiateInvokable(
                userCodeClassLoader, nameOfInvokableClass);

        // 3. Run the task (this invokes the user code)
        invokable.invoke();  // ← user code runs here!

        // 4. Task finished
        transitionState(ExecutionState.RUNNING, ExecutionState.FINISHED);

    } catch (Throwable t) {
        transitionState(ExecutionState.RUNNING, ExecutionState.FAILED);
    }
}
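
At its core, loadAndInstantiateInvokable is reflective class loading through the user-code class loader: the entry class arrives as a string in the deployment descriptor and is instantiated by name. A self-contained sketch, using Runnable as a stand-in for Flink's invokable interface (the class names here are illustrative):

```java
// Sketch of loading a task's entry class by name via a class loader and
// instantiating it reflectively; Runnable stands in for the invokable type.
public class InvokableLoader {

    public static class HelloInvokable implements Runnable {
        @Override public void run() { System.out.println("user code running"); }
    }

    public static Runnable loadAndInstantiate(ClassLoader cl, String className) {
        try {
            // Resolve the class through the given (e.g. user-code) class loader
            Class<? extends Runnable> clazz =
                    Class.forName(className, true, cl).asSubclass(Runnable.class);
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException("Could not load invokable " + className, e);
        }
    }

    public static void main(String[] args) {
        // In Flink, the class name would come from the TaskDeploymentDescriptor.
        Runnable invokable = loadAndInstantiate(
                InvokableLoader.class.getClassLoader(),
                InvokableLoader.HelloInvokable.class.getName());
        invokable.run();
    }
}
```

Using the user-code class loader (rather than the system one) is what lets the TaskManager run classes shipped inside the job's JAR files.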

11. Task State Transitions

11.1 The ExecutionState Enum

// ExecutionState.java
public enum ExecutionState {
    CREATED,       // initial state
    SCHEDULED,     // scheduled, waiting for a slot
    DEPLOYING,     // being deployed to a TaskManager
    INITIALIZING,  // initializing
    RUNNING,       // running
    FINISHED,      // completed successfully
    CANCELING,     // being canceled
    CANCELED,      // canceled
    FAILED,        // failed
    RECONCILING;   // reconciling
}

11.2 State Transition Diagram

CREATED                  -- initial state
    ↓ allocateSlotsAndDeploy()
SCHEDULED                -- scheduled, waiting for a slot
    ↓ Execution.deploy()
DEPLOYING                -- being deployed to a TaskManager
    ↓
INITIALIZING             -- the Task initializes on the TaskManager
    ↓
RUNNING                  -- the Task starts processing data
    ↓
    ├──→ FINISHED        -- completed successfully
    ├──→ CANCELING       -- being canceled
    │       ↓
    │    CANCELED        -- canceled
    └──→ FAILED          -- failed
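
One way to enforce a diagram like this is a per-state table of legal successors that transitionState() consults. The sketch below encodes only the transitions drawn above; it is a simplified model, not Flink's actual transition table (the real ExecutionState permits additional transitions, e.g. for failover and reconciliation):

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Sketch: guard state transitions with a table of legal successor states.
public class StateMachineSketch {

    enum ExecState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING,
                     FINISHED, CANCELING, CANCELED, FAILED }

    // Legal successors per state, matching the diagram above (simplified).
    static final Map<ExecState, Set<ExecState>> LEGAL = Map.of(
            ExecState.CREATED, EnumSet.of(ExecState.SCHEDULED),
            ExecState.SCHEDULED, EnumSet.of(ExecState.DEPLOYING, ExecState.CANCELING),
            ExecState.DEPLOYING, EnumSet.of(ExecState.INITIALIZING, ExecState.CANCELING),
            ExecState.INITIALIZING, EnumSet.of(ExecState.RUNNING, ExecState.FAILED),
            ExecState.RUNNING, EnumSet.of(ExecState.FINISHED, ExecState.CANCELING, ExecState.FAILED),
            ExecState.CANCELING, EnumSet.of(ExecState.CANCELED),
            ExecState.FINISHED, EnumSet.noneOf(ExecState.class),
            ExecState.CANCELED, EnumSet.noneOf(ExecState.class),
            ExecState.FAILED, EnumSet.noneOf(ExecState.class));

    public static boolean transitionState(ExecState from, ExecState to) {
        return LEGAL.getOrDefault(from, Set.of()).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(transitionState(ExecState.RUNNING, ExecState.FINISHED));
        System.out.println(transitionState(ExecState.FINISHED, ExecState.RUNNING));
    }
}
```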

12. Complete Call Chain

Client submits the job
    ↓
JobSubmitHandler.handleRequest()   -- REST API entry point
    ↓
Dispatcher.submitJob()             -- job submission entry point
    ↓
Dispatcher.runJob()                -- starts the JobManagerRunner
    ↓
JobMasterServiceLeadershipRunner
  .start()                         -- starts leader election
    ↓
grantLeadership()                  -- leadership granted
    ↓
JobMasterServiceProcess.create()   -- creates the JobMaster
    ↓
JobMaster.onStart()                -- JobMaster starts
    ↓
startJobExecution()                -- starts job execution
    ↓
startScheduling()                  -- triggers scheduling
    ↓
SchedulerBase.startScheduling()    -- registers metrics, starts coordinators
    ↓
DefaultScheduler
  .startSchedulingInternal()       -- ExecutionGraph → RUNNING
    ↓
PipelinedRegionSchedulingStrategy
  .startScheduling()               -- identifies source regions
    ↓
scheduleRegion()                   -- schedules a single region
    ↓
DefaultScheduler
  .allocateSlotsAndDeploy()        -- collects the Execution list
    ↓
DefaultExecutionDeployer
  .allocateSlotsAndDeploy()        -- CREATED → SCHEDULED, allocates slots
    ↓
waitForAllSlotsAndDeploy()         -- waits asynchronously for slot allocation
    ↓
deployAll()                        -- iterates over all DeploymentHandles
    ↓
Execution.deploy()                 -- SCHEDULED → DEPLOYING, creates the TDD
    ↓
taskManagerGateway.submitTask()    -- RPC call
    ↓
TaskExecutor.submitTask()          -- creates the Task
    ↓
Task.startTaskThread()             -- starts the task thread
    ↓
Task.run() → invokable.invoke()    -- runs the user code

13. Key Interfaces and Implementations

13.1 Class Diagram

Legend (UML shorthand): <|-- = extends, <|.. = implements, --o = aggregation (holds a reference to).

Job management hierarchy:

Dispatcher --o JobManagerRunner <|.. JobMasterServiceLeadershipRunner
                                          |--o JobMasterServiceProcess
                                                    <|.. DefaultJobMasterServiceProcess
                                                              |--o JobMasterService <|.. JobMaster

Scheduler hierarchy:

SchedulerNG <|.. SchedulerBase <|-- DefaultScheduler
                                        |--o SchedulingStrategy
                                                <|.. PipelinedRegionSchedulingStrategy
                                        |--o ExecutionDeployer
                                                <|.. DefaultExecutionDeployer
                                        |--o ExecutionGraph

Internal structure of the ExecutionGraph:

ExecutionGraph <|.. DefaultExecutionGraph
                         |--o ExecutionJobVertex[] --o ExecutionVertex[] --o Execution
                         |--o IntermediateResult[]

13.2 Component Responsibilities

14. Debugging Guide

14.1 Key Breakpoint Locations

14.2 Key Log Messages

# Job submission
"Received JobGraph submission '{}' ({})."
"Submitting job '{}' ({})."

# Scheduling start
"Starting scheduling with scheduling strategy [{}]"

# Region scheduling
"Scheduling pipelined region"

# Slot allocation
"Allocating slot for execution"

# Task deployment
"Deploying {} to {}"

15. Appendix: When the REST API Starts

The REST API behind the Flink Web UI is not started in JobMaster.onStart(); it is started when the cluster boots, as part of the DispatcherResourceManagerComponent.

15.1 REST API Startup Timing

The REST API is initialized when the Flink cluster starts, before any job is submitted:

ClusterEntrypoint.runCluster()
    ↓
DefaultDispatcherResourceManagerComponentFactory.create()
    ↓
restEndpointFactory.createRestEndpoint()   -- creates the WebMonitorEndpoint
    ↓
webMonitorEndpoint.start()                 -- starts the REST server
    ↓
dispatcherRunner.start()                   -- starts the Dispatcher
    ↓
resourceManagerService.start()             -- starts the ResourceManager

15.2 Core Source Locations

15.3 REST API Startup Flow

// DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create(...) {
    // 1. Create the GatewayRetriever (used to locate the Dispatcher leader)
    final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever =
            new RpcGatewayRetriever<>(...);

    // 2. Create the REST endpoint
    webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
            configuration,
            dispatcherGatewayRetriever,
            resourceManagerGatewayRetriever,
            ...);

    // 3. Start the REST server (the Web UI becomes reachable here)
    log.debug("Starting Dispatcher REST endpoint.");
    webMonitorEndpoint.start();

    // 4. Only then are the Dispatcher and ResourceManager started
    dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(...);
    resourceManagerService.start();
}

15.4 Handler Registration

All REST handlers are registered in WebMonitorEndpoint.initializeHandlers():

// WebMonitorEndpoint.java
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(...) {
    ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>();

    // Cluster overview
    handlers.add(Tuple2.of(ClusterOverviewHeaders.getInstance(), clusterOverviewHandler));

    // Job list
    handlers.add(Tuple2.of(JobsOverviewHeaders.getInstance(), jobsOverviewHandler));

    // Job details
    handlers.add(Tuple2.of(JobDetailsHeaders.getInstance(), jobDetailsHandler));

    // Checkpoint handlers
    handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), checkpointConfigHandler));
    handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), checkpointStatisticsHandler));

    // TaskManager handlers
    handlers.add(Tuple2.of(TaskManagersHeaders.getInstance(), taskManagersHandler));
    handlers.add(Tuple2.of(TaskManagerDetailsHeaders.getInstance(), taskManagerDetailsHandler));

    // ... more handlers
    return handlers;
}

DispatcherRestEndpoint extends WebMonitorEndpoint and additionally registers the JobSubmitHandler:

// DispatcherRestEndpoint.java
@Override
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(...) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
            super.initializeHandlers(localAddressFuture);  // inherit the parent's handlers

    // Add the job submission handler (specific to the Dispatcher)
    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(...);
    handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));

    return handlers;
}
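
The (headers, handler) tuples registered above are ultimately turned into URL routes. The routing idea can be sketched as a plain map from method-plus-path to a handler function (an illustrative model, not the Netty router Flink actually uses):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch of a route table: (HTTP method, path) -> handler function,
// analogous to the (headers, handler) tuples registered above.
public class RouteTableSketch {

    private final Map<String, Function<String, String>> routes = new HashMap<>();

    public void register(String method, String path, Function<String, String> handler) {
        routes.put(method + " " + path, handler);
    }

    public String dispatch(String method, String path, String body) {
        Function<String, String> handler = routes.get(method + " " + path);
        return handler == null ? "404" : handler.apply(body);
    }

    public static void main(String[] args) {
        RouteTableSketch router = new RouteTableSketch();
        router.register("GET", "/jobs/overview", body -> "job list");
        router.register("POST", "/jobs", body -> "submitted: " + body);
        System.out.println(router.dispatch("POST", "/jobs", "jobGraph"));
        System.out.println(router.dispatch("GET", "/nope", ""));
    }
}
```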

15.5 Key Design Points


This document is based on an analysis of the Flink 1.19 source code.