Flink内存模型

2023/03/28

Tags: Flink

Java 堆外内存

import sun.nio.ch.DirectBuffer;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;

public class OutHeapMem {
    public static void main(String[] args) throws Exception {
        // 分配 1G 直接内存
        ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024 * 1024 * 1024);
        TimeUnit.SECONDS.sleep(30);

        System.out.println("clean start");
        // 清除直接内存
        ((DirectBuffer) byteBuffer).cleaner().clean();
        System.out.println("clean finished");

        TimeUnit.SECONDS.sleep(30);
    }
}
# 分配内存
Memory                                  used         total        max           usage
heap                                    21M          165M         3641M         0.59%
ps_eden_space                           3M           64M          1344M         0.29%
ps_survivor_space                       0K           10752K       10752K        0.00%
ps_old_gen                              17M          91M          2731M         0.64%
nonheap                                 28M          28M          -1            96.89%
code_cache                              5M           5M           240M          2.11%
metaspace                               20M          21M          -1            97.00%
compressed_class_space                  2M           2M           1024M         0.25%
+direct                                 1024M        1024M        -             100.00%
mapped                                  0K           0K           -             0.00% 
# 释放内存
Memory                                  used         total        max           usage
heap                                    21M          165M         3641M         0.60%
ps_eden_space                           4M           64M          1344M         0.32%
ps_survivor_space                       0K           10752K       10752K        0.00%
ps_old_gen                              17M          91M          2731M         0.64%
nonheap                                 27M          28M          -1            96.79%
code_cache                              5M           5M           240M          2.09%
metaspace                               20M          21M          -1            97.03%
compressed_class_space                  2M           2M           1024M         0.25%
-direct                                 0K           0K           -             0.00%
mapped                                  0K           0K           -             0.00%

通过 arthas 分析,分配直接内存会在 direct 开辟内存空间,表明是在堆外分配的内存空间;虽然 byteBuffer 指向了 direct memory,但是这个对象引用还在 heap 中,当 byteBuffer 对象引用 被 GC 算法回收掉之后,byteBuffer 指向的内存空间也会被释放;

taskmanager内存模型

flink 内存模型大体上可以分为,Heap 内存 和 Off-Heap 内存;

JVM Heap

组成部分 配置参数 描述
框架堆内存(Framework Heap Memory) taskmanager.memory.framework.heap.size 用于 Flink 框架的 JVM 堆内存(进阶配置)。TaskExecutors 的框架堆内存大小。这是为 TaskExecutor 框架保留的 JVM 堆内存大小,不会分配给任务槽。
任务堆内存(Task Heap Memory) taskmanager.memory.task.heap.size 用于 Flink 应用的算子及用户代码的 JVM 堆内存。
任务执行器的任务堆内存大小。这是为任务保留的 JVM 堆内存的大小。如果未指定,它的大小为:总 Flink 内存减去框架堆内存、框架堆外内存、任务堆外内存、托管内存和网络内存。

Off-JVM Memory

Managed memory

组成部分 配置参数 描述
托管内存(Managed memory) taskmanager.memory.managed.size
taskmanager.memory.managed.fraction
由 Flink 管理的流处理和批处理作业中用于排序、哈希表及缓存中间结果 、 流处理作业中用于 RocksDB State Backend、流处理和批处理作业中用于在 Python 进程中执行用户自定义函数。内存使用者可以从 MemoryManager 以 MemorySegments 的形式分配内存,所以名称叫 Managed Momory。

Direct Memory

组成部分 配置参数 描述
框架堆外内存(Framework Off-heap Memory) taskmanager.memory.framework.off-heap.size 这是为 Flink 框架保留的堆外内存的大小,不会分配给任务槽。 Flink 在计算 JVM max direct memory size 参数时,会把配置的值全部统计进去。
任务堆外内存(Task Off-heap Memory) taskmanager.memory.task.off-heap.size TaskExecutors 的任务堆外内存大小,用于 Flink 算子及用户代码的堆外内存。 Flink 在计算 JVM max direct memory size 参数时,会把配置的值全部统计进去。
网络内存(Network Memory) taskmanager.memory.network.min
taskmanager.memory.network.max
taskmanager.memory.network.fraction
用于task 之间数据传输的堆外内存(网络传输缓冲区)

JVM Metaspace 、JVM Overhead

组成部分 配置参数 描述
JVM Metaspace taskmanager.memory.jvm-metaspace.size Flink JVM 进程的 Metaspace。从 JDK 8 开始,JVM 把永久代拿掉了。类的一些元数据放在叫做 Metaspace 的 Native Memory 中。在 Flink 中的 JVM Metaspace Memory 也一样,配置的是 Taskmanager JVM 的元空间内存大小。
JVM Overhead taskmanager.memory.jvm-overhead.min
taskmanager.memory.jvm-overhead.max
taskmanager.memory.jvm-overhead.fraction
用于 JVM 开销的本地内存,例如栈空间、垃圾回收空间等。Flink 计算 JVM max direct memory size 参数时不会计算在内。

参考: https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/deployment/memory/mem_setup_tm/