Flink源码-RPC体系

2026/01/05

Tags: Flink

本文档详细梳理 Flink 中的 RPC(远程过程调用)体系架构,包括 RPC 系统的初始化流程、核心类关系以及各组件之间的通信模式。


1. 整体架构概览

Flink 的 RPC 体系基于 Apache Pekko (原 Akka) Actor 模型实现,提供了一套完整的分布式通信框架。

RpcEndpoint (业务逻辑) <---> RpcService (RPC 服务管理) <---> RpcGateway (远程代理接口)
     ↑                            ↑                              ↑
     |                            |                              |
PekkoRpcActor (Actor 实现)   PekkoRpcService (Pekko 实现)   PekkoInvocationHandler (动态代理)
                                  ↓
                            ActorSystem (Pekko 运行时)

2. 核心类和文件位置


3. RPC 系统加载和初始化流程

3.1 RPC 系统加载

Flink 使用 Java SPI(ServiceLoader)机制加载 RPC 系统:

RpcSystem.load(Configuration config)
       ↓
ServiceLoader.load(RpcSystemLoader.class)  -- 加载所有 RpcSystemLoader 实现
       ↓
按 loadPriority 排序                        -- 优先级排序
       ↓
PekkoRpcSystemLoader.loadRpcSystem()       -- 默认 Pekko 加载器
       ↓
提取 flink-rpc-akka.jar                    -- 从资源中提取 jar
       ↓
创建 SubmoduleClassLoader                  -- 隔离类加载器
       ↓
ServiceLoader.load(RpcSystem.class)        -- 加载 PekkoRpcSystem
       ↓
返回 CleanupOnCloseRpcSystem               -- 包装清理逻辑

关键源码:

// RpcSystem.java - 加载入口
static RpcSystem load(Configuration config) {
    final PriorityQueue<RpcSystemLoader> rpcSystemLoaders =
            new PriorityQueue<>(Comparator.comparingInt(RpcSystemLoader::getLoadPriority));
    ServiceLoader.load(RpcSystemLoader.class).forEach(rpcSystemLoaders::add);
    
    while (iterator.hasNext()) {
        return next.loadRpcSystem(config);
    }
}

// PekkoRpcSystemLoader.java - Pekko 加载器
public RpcSystem loadRpcSystem(Configuration config) {
    // 1. 提取 flink-rpc-akka.jar 到临时目录
    final InputStream resourceStream = flinkClassLoader.getResourceAsStream(FLINK_RPC_PEKKO_FAT_JAR);
    IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
    
    // 2. 创建隔离的类加载器
    final SubmoduleClassLoader submoduleClassLoader =
            new SubmoduleClassLoader(new URL[] {tempFile.toUri().toURL()}, flinkClassLoader);
    
    // 3. 通过 SPI 加载 PekkoRpcSystem
    return new CleanupOnCloseRpcSystem(
            ServiceLoader.load(RpcSystem.class, submoduleClassLoader).iterator().next(),
            submoduleClassLoader,
            tempFile);
}

3.2 RpcService 创建

RpcSystem.remoteServiceBuilder()           -- 创建远程服务构建器
       ↓
RpcServiceBuilder.createAndStart()         -- 构建并启动
       ↓
PekkoUtils.createActorSystem()             -- 创建 Pekko ActorSystem
       ↓
new PekkoRpcService(actorSystem, config)   -- 创建 RpcService
       ↓
startSupervisorActor()                     -- 启动监管 Actor
       ↓
startDeadLettersActor()                    -- 启动死信处理 Actor

ActorSystem 启动:

// PekkoRpcService 构造函数
PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration) {
    this.actorSystem = checkNotNull(actorSystem);
    this.configuration = checkNotNull(configuration);
    
    // 获取地址和端口
    Address actorSystemAddress = PekkoUtils.getAddress(actorSystem);
    address = actorSystemAddress.host().isDefined() ? actorSystemAddress.host().get() : "";
    port = actorSystemAddress.port().isDefined() ? (Integer) actorSystemAddress.port().get() : -1;
    
    // 启动监管 Actor
    supervisor = startSupervisorActor();
    
    // 启动死信处理 Actor
    startDeadLettersActor();
}

3.3 RpcEndpoint 启动流程

new XxxEndpoint(rpcService, ...)           -- 创建 RpcEndpoint 子类
       ↓
RpcEndpoint 构造函数
       ↓
rpcService.startServer(this)               -- 注册到 RpcService
       ↓
registerRpcActor(rpcEndpoint)              -- 创建并注册 Actor
       ↓
SupervisorActor.startRpcActor()            -- 通过监管 Actor 创建
       ↓
context.actorOf(Props.create(...))         -- 创建 PekkoRpcActor
       ↓
Proxy.newProxyInstance(...)                -- 创建 RpcServer 代理
       ↓
返回 RpcServer                              -- 赋值给 rpcServer 字段
       ↓
rpcEndpoint.start()                        -- 调用 start() 方法
       ↓
rpcServer.start()                          -- 发送 START 消息
       ↓
PekkoRpcActor 状态: STOPPED → STARTED      -- Actor 开始处理消息
       ↓
internalCallOnStart()                      -- 调用 onStart() 回调

关键源码:

// RpcEndpoint.java - 构造函数
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
    this.rpcService = checkNotNull(rpcService);
    this.endpointId = checkNotNull(endpointId);
    
    // 关键:启动 RPC 服务器,返回代理对象
    this.rpcServer = rpcService.startServer(this);
    
    this.mainThreadExecutor = new MainThreadExecutor(rpcServer, ...);
}

// PekkoRpcService.java - 启动服务器
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
    // 1. 注册 RPC Actor
    final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
    final ActorRef actorRef = actorRegistration.getActorRef();
    
    // 2. 创建代理处理器
    final InvocationHandler invocationHandler;
    if (rpcEndpoint instanceof FencedRpcEndpoint) {
        invocationHandler = new FencedPekkoInvocationHandler<>(...);
    } else {
        invocationHandler = new PekkoInvocationHandler(...);
    }
    
    // 3. 创建 JDK 动态代理
    RpcServer server = (RpcServer) Proxy.newProxyInstance(
            classLoader,
            implementedRpcGateways.toArray(new Class<?>[0]),
            invocationHandler);
    
    return server;
}

4. RPC 核心类图

图例说明(UML 标准简写):

4.1 RPC 核心接口继承关系

RpcGateway 继承体系:

RpcGateway <|-- FencedRpcGateway<F>
                    <|-- JobMasterGateway
                    <|-- TaskExecutorGateway
                    <|-- DispatcherGateway
                    <|-- ResourceManagerGateway

RpcServer 继承体系:

StartStoppable ────┐
MainThreadExecutable ─┼─<|-- RpcServer <|.. PekkoInvocationHandler
RpcGateway ────────┘

4.2 RpcEndpoint 继承关系

RpcEndpoint <|-- FencedRpcEndpoint<F>
                      <|-- JobMaster ..|> JobMasterGateway
                      <|-- Dispatcher ..|> DispatcherGateway
                      <|-- ResourceManager ..|> ResourceManagerGateway
                      <|-- TaskExecutor ..|> TaskExecutorGateway

4.3 RpcService 实现关系

RpcService <|.. PekkoRpcService
                     |--o ActorSystem
                     |--o SupervisorActor --o PekkoRpcActor[]
                     |--o Configuration

4.4 RPC 消息类继承关系

Message <|-- RpcInvocation <|.. LocalRpcInvocation
        |                  <|.. RemoteRpcInvocation
        <|.. RunAsync
        <|.. CallAsync

4.5 Actor 类继承关系

AbstractActor <|-- PekkoRpcActor <|-- FencedPekkoRpcActor<F>
              <|-- SupervisorActor

5. RPC 调用流程详解

5.1 核心接口职责说明

核心交互流程:

调用方 → RpcGateway(代理) → PekkoInvocationHandler → RpcInvocation → PekkoRpcActor → RpcEndpoint
                                                                                         ↓
结果   ←                                                                             业务方法()

说明:调用方获取的 RpcGateway 实际上是一个 JDK 动态代理对象,其 InvocationHandler 就是 PekkoInvocationHandler。 该代理同时实现了 RpcGateway(定义业务方法)和 RpcServer(提供 start/stop 等生命周期方法)接口。

流程说明:

  1. 调用方通过 RpcGateway 代理发起方法调用(代理背后是 PekkoInvocationHandler)
  2. PekkoInvocationHandler 将调用封装为 RpcInvocation 消息
  3. 消息发送到 PekkoRpcActor 的消息队列
  4. Actor 通过反射调用 RpcEndpoint 的业务方法
  5. 结果原路返回给调用方

各接口交互关系:

RpcService (RPC 运行时容器)
│
├─ 管理的 RpcEndpoint:
│     JobMaster ↔ PekkoRpcActor
│     Dispatcher ↔ PekkoRpcActor
│     TaskExecutor ↔ PekkoRpcActor
│
└─ 底层: ActorSystem (Pekko 运行时)

RpcService 职责:

RpcGateway、RpcEndpoint、RpcServer、RpcService 四者关系:

                    RpcService (容器/工厂)
                    • startServer(endpoint) → 创建 Actor + 返回代理
                    • connect(address) → 获取远程 Gateway
                            │
         ┌──────────────────┼──────────────────┐
         │ 创建              │ 管理              │ 创建代理
         ↓                  ↓                  ↓
   PekkoRpcActor      RpcEndpoint ───持有──→ RpcServer (JDK 动态代理)
   (消息处理)         (业务实现)              • 实现 RpcGateway + 生命周期控制
         │                  │                  │
         │                  │ implements       │ 实际是
         │                  ↓                  ↓
         └────反射调用──→ RpcGateway ←───── 调用方使用
                        (接口定义)

关系说明:

示例 - JobMaster:

class JobMaster extends RpcEndpoint implements JobMasterGateway {
    // RpcEndpoint 提供: rpcServer 字段
    // JobMasterGateway 定义: 可被远程调用的方法
}

5.2 本地调用流程

当调用者和被调用者在同一个 JVM 中时:

调用方: gateway.someMethod(args)
       ↓
PekkoInvocationHandler.invoke()
       ↓
判断方法类型
       ↓
invokeRpc(method, args)
       ↓
createRpcInvocationMessage()              -- 创建 LocalRpcInvocation
       ↓
tell(rpcInvocation) 或 ask(rpcInvocation) -- 发送到 Actor
       ↓
PekkoRpcActor.handleRpcInvocation()       -- Actor 接收消息
       ↓
lookupRpcMethod()                         -- 反射查找方法
       ↓
method.invoke(rpcEndpoint, args)          -- 调用 RpcEndpoint 方法
       ↓
返回结果或 CompletableFuture

5.3 远程调用流程

当调用者和被调用者在不同 JVM 中时:

调用方: gateway.someMethod(args)
       ↓
PekkoInvocationHandler.invoke()
       ↓
createRpcInvocationMessage()              -- 创建 RemoteRpcInvocation
       ↓
序列化参数                                 -- 参数 → 字节数组
       ↓
Patterns.ask(rpcEndpoint, message, timeout)
       ↓
[网络传输 - Pekko Remoting]               -- 跨 JVM 传输
       ↓
远程 PekkoRpcActor.handleRpcInvocation()  -- 远程 Actor 接收
       ↓
反序列化参数                               -- 字节数组 → 参数
       ↓
method.invoke(rpcEndpoint, args)          -- 调用目标方法
       ↓
序列化结果                                 -- 结果 → 字节数组
       ↓
[网络传输返回]
       ↓
反序列化结果                               -- 字节数组 → 结果
       ↓
返回给调用方

5.4 消息处理状态机

PekkoRpcActor 内部维护一个状态机来管理消息处理:

STOPPED (初始状态)
    ↓ ControlMessages.START
STARTED (运行状态)
    ↓ ControlMessages.TERMINATE
TERMINATING (终止中)
    ↓ Actor 停止
TERMINATED (已终止)

状态说明:


6.1 组件通信架构图

Client (REST API)
    ↓ HTTP
JobManager
    ├── Dispatcher ←→ JobMaster ←→ ResourceManager
    ↓ RPC          ↓ RPC         ↓ RPC
TaskManager(s)
    └── TaskExecutor
            └── Task, Task, Task, Task ...

6.2 各组件 RPC 通信详解

6.2.1 Dispatcher(作业分发器)

角色:接收客户端提交的作业,管理多个 JobMaster

Gateway 继承关系

DispatcherGateway
    extends FencedRpcGateway<DispatcherId>
    extends RestfulGateway

主要 RPC 方法

通信模式

Client (REST) ---HTTP---> Dispatcher ---内部调用---> JobMaster

6.2.2 JobMaster(作业主节点)

角色:管理单个作业的执行,协调 Task 调度

Gateway 继承关系

JobMasterGateway
    extends CheckpointCoordinatorGateway
    extends FencedRpcGateway<JobMasterId>
    extends KvStateLocationOracle
    extends BlocklistListener

主要 RPC 方法

通信模式

ResourceManager <---RPC---> JobMaster <---RPC---> TaskExecutor
                                ↓ RPC
                            Dispatcher

6.2.3 ResourceManager(资源管理器)

角色:管理集群资源,分配 Slot 给 JobMaster

Gateway 继承关系

ResourceManagerGateway
    extends FencedRpcGateway<ResourceManagerId>
    extends ClusterPartitionManager
    extends BlocklistListener

主要 RPC 方法

通信模式

JobMaster (Scheduler) <---RPC---> ResourceManager (SlotManager) <---RPC---> TaskExecutor (SlotTable)
       ↓ requestSlots                    ↓ allocateSlot

6.2.4 TaskExecutor(任务执行器)

角色:执行具体的 Task,管理本地资源

Gateway 继承关系

TaskExecutorGateway
    extends RpcGateway
    extends TaskExecutorOperatorEventGateway
    extends TaskExecutorThreadInfoGateway

主要 RPC 方法

通信模式

ResourceManager ---requestSlot---> TaskExecutor
JobMaster <---RPC---> TaskExecutor
    ↓ submitTask           ↓
Execution            Task.run()

6.3 典型通信场景

场景一:作业提交流程

Client
   │
   │ 1. POST /jobs (HTTP)
   ↓
Dispatcher
   │
   │ 2. createJobManagerRunner()
   ↓
JobMaster (创建)
   │
   │ 3. registerJobMaster() (RPC)
   ↓
ResourceManager
   │
   │ 4. declareRequiredResources() (RPC)
   │
   │ 5. requestSlot() (RPC)
   ↓
TaskExecutor
   │
   │ 6. offerSlots() (RPC)
   ↓
JobMaster
   │
   │ 7. submitTask() (RPC)
   ↓
TaskExecutor
   │
   │ 8. Task.run()
   ↓
用户代码执行

场景二:心跳机制

心跳发送方向:
TaskExecutor ---heartbeat---> JobMaster
TaskExecutor ---heartbeat---> ResourceManager
JobMaster -----heartbeat---> ResourceManager

场景三:Checkpoint 流程

JobMaster (CheckpointCoordinator)
   │
   │ 1. triggerCheckpoint() (RPC)
   ↓
TaskExecutor (所有 Source Task)
   │
   │ 2. Barrier 传播 (数据流)
   ↓
下游 Task
   │
   │ 3. acknowledgeCheckpoint() (RPC)
   ↓
JobMaster
   │
   │ 4. 所有 Task 确认后完成 Checkpoint

6.4 服务发现机制

Flink RPC 地址不是写死的,而是通过 服务发现机制 动态获取。

核心组件

工作流程

组件启动流程(以 JobMaster 发现 ResourceManager 为例):

JobMaster 启动
    ↓
haServices.getResourceManagerLeaderRetriever()   // 获取服务发现器
    ↓
leaderRetriever.start(listener)                  // 开始监听
    ↓
[ZooKeeper/内存] 通知 Leader 地址变化             // 后端存储
    ↓
listener.notifyLeaderAddress(address, leaderID)  // 回调通知
    ↓
rpcService.connect(address, fencingToken, gateway) // 连接到 Leader
    ↓
ResourceManagerGateway                           // 可以进行 RPC 调用

实现方式

Flink 提供多种 HighAvailabilityServices 实现:

HighAvailabilityServices 提供的服务发现:

haServices
    ├── getResourceManagerLeaderRetriever()      → 发现 ResourceManager
    ├── getDispatcherLeaderRetriever()           → 发现 Dispatcher  
    ├── getJobManagerLeaderRetriever(jobId)      → 发现特定作业的 JobMaster
    └── getClusterRestEndpointLeaderRetriever()  → 发现 REST 端点

典型使用示例

// JobMaster 中发现 ResourceManager
public class JobMaster extends FencedRpcEndpoint<JobMasterId> {
    private final LeaderRetrievalService resourceManagerLeaderRetriever;
    
    public JobMaster(HighAvailabilityServices haServices, ...) {
        // 从 HA 服务获取 ResourceManager 的服务发现器
        this.resourceManagerLeaderRetriever = 
            haServices.getResourceManagerLeaderRetriever();
    }
    
    @Override
    protected void onStart() {
        // 启动服务发现,监听 ResourceManager Leader 变化
        resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
    }
    
    // Leader 变化时的回调
    private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
        @Override
        public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
            // 连接到新的 ResourceManager Leader
            rpcService.connect(leaderAddress, 
                              ResourceManagerId.fromUuid(leaderSessionID), 
                              ResourceManagerGateway.class);
        }
    }
}

服务发现与 RPC 的关系

                  HighAvailabilityServices (服务发现中心)
                              │
       ┌──────────────────────┼──────────────────────┐
       ↓                      ↓                      ↓
LeaderRetriever        LeaderRetriever        LeaderRetriever
(ResourceManager)      (Dispatcher)           (JobMaster)
       │                      │                      │
       │ 监听地址变化          │                      │
       ↓                      ↓                      ↓
notifyLeaderAddress    notifyLeaderAddress    notifyLeaderAddress
       │                      │                      │
       │ 调用 RpcService.connect()                   │
       ↓                      ↓                      ↓
RpcGateway 代理         RpcGateway 代理        RpcGateway 代理

6.5 Fencing Token 机制

Fencing Token(隔离令牌)是 Flink 用来解决 分布式系统脑裂问题(Split-Brain) 的关键机制。

6.4.1 问题背景

在分布式高可用(HA)场景下,可能同时存在多个 JobManager 实例:

场景:网络分区导致的脑裂

时刻 T1: 正常状态
    JobManager-A (Leader, 活跃)
    JobManager-B (Standby, 备用)

时刻 T2: 网络分区
    JobManager-A (以为自己还是 Leader)  ←── 网络分区 ──→  JobManager-B (被选举为新 Leader)
         ↓                                                    ↓
    继续发送 RPC 命令                                      开始发送 RPC 命令
         ↓                                                    ↓
    TaskExecutor 收到两个"Leader"的命令,怎么办?

如果没有 Fencing Token,TaskExecutor 无法区分哪个 JobMaster 是"合法的 Leader",可能导致:

6.4.2 工作原理

核心思想:每次 Leader 选举时生成一个 唯一的令牌(UUID),所有 RPC 请求都必须携带这个令牌,接收方验证令牌是否匹配。

Fencing Token 验证流程:

1. 旧 JobMaster (Token: UUID-A) 发送 RPC
    ↓
2. TaskExecutor 收到消息
    ↓
3. 新 JobMaster (Token: UUID-B) 已注册到 TaskExecutor
    ↓
4. Token 不匹配 (UUID-A ≠ UUID-B)
    ↓
5. 拒绝请求,抛出 FencingTokenException

6.4.3 实现方式

Flink 通过以下类实现 Fencing Token 机制:

6.4.4 典型应用

// JobMaster 继承 FencedRpcEndpoint,使用 JobMasterId 作为 Fencing Token
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
    // JobMasterId 是一个包装了 UUID 的类
    // 每次 JobMaster 启动时生成新的 UUID
}

// ResourceManager 同样使用 Fencing Token
public class ResourceManager extends FencedRpcEndpoint<ResourceManagerId> 
        implements ResourceManagerGateway {
    // ...
}

6.4.5 消息验证过程

FencedPekkoRpcActor 中的验证逻辑:

收到 FencedMessage<F>
    ↓
获取消息中的 Token
    ↓
与本地存储的 Token 比较
    ↓
匹配? ──否──→ 丢弃消息或返回 FencingTokenException
    ↓ 是
正常处理 RPC 调用

6.4.6 为什么同进程组件也需要心跳?

虽然 ResourceManager 和 JobMaster 在同一个 JVM 进程中,但它们之间仍然需要心跳机制:

JobManager 进程内部:
    │
    ├── ResourceManager (独立的 RpcEndpoint/Actor)
    │       ↑
    │       │ RPC 心跳消息 (仍然走 Actor 消息机制)
    │       ↓
    ├── JobMaster (独立的 RpcEndpoint/Actor)
    │
    └── Dispatcher

心跳的真正目的不仅仅是检测网络连通性,更重要的是:

6.4.7 总结


7. 关键设计模式

7.1 Actor 模型

Flink RPC 基于 Actor 模型,每个 RpcEndpoint 对应一个 Actor:

Actor 模型:
Actor A (Mailbox) ----消息----> Actor B (Mailbox)
     ↓ 单线程处理                    ↓ 单线程处理
   处理消息                        处理消息
   更新状态                        更新状态

优点:无锁并发、消息驱动、故障隔离

7.2 动态代理模式

RPC 调用通过 JDK 动态代理实现:

动态代理模式:
调用方
    ↓ gateway.someMethod(args)
JDK Proxy (RpcGateway 接口)
    ↓ 委托
PekkoInvocationHandler
    - 创建 RpcInvocation
    - 发送到 Actor
    - 处理返回值

7.3 监管策略

SupervisorActor 监管所有 RpcActor 的生命周期:

监管策略:
SupervisorActor
    ├── RpcActor
    ├── RpcActor
    └── RpcActor
            ↓ 异常
        SupervisorActorSupervisorStrategy
            ↓ 决策: Stop (不重启)
        rpcActorFailed() → 终止整个 ActorSystem

8. 调试指南

8.1 关键断点位置

8.2 关键日志

# RPC 服务启动
"Starting RPC endpoint for {} at {}."

# RPC 连接
"Try to connect to remote RPC endpoint with address {}."

# Actor 终止
"RpcActor {} has terminated."

# 状态转换
"Starting {} with name {}."

8.3 常见问题排查

问题1:RPC 调用超时

问题2:序列化错误

问题3:Actor 异常终止


9. 配置参数

9.1 RPC 相关配置


10. 总结

Flink RPC 体系的核心特点:

  1. 基于 Actor 模型:利用 Pekko Actor 实现消息驱动的异步通信
  2. 动态代理透明化:通过 JDK 动态代理使远程调用像本地调用一样简单
  3. 单线程保证:每个 RpcEndpoint 的方法都在其主线程中执行,避免并发问题
  4. Fencing Token 防脑裂:通过令牌机制确保只有合法的 Leader 能够执行操作
  5. 监管策略:SupervisorActor 统一管理所有 RpcActor 的生命周期
  6. 本地/远程统一:同一套接口既支持本地调用也支持远程调用

本文档基于 Flink 1.19 源码分析编写