
01
—
背景
企业生产环境中,会出现大量依赖中间状态的实时任务,目前flink的状态存储有Memory、FileSystem 和 RocksDB 三种可选,且 RocksDB 是状态数据量较大(GB 到 TB 级别)时的唯一选择。RocksDB 的性能发挥非常仰赖调优,如果全部采用默认配置,读写性能有可能会很差。但是,RocksDB 的配置也是极为复杂的,可调整的参数多达百个,没有放之四海而皆准的优化方案。如果仅考虑 Flink 状态存储这一方面,我们仍然可以总结出一些相对普适的优化思路。本文先介绍一些基础知识,再列举方法。
本文测试flink版本1.14.6
02
—
-
MemoryStateBackend:默认的方式,即基于JVM的堆内存进行存储,主要适用于本地开发和调试; -
FsStateBackend:基于文件系统进行存储,可以是本地文件系统,也可以是HDFS等分布式文件系统。需要注意,虽然选择使用 FsStateBackend,但是正在进行的数据仍然存储在 TaskManager的内存中,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上; -
RocksDBStateBackend:Flink内置的第三方状态管理器,采用嵌入式的 key-value 型数据库RocksDB 来存储正在进行的数据。等到 checkpoint 时,再讲其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时,也需要配置持久化存储的文件系统。之所以这样做,是因为 RocksDB 作为嵌入式数据库安全性比较低,但是比起全文件系统的方式,其读取速度更快,比起全内存的方式,其存储空间更大,因此是一种比较均衡的方案。
03
—
RocksDB 大状态调优
3.1 开启State访问性能监控
state.backend.latency-track.keyed-state-enabled: true #启用访问状态的性能监控state.backend.latency-track.sample-interval: 100 #采样间隔state.backend.latency-track.history-size: 128 #保留的采样数据个数,越大越精确state.backend.latency-track.state-name-as-viriable: true #将状态名作为变量
正常情况下,开启第一个参数即可。
3.2.1 开启增量检查点
state.backend.incremental: true #默认false,改为true或代码中指定new EmbeddedRocksDBStateBackend(true)
3.2.2 开启本地恢复
state.backend.local-recovery: true
3.3 调整预定义选项
Flink 为 RocksDB 提供了一些预定义的选项集合,比如 DEFAULT、SPINNING_DISK_OPTIMIZED、SPINING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。
3.3 调整预定义选项Flink 为 RocksDB 提供了一些预定义的选项集合,比如 DEFAULT、SPINNING_DISK_OPTIMIZED、SPINING_DISK_OPTIMIZED_HIGH_MEM 或 FLASH_SSD_OPTIMIZED。
3.3.1 配置方式
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend(true);embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);env.setStateBackend(embeddedRocksDBStateBackend);
2)启动指令指定
-Dstate.backend.rocksdb.predefined-options: SPINING_DISK_OPTIMIZED_HIGH_MEM #机械硬盘+内存
3.4 其他高阶配置

3.4.1 增大block缓存
state.backend.rocksdb.block.cache-size: 64m #默认 8m
3.4.2 增大 write buffer 和 level 阈值大小
state.backend.rocksdb.writebuffer.size: 128mstate.backend.rocksdb.compaction.level.max-size-level-base: 320m
3.4.3 增大 write buffer 数量
state.backend.rocksdb.writebuffer.count: 5
3.4.4 增大后台线程数和 write buffer 合并数
state.backend.rocksdb.thread.num: 4
2)增大 write buffer 最小合并数
state.backend.rocksdb.writebuffer.number-to-merge: 3
3.4.5 开启分区索引功能
Flink 1.13 中对 RocksDB 增加了分区索引功能,复用了 RocksDB 的 partitioned Index & filter 功能,简单来说就是对 RocksDB 的partitioned Index 做了多级索引。也就是将内存中的最上层常驻,下层根据需要再 load 回来,这样就大大降低了数据 Swap 竞争。线上测试中,相对于内存较小的场景中,性能提升10倍左右。如果在内存管控下 RocksDB 性能不如预期的话,这个也能作为一个性能优化点。
state.backend.rocksdb.memory.partitioned-index-filters:true #默认 false
3.5 参数设定案例
yarn-session模式启动参数设置nohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse -s 6 -tm 15360 -jm 2048 -D taskmanager.memory.size=0m -D taskmanager.memory.off-heap.enabled=true -D taskmanager.memory.jvm-overhead.min=2048m -D taskmanager.memory.jvm-overhead.max=3072m -D state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED -D state.backend.rocksdb.writebuffer.size=256m -D state.backend.rocksdb.writebuffer.count=5 -D state.backend.rocksdb.compaction.level.max-size-level-base=320m -D state.backend.rocksdb.writebuffer.number-to-merge=3 -D state.backend.rocksdb.memory.partitioned-index-filters=true -d &>/dev/null 2>&1 &
04
—
Checkpoint设置
// 使⽤ RocksDBStateBackend 做为状态后端,并开启增量 CheckpointRocksDBStateBackend rocksDBStateBackend = newRocksDBStateBackend("hdfs://hadoop01:8020/flink/checkpoints", true);env.setStateBackend(rocksDBStateBackend);// 开启 Checkpoint,间隔为 1 分钟env.enableCheckpointing(TimeUnit.MINUTES.toMillis(1));// 配置 CheckpointCheckpointConfig checkpointConf = env.getCheckpointConfig();checkpointConf.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)// 最小间隔 2 分钟checkpointConf.setMinPauseBetweenCheckpoints(TimeUnit.MINUTES.toMillis(2))// 超时时间 10 分钟checkpointConf.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(10));// 保存 checkpointcheckpointConf.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
05
—
调优实践总结
启动yarn sessionnohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse2 -s 6 -tm 15360 -jm 2048 -D state.backend.rocksdb.memory.managed=true -d &>/dev/null 2>&1 &启动flink任务nohup ./bin/flink run -c suishen.bigdata.pv.event.PvEventNginxLogParseAppRepair -p 1 -yid application_1642060369182_7487586 -d module-event-etl.jar &>/dev/null 2>&1 &

第二种禁用 RocksDB 内存托管,调整部分RocksDB参数启动任务
启动yarn sessionnohup ./bin/yarn-session.sh -nm flink14-pv-event-nginx-parse -s 6 -tm 15360 -jm 2048 -D taskmanager.memory.size=0m -D taskmanager.memory.off-heap.enabled=true -D taskmanager.memory.jvm-overhead.min=2048m -D taskmanager.memory.jvm-overhead.max=3072m -D state.backend.rocksdb.memory.managed=false -D state.backend.rocksdb.predefined-options=SPINNING_DISK_OPTIMIZED -D state.backend.rocksdb.writebuffer.size=256m -D state.backend.rocksdb.block.cache-size=512m -D state.backend.rocksdb.writebuffer.count=5 -D state.backend.rocksdb.compaction.level.max-size-level-base=64m -D state.backend.rocksdb.log.level=ERROR_LEVEL -D state.backend.rocksdb.writebuffer.number-to-merge=3 -D state.backend.rocksdb.options.max-manifest-file-size=32m -D state.backend.rocksdb.memory.partitioned-index-filters=true -d &>/dev/null 2>&1 &启动flink任务nohup ./bin/flink run -c suishen.bigdata.pv.event.PvEventNginxLogParseApp -p 1 -yid application_1642060369182_6869813 -d module-event-etl.jar &>/dev/null 2>&1 &

-
对于一般使用状态的任务(状态小于GB)来说,直接使用默认设置即可(默认是使用RocksDB 内存托管方式管理状态)。 -
对于内存比较大的任务可以禁用 RocksDB 内存托管,手动根据任务运行状况调节合适参数。这里只是举例部分RocksDB内存参数设置,还有很多参数需要根据官方文档和业务状况结合判断。 -
参数调优只是一部分,最主要的调优方式还是减少状态大小,如设置状态过期时间、状态清理策略等等。
State Backends | Apache Flink
欢迎使用RocksDB RocksDB中文网 | 一个持久型的key-value存储
[https://juejin.cn/post/6874493825272774663]
作者 | 徐伟奇 大数据开发工程师

