
分区设计是数据建模的关键部分。不幸的是,考虑到大多数基于Hive的表的限制,数据工程师(包括我自己)往往被迫做出具有无意和潜在危险影响的选择。在这篇文章中,我将通过下面的样例:使用纽约流行的出租车数据集,说明分区设计的一些潜在陷阱,以及避免这些陷阱的技术。
Hive分区的问题
纽约出租车和豪华轿车委员会公布了一个关于纽约及其周边地区居民活动的丰富数据集。该数据以一组parquet文件的形式存储,每个文件涵盖一个月的旅行信息。
让我们假设,我的任务是将这些数据输入公司的数据仓库,以帮助分析开设我们全新旗舰店的最佳位置。因为我是一个经验丰富的数据从业者,精通功能数据工程的方法,所以我很自然地设置了我的目标数据仓库表,其中包含每月分区。
CREATE TABLE taxis AS … PARTITIONED BY year, month
由于我正在加载每月所有文件中提供的数据,因此使用文件名中指示的月份似乎是一个理所当然的选择。我可以相信,加载任何给定的文件只会影响表中的单个分区。这使我能够安全地并行加载文件,并在将来安全地重新处理文件。
INSERT OVERWRITE taxis PARTITION (year=2020, month=3)
SELECT * FROM taxi_202003
然而,当我们尝试回答一个简单的问题“2020年3月份有多少次打车?”时,我们很快就会发现这种方法的问题。
SELECT count(*) FROM taxis
WHERE pickup_time BETWEEN ‘2020-03-01’ AND ‘2020-04-01’;
--RESULT: 3,007,384
很好,但在sql的某个地方,数据工程师崩溃了,因为我们忘了添加年和月分区作为谓词,这意味着我们需要做一次全表扫描!让我们修改我们的查询以提高性能:
SELECT count(*) FROM taxis
WHERE pickup_time BETWEEN ‘2020-03-01’ AND ‘2020-04-01’
AND year=2020 AND month=03;
--RESULT: 3,007,687(译者注:作者写错了,应该少了)
什么?!!
事实证明,以月为单位的出租车源数据可能包含多个月的行程。因此,通过将我们的查询限制为只包括单个分区,我们会无意中排除记录,我们的查询会产生不正确的结果。
事件时间与处理时间
数据工程师有一个业内通用的方案,我们通常会根据处理时间对表进行分区。对较为随意的分析师来说,这似乎是一个无害的选择,然而,读者应该能意识到,出租车公司所有的KPI考核都应该打上问号。
昨天有多少新客户访问了我们的网站?
”昨天“可能包括前几天的访客,也可能缺少尚未登陆的用户数据
我们经常遇到如下的情况:采集到的迟到的数据。在该小时内,我们的系统很可能会处理前几个小时(或几天)产生的记录。我们通常选择按处理时间对此数据集分区,以避免尝试将此延迟到达的数据附加到现有分区所带来的性能和正确性挑战。
按事件时间分区
我可以选择从pickup_time或者dropoff_time(哦,痛苦!)派生每条记录的年和月,而不是直接使用文件填充年和月的分区列。为了确保我可以安全地重新运行此作业,而不复制数据,我可以使用Spark的插入覆盖语法。
INSERT OVERWRITE taxis PARTITION (year, month)
SELECT *,
year(pickup_time) as year,
month(pickup_time) as month
FROM taxi_202003
但是,正如我们所观察到的,每个月源数据文件可能包含多个目标分区的数据。即使启用了Spark的动态分区覆盖,如果我试图简单地插入此数据,我最终也会通过覆盖意外分区中的数据来删除数据。
按处理时间和事件时间分区
为了避免无意中覆盖数据,我可以使用处理时间和事件时间分区的组合。
CREATE TABLE taxis AS … PARTITIONED BY filemonth, year, month
现在,我可以将静态和动态分区覆盖结合起来,以实现幂等写入。
INSERT OVERWRITE taxis PARTITION (filemonth=202003, year, month)
SELECT *,
year(pickup_time) as year,
month(pickup_time) as month
FROM taxi_202003
我们已经静态定义了要加载的文件的分区,因此我们可以安全地多次加载此文件,而不会覆盖其他文件中的数据。而且,由于年和月分区是从皮卡时间列派生的,我们之前的两个查询(有和没有分区谓词)会产生相同的(正确的)结果。
是时候庆祝工作做得很好了?
很遗憾,还没有。我现在已经为一个新的、更微妙的问题奠定了基础。最初是每个月的单个源数据文件的内容将分布在可能的许多分区上。随着几个月的数据继续加载到这个表中,我很可能会遇到可怕的小文件问题。
来救场的Table Format
幸运的是,我们现在有了像Delta和iceberg这样的表格式,它们允许我们利用合并操作,使这种模式更加高效。
MERGE INTO taxis t USING (SELECT * FROM source) s
ON t.pickup_time = s.pickup_time AND t.dropoff_time = s.dropoff_time
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
我们几乎完成了,但是我们仍然有这种不够友好的模式,即在数据模型中添加列,只是为了保存分区值。正如您所记得的,这也意味着我们的所有查询都需要添加这些无关的列,以避免全表扫描:
SELECT count(*) from nyc_taxi where pickup_time between ‘2020-03-01’ and ‘2020-04-01’ AND year=2020 and month=03
旁注:分析师怎么知道他们应该添加这个额外的谓词?口口相传?被支付AWS账单的人一再责骂?
iceberg 拯救世界
如果你像我一样聪明、经验丰富(老道),并记得数据在以前(例如hadoop之前)是如何使用的,那么你就知道分区的概念并不新鲜。
与基于hadoop的表格式不同,iceberg分区是指导引擎如何结构化数据的配置,而不是表示目录结构的额外列。冰山跟踪数据和结构之间的关系,因此我们永远不需要手动选择这些目录,也不需要额外的谓词来提高效率。
事实上,对于基于时间的分区,冰山提供了方便的转换,用于将分区定义为表定义的一部分,这样我们甚至不必考虑将分区作为我们的采集工作的一部分。
CREATE TABLE iceberg.taxis AS … PARTITIONED BY month(pickup_time)
因此,我们终于可以回到我们最初的理想查询,并知道它将是高效和准确的。
SELECT count(*) FROM iceberg.taxis WHERE pickup_time BETWEEN ‘2020-03-01’ AND ‘2020-04-01’;
RESULT: 3,007,384
专家模式:这也意味着我们可以进化表的分区方案,而不必重述所有数据!
圆满结束
我们已经看到了表的分区策略如何影响读取和写入性能以及查询结果的准确性。通过利用iceberg,数据工程师可以避免处理时间与事件时间的复杂性带来的问题,而分析师可以轻松避免全表扫描!
作者:Jason Reid
翻译:池大勇
原文:https://tabular.io/blog/partitioning/
基于 Apache Flink 的实时计算数据流业务引擎在京东零售的实践和落地

