本文档详细梳理 Flink 中的 RPC(远程过程调用)体系架构,包括 RPC 系统的初始化流程、核心类关系以及各组件之间的通信模式。
1. 整体架构概览
Flink 的 RPC 体系基于 Apache Pekko (原 Akka) Actor 模型实现,提供了一套完整的分布式通信框架。
RpcEndpoint (业务逻辑) <---> RpcService (RPC 服务管理) <---> RpcGateway (远程代理接口)
↑ ↑ ↑
| | |
PekkoRpcActor (Actor 实现) PekkoRpcService (Pekko 实现) PekkoInvocationHandler (动态代理)
↓
ActorSystem (Pekko 运行时)
2. 核心类和文件位置
2.1 RPC 核心接口(flink-rpc-core)
RpcEndpoint-flink-rpc-core/.../rpc/RpcEndpoint.java- RPC 端点基类,所有提供 RPC 服务的组件都继承它RpcGateway-flink-rpc-core/.../rpc/RpcGateway.java- RPC 网关接口,定义可远程调用的方法RpcServer-flink-rpc-core/.../rpc/RpcServer.java- RPC 服务器接口,组合了多个功能接口RpcService-flink-rpc-core/.../rpc/RpcService.java- RPC 服务接口,管理 RPC 端点的生命周期RpcSystem-flink-rpc-core/.../rpc/RpcSystem.java- RPC 系统工厂接口,用于创建 RpcServiceRpcSystemLoader-flink-rpc-core/.../rpc/RpcSystemLoader.java- RPC 系统加载器接口FencedRpcEndpoint-flink-rpc-core/.../rpc/FencedRpcEndpoint.java- 带隔离令牌的 RPC 端点,防止脑裂FencedRpcGateway-flink-rpc-core/.../rpc/FencedRpcGateway.java- 带隔离令牌的 RPC 网关MainThreadExecutable-flink-rpc-core/.../rpc/MainThreadExecutable.java- 主线程执行接口
2.2 Pekko 实现类(flink-rpc-akka)
PekkoRpcService-flink-rpc-akka/.../pekko/PekkoRpcService.java- RpcService 的 Pekko 实现PekkoRpcActor-flink-rpc-akka/.../pekko/PekkoRpcActor.java- RPC Actor,处理 RPC 消息PekkoInvocationHandler-flink-rpc-akka/.../pekko/PekkoInvocationHandler.java- JDK 动态代理处理器FencedPekkoRpcActor-flink-rpc-akka/.../pekko/FencedPekkoRpcActor.java- 带隔离令牌的 RPC ActorFencedPekkoInvocationHandler-flink-rpc-akka/.../pekko/FencedPekkoInvocationHandler.java- 带隔离令牌的代理处理器SupervisorActor-flink-rpc-akka/.../pekko/SupervisorActor.java- 监管 Actor,管理 RPC Actor 生命周期ControlMessages-flink-rpc-akka/.../pekko/ControlMessages.java- 控制消息枚举(START/STOP/TERMINATE)
2.3 RPC 消息类(flink-rpc-core)
RpcInvocation-flink-rpc-core/.../rpc/messages/RpcInvocation.java- RPC 调用消息接口LocalRpcInvocation-flink-rpc-core/.../rpc/messages/LocalRpcInvocation.java- 本地 RPC 调用(不序列化)RemoteRpcInvocation-flink-rpc-core/.../rpc/messages/RemoteRpcInvocation.java- 远程 RPC 调用(需序列化)RunAsync-flink-rpc-core/.../rpc/messages/RunAsync.java- 异步执行 RunnableCallAsync-flink-rpc-core/.../rpc/messages/CallAsync.java- 异步执行 Callable
2.4 各组件的 Gateway 接口(flink-runtime)
JobMasterGateway-flink-runtime/.../jobmaster/JobMasterGateway.java- JobMaster 的 RPC 接口TaskExecutorGateway-flink-runtime/.../taskexecutor/TaskExecutorGateway.java- TaskExecutor 的 RPC 接口DispatcherGateway-flink-runtime/.../dispatcher/DispatcherGateway.java- Dispatcher 的 RPC 接口ResourceManagerGateway-flink-runtime/.../resourcemanager/ResourceManagerGateway.java- ResourceManager 的 RPC 接口
3. RPC 系统加载和初始化流程
3.1 RPC 系统加载
Flink 使用 Java SPI(ServiceLoader)机制加载 RPC 系统:
RpcSystem.load(Configuration config)
↓
ServiceLoader.load(RpcSystemLoader.class) -- 加载所有 RpcSystemLoader 实现
↓
按 loadPriority 排序 -- 优先级排序
↓
PekkoRpcSystemLoader.loadRpcSystem() -- 默认 Pekko 加载器
↓
提取 flink-rpc-akka.jar -- 从资源中提取 jar
↓
创建 SubmoduleClassLoader -- 隔离类加载器
↓
ServiceLoader.load(RpcSystem.class) -- 加载 PekkoRpcSystem
↓
返回 CleanupOnCloseRpcSystem -- 包装清理逻辑
关键源码:
// RpcSystem.java - 加载入口
static RpcSystem load(Configuration config) {
final PriorityQueue<RpcSystemLoader> rpcSystemLoaders =
new PriorityQueue<>(Comparator.comparingInt(RpcSystemLoader::getLoadPriority));
ServiceLoader.load(RpcSystemLoader.class).forEach(rpcSystemLoaders::add);
while (iterator.hasNext()) {
return next.loadRpcSystem(config);
}
}
// PekkoRpcSystemLoader.java - Pekko 加载器
public RpcSystem loadRpcSystem(Configuration config) {
// 1. 提取 flink-rpc-akka.jar 到临时目录
final InputStream resourceStream = flinkClassLoader.getResourceAsStream(FLINK_RPC_PEKKO_FAT_JAR);
IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
// 2. 创建隔离的类加载器
final SubmoduleClassLoader submoduleClassLoader =
new SubmoduleClassLoader(new URL[] {tempFile.toUri().toURL()}, flinkClassLoader);
// 3. 通过 SPI 加载 PekkoRpcSystem
return new CleanupOnCloseRpcSystem(
ServiceLoader.load(RpcSystem.class, submoduleClassLoader).iterator().next(),
submoduleClassLoader,
tempFile);
}
3.2 RpcService 创建
RpcSystem.remoteServiceBuilder() -- 创建远程服务构建器
↓
RpcServiceBuilder.createAndStart() -- 构建并启动
↓
PekkoUtils.createActorSystem() -- 创建 Pekko ActorSystem
↓
new PekkoRpcService(actorSystem, config) -- 创建 RpcService
↓
startSupervisorActor() -- 启动监管 Actor
↓
startDeadLettersActor() -- 启动死信处理 Actor
ActorSystem 启动:
// PekkoRpcService 构造函数
PekkoRpcService(ActorSystem actorSystem, PekkoRpcServiceConfiguration configuration) {
this.actorSystem = checkNotNull(actorSystem);
this.configuration = checkNotNull(configuration);
// 获取地址和端口
Address actorSystemAddress = PekkoUtils.getAddress(actorSystem);
address = actorSystemAddress.host().isDefined() ? actorSystemAddress.host().get() : "";
port = actorSystemAddress.port().isDefined() ? (Integer) actorSystemAddress.port().get() : -1;
// 启动监管 Actor
supervisor = startSupervisorActor();
// 启动死信处理 Actor
startDeadLettersActor();
}
3.3 RpcEndpoint 启动流程
new XxxEndpoint(rpcService, ...) -- 创建 RpcEndpoint 子类
↓
RpcEndpoint 构造函数
↓
rpcService.startServer(this) -- 注册到 RpcService
↓
registerRpcActor(rpcEndpoint) -- 创建并注册 Actor
↓
SupervisorActor.startRpcActor() -- 通过监管 Actor 创建
↓
context.actorOf(Props.create(...)) -- 创建 PekkoRpcActor
↓
Proxy.newProxyInstance(...) -- 创建 RpcServer 代理
↓
返回 RpcServer -- 赋值给 rpcServer 字段
↓
rpcEndpoint.start() -- 调用 start() 方法
↓
rpcServer.start() -- 发送 START 消息
↓
PekkoRpcActor 状态: STOPPED → STARTED -- Actor 开始处理消息
↓
internalCallOnStart() -- 调用 onStart() 回调
关键源码:
// RpcEndpoint.java - 构造函数
protected RpcEndpoint(final RpcService rpcService, final String endpointId) {
this.rpcService = checkNotNull(rpcService);
this.endpointId = checkNotNull(endpointId);
// 关键:启动 RPC 服务器,返回代理对象
this.rpcServer = rpcService.startServer(this);
this.mainThreadExecutor = new MainThreadExecutor(rpcServer, ...);
}
// PekkoRpcService.java - 启动服务器
public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint) {
// 1. 注册 RPC Actor
final SupervisorActor.ActorRegistration actorRegistration = registerRpcActor(rpcEndpoint);
final ActorRef actorRef = actorRegistration.getActorRef();
// 2. 创建代理处理器
final InvocationHandler invocationHandler;
if (rpcEndpoint instanceof FencedRpcEndpoint) {
invocationHandler = new FencedPekkoInvocationHandler<>(...);
} else {
invocationHandler = new PekkoInvocationHandler(...);
}
// 3. 创建 JDK 动态代理
RpcServer server = (RpcServer) Proxy.newProxyInstance(
classLoader,
implementedRpcGateways.toArray(new Class<?>[0]),
invocationHandler);
return server;
}
4. RPC 核心类图
图例说明(UML 标准简写):
A <|-- B表示 B 继承 A(B extends A)A <|.. B表示 B 实现 A(B implements A)A o-- B表示 A 组合 B(A has/contains B)
4.1 RPC 核心接口继承关系
RpcGateway 继承体系:
RpcGateway <|-- FencedRpcGateway<F>
<|-- JobMasterGateway
<|-- TaskExecutorGateway
<|-- DispatcherGateway
<|-- ResourceManagerGateway
RpcServer 继承体系:
StartStoppable ────┐
MainThreadExecutable ─┼─<|-- RpcServer <|.. PekkoInvocationHandler
RpcGateway ────────┘
4.2 RpcEndpoint 继承关系
RpcEndpoint <|-- FencedRpcEndpoint<F>
<|-- JobMaster ..|> JobMasterGateway
<|-- Dispatcher ..|> DispatcherGateway
<|-- ResourceManager ..|> ResourceManagerGateway
<|-- TaskExecutor ..|> TaskExecutorGateway
4.3 RpcService 实现关系
RpcService <|.. PekkoRpcService
|--o ActorSystem
|--o SupervisorActor --o PekkoRpcActor[]
|--o Configuration
4.4 RPC 消息类继承关系
Message <|-- RpcInvocation <|.. LocalRpcInvocation
| <|.. RemoteRpcInvocation
<|.. RunAsync
<|.. CallAsync
4.5 Actor 类继承关系
AbstractActor <|-- PekkoRpcActor <|-- FencedPekkoRpcActor<F>
<|-- SupervisorActor
5. RPC 调用流程详解
5.1 核心接口职责说明
- RpcGateway:远程调用的"门面接口",定义可被远程调用的方法签名(类似 REST API 的接口定义)
- RpcEndpoint:实际处理 RPC 请求的"服务端",实现具体业务逻辑(类似 Controller/Service 实现类)
- RpcService:RPC 运行时环境,负责创建、管理 RpcEndpoint(类似 Spring 容器)
- RpcServer:RpcEndpoint 的代理对象,用于控制生命周期和执行调度(类似本地的 RPC Stub)
核心交互流程:
调用方 → RpcGateway(代理) → PekkoInvocationHandler → RpcInvocation → PekkoRpcActor → RpcEndpoint
↓
结果 ← 业务方法()
说明:调用方获取的
RpcGateway实际上是一个 JDK 动态代理对象,其InvocationHandler就是PekkoInvocationHandler。 该代理同时实现了RpcGateway(定义业务方法)和RpcServer(提供 start/stop 等生命周期方法)接口。
流程说明:
- 调用方通过 RpcGateway 代理发起方法调用(代理背后是 PekkoInvocationHandler)
- PekkoInvocationHandler 将调用封装为 RpcInvocation 消息
- 消息发送到 PekkoRpcActor 的消息队列
- Actor 通过反射调用 RpcEndpoint 的业务方法
- 结果原路返回给调用方
各接口交互关系:
RpcService (RPC 运行时容器)
│
├─ 管理的 RpcEndpoint:
│ JobMaster ↔ PekkoRpcActor
│ Dispatcher ↔ PekkoRpcActor
│ TaskExecutor ↔ PekkoRpcActor
│
└─ 底层: ActorSystem (Pekko 运行时)
RpcService 职责:
startServer(endpoint)→ 创建 Actor + 返回 RpcServer 代理connect(address, gateway)→ 连接远程 RpcEndpointstopServer(server)→ 停止 RpcEndpoint
RpcGateway、RpcEndpoint、RpcServer、RpcService 四者关系:
RpcService (容器/工厂)
• startServer(endpoint) → 创建 Actor + 返回代理
• connect(address) → 获取远程 Gateway
│
┌──────────────────┼──────────────────┐
│ 创建 │ 管理 │ 创建代理
↓ ↓ ↓
PekkoRpcActor RpcEndpoint ───持有──→ RpcServer (JDK 动态代理)
(消息处理) (业务实现) • 实现 RpcGateway + 生命周期控制
│ │ │
│ │ implements │ 实际是
│ ↓ ↓
└────反射调用──→ RpcGateway ←───── 调用方使用
(接口定义)
关系说明:
- RpcService:容器角色,负责创建 Actor、生成代理、管理连接
- RpcEndpoint:业务实现,继承它并实现 RpcGateway 接口
- RpcServer:代理对象,由 RpcService 创建,实现了 RpcGateway 接口
- RpcGateway:接口定义,调用方通过它发起 RPC 调用
示例 - JobMaster:
class JobMaster extends RpcEndpoint implements JobMasterGateway {
// RpcEndpoint 提供: rpcServer 字段
// JobMasterGateway 定义: 可被远程调用的方法
}
5.2 本地调用流程
当调用者和被调用者在同一个 JVM 中时:
调用方: gateway.someMethod(args)
↓
PekkoInvocationHandler.invoke()
↓
判断方法类型
↓
invokeRpc(method, args)
↓
createRpcInvocationMessage() -- 创建 LocalRpcInvocation
↓
tell(rpcInvocation) 或 ask(rpcInvocation) -- 发送到 Actor
↓
PekkoRpcActor.handleRpcInvocation() -- Actor 接收消息
↓
lookupRpcMethod() -- 反射查找方法
↓
method.invoke(rpcEndpoint, args) -- 调用 RpcEndpoint 方法
↓
返回结果或 CompletableFuture
5.3 远程调用流程
当调用者和被调用者在不同 JVM 中时:
调用方: gateway.someMethod(args)
↓
PekkoInvocationHandler.invoke()
↓
createRpcInvocationMessage() -- 创建 RemoteRpcInvocation
↓
序列化参数 -- 参数 → 字节数组
↓
Patterns.ask(rpcEndpoint, message, timeout)
↓
[网络传输 - Pekko Remoting] -- 跨 JVM 传输
↓
远程 PekkoRpcActor.handleRpcInvocation() -- 远程 Actor 接收
↓
反序列化参数 -- 字节数组 → 参数
↓
method.invoke(rpcEndpoint, args) -- 调用目标方法
↓
序列化结果 -- 结果 → 字节数组
↓
[网络传输返回]
↓
反序列化结果 -- 字节数组 → 结果
↓
返回给调用方
5.4 消息处理状态机
PekkoRpcActor 内部维护一个状态机来管理消息处理:
STOPPED (初始状态)
↓ ControlMessages.START
STARTED (运行状态)
↓ ControlMessages.TERMINATE
TERMINATING (终止中)
↓ Actor 停止
TERMINATED (已终止)
状态说明:
- STOPPED: 初始状态,不处理 RPC 消息(会丢弃)
- STARTED: 运行状态,正常处理 RPC 消息
- TERMINATING: 终止中,仍处理消息但准备关闭
- TERMINATED: 已终止
6. Flink 组件通信模式
6.1 组件通信架构图
Client (REST API)
↓ HTTP
JobManager
├── Dispatcher ←→ JobMaster ←→ ResourceManager
↓ RPC ↓ RPC ↓ RPC
TaskManager(s)
└── TaskExecutor
└── Task, Task, Task, Task ...
6.2 各组件 RPC 通信详解
6.2.1 Dispatcher(作业分发器)
角色:接收客户端提交的作业,管理多个 JobMaster
Gateway 继承关系:
DispatcherGateway
extends FencedRpcGateway<DispatcherId>
extends RestfulGateway
主要 RPC 方法:
submitJob(JobGraph)- 调用方: REST API - 提交作业listJobs()- 调用方: REST API - 列出所有作业cancelJob(JobID)- 调用方: REST API - 取消作业triggerSavepoint(...)- 调用方: REST API - 触发 SavepointgetBlobServerPort()- 调用方: JobMaster - 获取 BlobServer 端口
通信模式:
Client (REST) ---HTTP---> Dispatcher ---内部调用---> JobMaster
6.2.2 JobMaster(作业主节点)
角色:管理单个作业的执行,协调 Task 调度
Gateway 继承关系:
JobMasterGateway
extends CheckpointCoordinatorGateway
extends FencedRpcGateway<JobMasterId>
extends KvStateLocationOracle
extends BlocklistListener
主要 RPC 方法:
registerTaskManager(...)- 调用方: TaskExecutor - 注册 TaskManagerofferSlots(...)- 调用方: TaskExecutor - 提供 SlotupdateTaskExecutionState(...)- 调用方: TaskExecutor - 更新任务状态heartbeatFromTaskManager(...)- 调用方: TaskExecutor - 心跳requestNextInputSplit(...)- 调用方: Task - 请求输入分片notifyEndOfData(...)- 调用方: Task - 通知数据结束triggerSavepoint(...)- 调用方: Dispatcher - 触发 Savepoint
通信模式:
ResourceManager <---RPC---> JobMaster <---RPC---> TaskExecutor
↓ RPC
Dispatcher
6.2.3 ResourceManager(资源管理器)
角色:管理集群资源,分配 Slot 给 JobMaster
Gateway 继承关系:
ResourceManagerGateway
extends FencedRpcGateway<ResourceManagerId>
extends ClusterPartitionManager
extends BlocklistListener
主要 RPC 方法:
registerJobMaster(...)- 调用方: JobMaster - 注册 JobMasterregisterTaskExecutor(...)- 调用方: TaskExecutor - 注册 TaskExecutordeclareRequiredResources(...)- 调用方: JobMaster - 声明资源需求sendSlotReport(...)- 调用方: TaskExecutor - 发送 Slot 报告heartbeatFromTaskManager(...)- 调用方: TaskExecutor - 心跳heartbeatFromJobManager(...)- 调用方: JobMaster - 心跳notifySlotAvailable(...)- 调用方: TaskExecutor - 通知 Slot 可用
通信模式:
JobMaster (Scheduler) <---RPC---> ResourceManager (SlotManager) <---RPC---> TaskExecutor (SlotTable)
↓ requestSlots ↓ allocateSlot
6.2.4 TaskExecutor(任务执行器)
角色:执行具体的 Task,管理本地资源
Gateway 继承关系:
TaskExecutorGateway
extends RpcGateway
extends TaskExecutorOperatorEventGateway
extends TaskExecutorThreadInfoGateway
主要 RPC 方法:
requestSlot(...)- 调用方: ResourceManager - 请求分配 SlotsubmitTask(...)- 调用方: JobMaster - 提交任务cancelTask(...)- 调用方: JobMaster - 取消任务triggerCheckpoint(...)- 调用方: JobMaster - 触发 CheckpointconfirmCheckpoint(...)- 调用方: JobMaster - 确认 CheckpointfreeSlot(...)- 调用方: JobMaster - 释放 SlotheartbeatFromJobManager(...)- 调用方: JobMaster - 心跳heartbeatFromResourceManager(...)- 调用方: ResourceManager - 心跳
通信模式:
ResourceManager ---requestSlot---> TaskExecutor
JobMaster <---RPC---> TaskExecutor
↓ submitTask ↓
Execution Task.run()
6.3 典型通信场景
场景一:作业提交流程
Client
│
│ 1. POST /jobs (HTTP)
↓
Dispatcher
│
│ 2. createJobManagerRunner()
↓
JobMaster (创建)
│
│ 3. registerJobMaster() (RPC)
↓
ResourceManager
│
│ 4. declareRequiredResources() (RPC)
│
│ 5. requestSlot() (RPC)
↓
TaskExecutor
│
│ 6. offerSlots() (RPC)
↓
JobMaster
│
│ 7. submitTask() (RPC)
↓
TaskExecutor
│
│ 8. Task.run()
↓
用户代码执行
场景二:心跳机制
心跳发送方向:
TaskExecutor ---heartbeat---> JobMaster
TaskExecutor ---heartbeat---> ResourceManager
JobMaster -----heartbeat---> ResourceManager
场景三:Checkpoint 流程
JobMaster (CheckpointCoordinator)
│
│ 1. triggerCheckpoint() (RPC)
↓
TaskExecutor (所有 Source Task)
│
│ 2. Barrier 传播 (数据流)
↓
下游 Task
│
│ 3. acknowledgeCheckpoint() (RPC)
↓
JobMaster
│
│ 4. 所有 Task 确认后完成 Checkpoint
6.4 服务发现机制
Flink RPC 地址不是写死的,而是通过 服务发现机制 动态获取。
核心组件
- HighAvailabilityServices:服务发现的总入口,提供各组件的 LeaderRetriever
- LeaderRetrievalService:监听并获取特定组件 Leader 的地址
- LeaderElection:参与 Leader 选举,当选后发布自己的地址
- LeaderRetrievalListener:监听 Leader 变化的回调接口
工作流程
组件启动流程(以 JobMaster 发现 ResourceManager 为例):
JobMaster 启动
↓
haServices.getResourceManagerLeaderRetriever() // 获取服务发现器
↓
leaderRetriever.start(listener) // 开始监听
↓
[ZooKeeper/内存] 通知 Leader 地址变化 // 后端存储
↓
listener.notifyLeaderAddress(address, leaderID) // 回调通知
↓
rpcService.connect(address, fencingToken, gateway) // 连接到 Leader
↓
ResourceManagerGateway // 可以进行 RPC 调用
实现方式
Flink 提供多种 HighAvailabilityServices 实现:
- StandaloneHaServices:单机模式,地址通过配置指定(非真正的服务发现)
- EmbeddedHaServices:嵌入式模式(MiniCluster),内存中的服务发现
- ZooKeeperHaServices:生产 HA 模式,通过 ZooKeeper 实现真正的服务发现
HighAvailabilityServices 提供的服务发现:
haServices
├── getResourceManagerLeaderRetriever() → 发现 ResourceManager
├── getDispatcherLeaderRetriever() → 发现 Dispatcher
├── getJobManagerLeaderRetriever(jobId) → 发现特定作业的 JobMaster
└── getClusterRestEndpointLeaderRetriever() → 发现 REST 端点
典型使用示例
// JobMaster 中发现 ResourceManager
public class JobMaster extends FencedRpcEndpoint<JobMasterId> {
private final LeaderRetrievalService resourceManagerLeaderRetriever;
public JobMaster(HighAvailabilityServices haServices, ...) {
// 从 HA 服务获取 ResourceManager 的服务发现器
this.resourceManagerLeaderRetriever =
haServices.getResourceManagerLeaderRetriever();
}
@Override
protected void onStart() {
// 启动服务发现,监听 ResourceManager Leader 变化
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
// Leader 变化时的回调
private class ResourceManagerLeaderListener implements LeaderRetrievalListener {
@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
// 连接到新的 ResourceManager Leader
rpcService.connect(leaderAddress,
ResourceManagerId.fromUuid(leaderSessionID),
ResourceManagerGateway.class);
}
}
}
服务发现与 RPC 的关系
HighAvailabilityServices (服务发现中心)
│
┌──────────────────────┼──────────────────────┐
↓ ↓ ↓
LeaderRetriever LeaderRetriever LeaderRetriever
(ResourceManager) (Dispatcher) (JobMaster)
│ │ │
│ 监听地址变化 │ │
↓ ↓ ↓
notifyLeaderAddress notifyLeaderAddress notifyLeaderAddress
│ │ │
│ 调用 RpcService.connect() │
↓ ↓ ↓
RpcGateway 代理 RpcGateway 代理 RpcGateway 代理
6.5 Fencing Token 机制
Fencing Token(隔离令牌)是 Flink 用来解决 分布式系统脑裂问题(Split-Brain) 的关键机制。
6.4.1 问题背景
在分布式高可用(HA)场景下,可能同时存在多个 JobManager 实例:
场景:网络分区导致的脑裂
时刻 T1: 正常状态
JobManager-A (Leader, 活跃)
JobManager-B (Standby, 备用)
时刻 T2: 网络分区
JobManager-A (以为自己还是 Leader) ←── 网络分区 ──→ JobManager-B (被选举为新 Leader)
↓ ↓
继续发送 RPC 命令 开始发送 RPC 命令
↓ ↓
TaskExecutor 收到两个"Leader"的命令,怎么办?
如果没有 Fencing Token,TaskExecutor 无法区分哪个 JobMaster 是"合法的 Leader",可能导致:
- 同一个 Task 被启动两次
- Checkpoint 数据混乱
- 资源分配冲突
6.4.2 工作原理
核心思想:每次 Leader 选举时生成一个 唯一的令牌(UUID),所有 RPC 请求都必须携带这个令牌,接收方验证令牌是否匹配。
Fencing Token 验证流程:
1. 旧 JobMaster (Token: UUID-A) 发送 RPC
↓
2. TaskExecutor 收到消息
↓
3. 新 JobMaster (Token: UUID-B) 已注册到 TaskExecutor
↓
4. Token 不匹配 (UUID-A ≠ UUID-B)
↓
5. 拒绝请求,抛出 FencingTokenException
6.4.3 实现方式
Flink 通过以下类实现 Fencing Token 机制:
-
FencedRpcEndpoint<F>- 带令牌的 RPC 端点基类F是 Fencing Token 类型(如JobMasterId、ResourceManagerId)- 存储当前的 Fencing Token
-
FencedRpcGateway<F>- 带令牌的 RPC 网关接口getFencingToken()方法返回当前令牌
-
FencedPekkoRpcActor- 验证令牌的 Actor- 收到消息时检查令牌是否匹配
- 不匹配则拒绝处理
-
FencedPekkoInvocationHandler- 附加令牌的代理- 发送 RPC 时自动附加当前令牌
6.4.4 典型应用
// JobMaster 继承 FencedRpcEndpoint,使用 JobMasterId 作为 Fencing Token
public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMasterGateway {
// JobMasterId 是一个包装了 UUID 的类
// 每次 JobMaster 启动时生成新的 UUID
}
// ResourceManager 同样使用 Fencing Token
public class ResourceManager extends FencedRpcEndpoint<ResourceManagerId>
implements ResourceManagerGateway {
// ...
}
6.4.5 消息验证过程
FencedPekkoRpcActor 中的验证逻辑:
收到 FencedMessage<F>
↓
获取消息中的 Token
↓
与本地存储的 Token 比较
↓
匹配? ──否──→ 丢弃消息或返回 FencingTokenException
↓ 是
正常处理 RPC 调用
6.4.6 为什么同进程组件也需要心跳?
虽然 ResourceManager 和 JobMaster 在同一个 JVM 进程中,但它们之间仍然需要心跳机制:
JobManager 进程内部:
│
├── ResourceManager (独立的 RpcEndpoint/Actor)
│ ↑
│ │ RPC 心跳消息 (仍然走 Actor 消息机制)
│ ↓
├── JobMaster (独立的 RpcEndpoint/Actor)
│
└── Dispatcher
心跳的真正目的不仅仅是检测网络连通性,更重要的是:
-
活性检测(Liveness Check):检测组件是否正常响应,即使在同一进程,Actor 也可能因为死锁、长时间阻塞、消息队列积压等原因无法响应
-
Leader 变更感知:Flink 支持高可用(HA),可能存在多个 JobManager 进程,当 Leader 切换时,心跳超时可以帮助快速感知并进行故障转移
-
Fencing Token 验证:每次心跳都会验证 Fencing Token,确保通信的是合法的 Leader
-
设计一致性:统一的通信模式,不管是同进程还是跨进程,都使用相同的心跳机制和 RPC 接口
6.4.7 总结
- 目的:防止脑裂,确保只有合法 Leader 的命令被执行
- 实现:基于 UUID 的唯一令牌,每次 Leader 选举更新
- 验证方:FencedPekkoRpcActor 在接收消息时验证
- 发送方:FencedPekkoInvocationHandler 自动附加令牌
- 适用组件:JobMaster、ResourceManager、Dispatcher 等所有需要 HA 的组件
7. 关键设计模式
7.1 Actor 模型
Flink RPC 基于 Actor 模型,每个 RpcEndpoint 对应一个 Actor:
Actor 模型:
Actor A (Mailbox) ----消息----> Actor B (Mailbox)
↓ 单线程处理 ↓ 单线程处理
处理消息 处理消息
更新状态 更新状态
优点:无锁并发、消息驱动、故障隔离
7.2 动态代理模式
RPC 调用通过 JDK 动态代理实现:
动态代理模式:
调用方
↓ gateway.someMethod(args)
JDK Proxy (RpcGateway 接口)
↓ 委托
PekkoInvocationHandler
- 创建 RpcInvocation
- 发送到 Actor
- 处理返回值
7.3 监管策略
SupervisorActor 监管所有 RpcActor 的生命周期:
监管策略:
SupervisorActor
├── RpcActor
├── RpcActor
└── RpcActor
↓ 异常
SupervisorActorSupervisorStrategy
↓ 决策: Stop (不重启)
rpcActorFailed() → 终止整个 ActorSystem
8. 调试指南
8.1 关键断点位置
PekkoRpcService.startServer()- RPC 服务器启动PekkoInvocationHandler.invoke()- RPC 调用入口PekkoRpcActor.handleRpcInvocation()- RPC 消息处理PekkoRpcActor.handleControlMessage()- 控制消息处理SupervisorActor.createStartRpcActorMessage()- Actor 创建
8.2 关键日志
# RPC 服务启动
"Starting RPC endpoint for {} at {}."
# RPC 连接
"Try to connect to remote RPC endpoint with address {}."
# Actor 终止
"RpcActor {} has terminated."
# 状态转换
"Starting {} with name {}."
8.3 常见问题排查
问题1:RPC 调用超时
- 检查网络连接
- 检查目标 RpcEndpoint 是否启动
- 检查 Fencing Token 是否匹配
- 增加
pekko.ask.timeout配置
问题2:序列化错误
- 确保参数类实现 Serializable
- 检查类加载器是否正确
- 查看是否使用了不可序列化的 Lambda
问题3:Actor 异常终止
- 查看 SupervisorActor 日志
- 检查 onStart/onStop 回调是否抛出异常
- 确认资源是否正确释放
9. 配置参数
9.1 RPC 相关配置
pekko.ask.timeout- 默认值: 10s - RPC 调用超时时间pekko.framesize- 默认值: 10485760b - 最大消息帧大小pekko.tcp.timeout- 默认值: 20s - TCP 连接超时pekko.throughput- 默认值: 15 - Actor 每次处理的消息数pekko.fork-join-executor.parallelism-factor- 默认值: 2.0 - 并行度因子
10. 总结
Flink RPC 体系的核心特点:
- 基于 Actor 模型:利用 Pekko Actor 实现消息驱动的异步通信
- 动态代理透明化:通过 JDK 动态代理使远程调用像本地调用一样简单
- 单线程保证:每个 RpcEndpoint 的方法都在其主线程中执行,避免并发问题
- Fencing Token 防脑裂:通过令牌机制确保只有合法的 Leader 能够执行操作
- 监管策略:SupervisorActor 统一管理所有 RpcActor 的生命周期
- 本地/远程统一:同一套接口既支持本地调用也支持远程调用
本文档基于 Flink 1.19 源码分析编写