数据湖和 Iceberg 简介
未来规划
-
演示方案 存储优化的一些思考
GitHub 地址 

一、数据湖和 Iceberg 简介
1. 数据湖生态

首先我们认为它底下应具备海量存储的能力,常见的有对象存储,公有云存储以及 HDFS;
在这之上,也需要支持丰富的数据类型,包括非结构化的图像视频,半结构化的 CSV、XML、Log,以及结构化的数据库表;
除此之外,需要高效统一的元数据管理,使得计算引擎可以方便地索引到各种类型数据来做分析。
最后,我们需要支持丰富的计算引擎,包括 Flink、Spark、Hive、Presto 等,从而方便对接企业中已有的一些应用架构。
2. 结构化数据在数据湖上的应用场景

-
首先,可以看到数据源的类型很多,因此需要支持比较丰富的数据 Schema 的组织; -
其次,它在注入的过程中要支撑实时的数据查询,所以需要 ACID 的保证,确保不会读到一些还没写完的中间状态的脏数据; -
最后,例如日志这些有可能临时需要改个格式,或者加一列。类似这种情况,需要避免像传统的数仓一样,可能要把所有的数据重新提出来写一遍,重新注入到存储;而是需要一个轻量级的解决方案来达成需求。
3. 结构化数据在数据湖上的典型解决方案

4. Iceberg 表数据组织架构

-
快照 Metadata:表格 Schema、Partition、Partition spec、Manifest List 路径、当前快照等。 -
Manifest List:Manifest File 路径及其 Partition,数据文件统计信息。 -
Manifest File:Data File 路径及其每列数据上下边界。 -
Data File:实际表内容数据,以 Parque,ORC,Avro 等格式组织。
-
可以看到右边从数据文件开始,数据文件存放表内容数据,一般支持 Parquet、ORC、Avro 等格式; -
往上是 Manifest File,它会记录底下数据文件的路径以及每列数据的上下边界,方便过滤查询文件; -
再往上是 Manifest List,它来链接底下多个 Manifest File,同时记录 Manifest File 对应的分区范围信息,也是为了方便后续做过滤查询; Manifest List 其实已经表示了快照的信息,它包含当下数据库表所有的数据链接,也是 Iceberg 能够支持 ACID 特性的关键保障。 有了快照,读数据的时候只能读到快照所能引用到的数据,还在写的数据不会被快照引用到,也就不会读到脏数据。多个快照会共享以前的数据文件,通过共享这些 Manifest File 来共享之前的数据。 -
再往上是快照元数据,记录了当前或者历史上表格 Scheme 的变化、分区的配置、所有快照 Manifest File 路径、以及当前快照是哪一个。 同时,Iceberg 提供命名空间以及表格的抽象,做完整的数据组织管理。
5. Iceberg 写入流程

-
首先,Data Workers 会从元数据上读出数据进行解析,然后把一条记录交给 Iceberg 存储; -
与常见的数据库一样,Iceberg 也会有预定义的分区,那些记录会写入到各个不同的分区,形成一些新的文件; -
Flink 有个 CheckPoint 机制,文件到达以后,Flink 就会完成这一批文件的写入,然后生成这一批文件的清单,接着交给 Commit Worker; -
Commit Worker 会读出当前快照的信息,然后与这一次生成的文件列表进行合并,生成一个新的 Manifest List 以及后续元数据的表文件的信息,之后进行提交,成功以后就形成一个新的快照。
6. Iceberg 查询流程

-
首先是 Flink Table scan worker 做一个 scan,scan 的时候可以像树一样,从根开始,找到当前的快照或者用户指定的一个历史快照,然后从快照中拿出当前快照的 Manifest List 文件,根据当时保存的一些信息,就可以过滤出满足这次查询条件的 Manifest File; -
再往下经过 Manifest File 里记录的信息,过滤出底下需要的 Data Files。这个文件拿出来以后,再交给 Recorder reader workers,它从文件中读出满足条件的 Recode,然后返回给上层调用。
7. Iceberg Catalog 功能一览

-
它可以对 Iceberg 定义一系列角色文件; -
它的 File IO 都是可以定制,包括读写和删除; -
它的命名空间和表的操作 (也可称为元数据操作),也可以定制; -
包括表的读取 / 扫描,表的提交,都可以用 Catalog 来定制。
二、对象存储支撑 Iceberg 数据湖
1. 当前 Iceberg Catalog 实现

2. 对象存储和 HDFS 的比较

-
对象存储在集群扩展性,小文件友好,多站点部署和低存储开销上更加有优势; -
HDFS 的好处就是提供追加上传和原子性 rename,这两个优势正是 Iceberg 需要的。

-
HDFS 架构是用单个 Name Node 保存所有元数据,这就决定了它单节点的能力有限,所以在元数据方面没有横向扩展能力。 -
对象存储一般采用哈希方式,把元数据分隔成各个块,把这个块交给不同 Node 上面的服务来进行管理,天然地它元数据的上限会更高,甚至在极端情况下可以进行 rehash,把这个块切得更细,交给更多的 Node 来管理元数据,达到扩展能力。

-
HDFS 基于架构的限制,小文件存储受限于 Name Node 内存等资源,虽然 HDFS 提供了 Archive 的方法来合并小文件,减少对 Name Node 的压力,但这需要额外增加复杂度,不是原生的。 同样,小文件的 TPS 也是受限于 Name Node 的处理能力,因为它只有单个 Name Node。对象存储的元数据是分布式存储和管理,流量可以很好地分布到各个 Node 上,这样单节点就可以存储海量的小文件。 -
目前,很多对象存储提供多介质,分层加速,可以提升小文件的性能。

-
对象存储支持多站点部署 -
全局命名空间 -
支持丰富的规则配置 -
对象存储的多站点部署能力适用于两地三中心多活的架构,而 HDFS 没有原生的多站点部署能力。虽然目前看到一些商业版本给 HDFS 增加了多站点负责数据的能力,但由于它的两个系统可能是独立的,因此并不能支撑真正的全局命名空间下多活的能力。

-
对于存储系统来说,为了适应随机的硬件故障,它一般会有副本机制来保护数据。 -
常见的如三副本,把数据存三份,然后分开保存到三个 Node 上面,存储开销是三倍,但是它可以同时容忍两个副本遇到故障,保证数据不会丢失。 -
另一种是 Erasure Coding,通常称为 EC。以 10+2 举例,它把数据切成 10 个数据块,然后用算法算出两个代码块,一共 12 个块。接着分布到四个节点上,存储开销是 1.2 倍。它同样可以容忍同时出现两个块故障,这种情况可以用剩余的 10 个块算出所有的数据,这样减少存储开销,同时达到故障容忍程度。 -
HDFS 默认使用三副本机制,新的 HDFS 版本上已经支持 EC 的能力。经过研究,它是基于文件做 EC,所以它对小文件有天然的劣势。因为如果小文件的大小小于分块要求的大小时,它的开销就会比原定的开销更大,因为两个代码块这边是不能省的。在极端情况下,如果它的大小等同于单个代码块的大小,它就已经等同于三副本了。
同时,HDFS 一旦 EC,就不能再支持 append、hflush、hsync 等操作,这会极大地影响 EC 能够使用的场景。对象存储原生支持 EC,对于小文件的话,它内部会把小文件合并成一个大的块来做 EC,这样确保数据开销方面始终是恒定的,基于预先配置的策略。
3. 对象存储的挑战:数据的追加上传



-
第一步先创建初始化的 MPU,拿到一个 Upload ID,然后给每一个分段赋予一个 Upload ID 以及一个编号,这些分块就可以并行上传; -
在上传完成以后,还需要一步 Complete 操作,这样相当于通知系统,它会把基于同一个 Upload ID 以及所有的编号,从小到大排起来,组成一个大文件; -
把机制运用到数据追加上传场景,常规实现就是写入一个文件,把文件缓存到本地,当达到分块要求大小时,就可以把它进行初始化 MPU,把它的一个分块开始上传。后面每一个分块也是一样的操作,直到最后一个分块上传完,最后再调用一个完成操作来完成上传。
-
缺点是 MPU 的分片数量有上限,S3 标准里可能只有 1 万个分片。想支持大文件的话,这个分块就不能太小,所以对于小于分块的文件,依然是要利用前面一种方法进行缓存上传; -
MPU 的优点在于并行上传的能力。假设做一个异步的上传,文件在缓存达到以后,不用等上一个分块上传成功,就可以继续缓存下一个,之后开始上传。当前面注入的速度足够快时,后端的异步提交就变成了并行操作。利用这个机制,它可以提供比单条流上传速度更快的上传能力。
4. 对象存储的挑战:原子提交


-
这里 Commit Worker 1 拿到了 v006 版本,然后合并自己的文件,提交 v007 成功。 -
此时还有另一个 Commit Worker 2,它也拿到了 v006,然后合并出来,且也要提供 v007。此时我们需要一个机制告诉它 v007 已经冲突,不能上传,然后让它自己去 Retry。Retry 以后取出新的 v007 合并,然后提交给 v008。

-
首先,Commit Worker 1 拿到 v006,然后合并文件,在提交之前先要获取这一把锁,拿到锁以后判断当前快照版本。如果是 v006,则 v007 能提交成功,提交成功以后再解锁。 -
同样,Commit Worker 2 拿到 v006 合并以后,它一开始拿不到锁,要等 Commit Worker 1 释放掉这个锁以后才能拿到。等拿到锁再去检查的时候,会发现当前版本已经是 v007,与自己的 v007 有冲突,因此这个操作一定会失败,然后它就会进行 Retry。
5. Dell EMC ECS 的数据追加上传

6. Dell EMC ECS 在并发提交下的解决方案

-
If-Match 就是说在 Commit Worker 1 提交拿到 v006 的时候,同时拿到了文件的 eTag。提交的时候会带上 eTag,系统需要判断要覆盖文件的 eTag 跟当前这个文件真实 eTag 是否相同,如果相同就允许这次覆盖操作,那么 v007 就能提交成功; -
另一种情况,是 Commit Worker 2 也拿到了 v006 的 eTag,然后上传的时候发现拿到 eTag 跟当前系统里文件不同,则会返回失败,然后触发 Retry。
7. S3 Catalog - 统一存储的数据

三、演示方案
四、存储优化的一些思考
另外~《Apache Flink-实时计算正当时》电子书重磅发布,本书将助您轻松 Get Apache Flink 1.13 版本最新特征,同时还包含知名厂商多场景 Flink 实战经验,学用一体,干货多多!快扫描下方二维码获取吧~

(本次为抢鲜版,正式版将于 7 月初上线)

更多 Flink 相关技术交流,可扫码加入社区钉钉大群~

▼ 关注「Flink 中文社区」,获取更多技术干货 ▼
关注下方 Apache Iceberg公众号,回复"flink2022"即可领取百份Flink干货纯资料,包含内核详解、行业实践、实时数仓等,业界大佬实力分享!
Apache Iceberg 简介



