使用Apache Hudi构建大规模、事务性数据湖

一个近期由Hudi PMC & Uber Senior Engineering Manager Nishith Agarwal分享的Talk

关于Nishith Agarwal更详细的介绍,主要从事数据方面的工作,包括摄取标准化,数据湖原语等。

什么是数据湖?数据湖是一个集中式的存储,允许以任意规模存储结构化和非结构化数据。你可以存储原始数据,而不需要先转化为结构化的数据,基于数据湖之上可以运行多种类型的分析,如dashboard、大数据处理的可视化、实时分析、机器学习等。

接着看看对于构建PB级数据湖有哪些关键的要求

第一个要求:增量摄取(CDC)

企业中高价值的数据往往存储在OLTP中,例如下图中,users表包含用户ID,国家/地区,修改时间和其他详细信息,但OLTP系统并未针对大批量分析进行优化,因此可能需要引入数据湖。同时一些企业采用备份在线数据库的方式,并将其存储到数据湖中的方法来摄取数据,但这种方式无法扩展,同时它给上游数据库增加了沉重的负担,也导致数据重写的浪费,因此需要一种增量摄取数据的方法。

第二个要求:Log Event去重

考虑分析大规模时间序列数据的场景,这些事件被写入数据管道,并且数量非常大,可达数十亿,每秒可达百万的量。但流中可能有重复项,可能是由于至少一次(atleast-once)保证,数据管道或客户端失败重试处理等发送了重复的事件,如果不对日志流进行重复处理,则对这些数据集进行的分析会有正确性问题。下图是一个示例日志事件流,其中事件ID为唯一键,带有事件时间和其他有效负载。

第三个要求:存储管理(自动管理DFS上文件)

我们已经了解了如何摄取数据,那么如何管理数据的存储以扩展整个生态系统呢?其中小文件是个大问题,它们会导致查询引擎的开销并增加文件系统元数据的压力。而如果写入较大的文件,则可能导致摄取延迟增加。一种常见的策略是先摄取小文件,然后再进行合并,这种方法没有标准,并且在某些情况下是非原子行为,会导致一致性问题。无论如何,当我们写小文件并且在合并这些文件之前,查询性能都会受到影响。

第四个要求:事务写(ACID能力)

传统数据湖在数据写入时的事务性方面做得不太好,但随着越来越多的业务关键处理流程移至数据湖,情况也在发生变化,我们需要一种机制来原子地发布一批数据,即仅保存有效数据,部分失败必须回滚而不会损坏已有数据集。同时查询的结果必须是可重复的,查询端看不到任何部分提取的数据,任何提交的数据都必须可靠地写入。Hudi提供了强大的ACID能力。

第五个要求:更快地派生/ETL数据(增量处理)

仅仅能快速摄取数据还不够,我们还需要具有计算派生数据的能力,没有这个能力,数据工程师通常会绕过原始表来构建其派生/ETL并最终破坏整个体系结构。下面示例中,我们看到原始付款表(货币未标准化)和发生货币转换的派生表。

扩展此类数据管道时很有挑战,如仅对变更进行计算,或者基于窗口的Join的挑战。对基础数据集进行大规模重新处理不太可能,这会浪费计算资源。需要在数据湖上进行抽象以支持对上游表中已更改的行(数据)进行智能计算。

第六个要求:法律合规/数据删除(更新&删除)

近年来随着新的数据保护法规生效,对数据保留有了严格的规定,需要删除原始记录,修复数据的正确性等,当需要在PB级数据湖中高效执行合规性时非常困难,如同大海捞针一般,需要高效的删除,如进行索引,对扫描进行优化,将删除记录有效地传播到下游表的机制。

要求回顾(汇总)

  • 支持增量数据库变更日志摄取。
  • 从日志事件中删除所有重复项。
  • Data Lake必须为其数据集提供有效的存储管理
  • 支持事务写入
  • 必须提供严格的SLA,以确保原始表和派生表的数据新鲜度
  • 任何数据合规性需求都需要得到有效的支持
  • 支持唯一键约束
  • 有效处理迟到的数据

有没有能满足上面所有需求的系统呢?接下来我们引入Apache Hudi,HUDI代表Hadoop Upserts Deletes and Incrementals。从高层次讲,HUDI允许消费数据库和kafa事件中的变更事件,也可以增量消费其他HUDI数据集中的变更事件,并将其提取到存储在Hadoop兼容,如HDFS和云存储中。在读取方面,它提供3种不同的视图:增量视图,快照视图和实时视图。

HUDI支持2种存储格式:“写时复制”和“读时合并”。

首先来看看写时复制。如下图所示,HUDI管理了数据集,并尝试将一批数据写入数据湖,HUDI维护称为“提交时间轴(commit timeline)”的内容,以跟踪HUDI管理的数据集上发生的操作/更改,它在提交时间轴上标记了一个“inflight”文件,表示操作已开始,HUDI会写2个parquet文件,然后将“inflight”文件标记为已完成,这从原子上使该新数据写入HUDI管理的数据集中,并可用于查询。正如我们提到的,RO视图优化查询性能,并提供parquet的基本原始列存性能,无需增加任何额外成本。
现在假设需要更新另一批数据,HUDI在提交时间轴上标记了一个“inflight”文件,并开始合并这些更新并重写Parquet File1。此时,由于提交仍在进行中,因此用户看不到正在写入任何这些更新(这就是我们称为“快照隔离”)。最终以原子方式发布提交后,就可以查询版本为C2的新合并的parquet文件。

COW已经在Uber投入运行多年,大多数数据集都位于COW存储类型上。

尽管COW服务于我们的大多数用例,但仍有一些因素值得我们关注。以Uber的行程表为例,可以想象这可能是一个很大的表,它在旅程的整个生命周期中获取大量更新。每隔30分钟,我们就会获得一组新旅行以及对旧旅行的一些更新,在Hive上的旅行数据是按天划分分区的,因此新旅行最终会在最新分区中写入新文件,而某些更新会在旧分区中写入文件。使用COW,我们只能重写那些更新所涉及的文件,并且能够高效地更新。由于COW最终会重写某些文件,因此可以像合并和重写该数据一样快。在该用例中通常大于15分钟。再来看另外一种情况,由于某些业务用例(例如GDPR),必须更新大量历史行程,这些更新涉及过去几个月数据,从而导致很高的写入延迟,并一遍又一遍地重写大量数据,写放大也会导致大量的IO。若为工作负载分配的资源不足,可能就会严重损害摄取延迟。

在真实场景中,会将ETL链接在一起来构建数据管道,问题会变得更加复杂。

对问题进行总结如下:在COW中,太多的更新(尤其是杂乱的跨分区/文件)会严重影响提取延迟(由于作业运行时间较长且无法追赶上入流量),同时还会引起巨大的写放大,从而影响HDFS(相同文件的48个版本+过多的IO)。合并更新和重写parquet文件会限制我们的数据的新鲜度,因为完成此类工作需要时间 = (重写parquet文件所花费的时间*parquet文件的数量)/(并行性)。

在COW中,我们实际上并没有太大的parquet文件,因为即使只有一行更新也可能要重写整个文件,因为Hudi会选择写入小于预期大小的文件。

MergeOnRead将所有这些更新分组到一个文件中,然后在稍后的时刻创建一个新版本。对于重更新的表,重写大文件会导致开销变大。

如何解决上述写放大问题呢?除了将更新合并并重写parquet文件之外,我们将更新写入增量文件中,这可以帮助我们降低摄取延迟并获得更好的新鲜度。

将更新写入增量文件将需要在读取端做额外的工作以便能够读取增量文件中记录,这意味着我们需要构建更智能,更智能的读取端。

首先来看看写时复制。如下图所示,HUDI管理了数据集,并尝试将一批数据写入数据湖,HUDI维护称为“提交时间轴(commit timeline)”的内容,以跟踪HUDI管理的数据集上发生的操作/更改,它在提交时间轴上标记了一个“inflight”文件,表示操作已开始,HUDI会写2个parquet文件,然后将“inflight”文件标记为已完成,这从原子上使该新数据写入HUDI管理的数据集中,并可用于查询。正如我们提到的,RO视图优化查询性能,并提供parquet的基本原始列存性能,无需增加任何额外成本。

现在需要进行第二次更新,与合并和重写新的parquet文件(如在COW中一样)不同,这些更新被写到与基础parquet文件对应的增量文件中。RO视图继续查询parquet文件(过时的数据),而RealTime View(Snapshot query)会合并了parquet中的数据和增量文件中的更新,以提供最新数据的视图。可以看到,MOR是在查询执行时间与较低摄取延迟之间的一个权衡。

那么,为什么我们要异步运行压缩?我们实现了MERGE_ON_READ来提高数据摄取速度,我们希望尽快摄取较新的数据。而合并更新和创建列式文件是Hudi数据摄取的主要耗时部分。

因此我们引入了异步Compaction步骤,该步骤可以与数据摄取同时运行,减少数据摄取延迟。

Hudi将事务引入到了大规模数据处理中,实际上,我们是最早这样做的系统之一,最近,它已通过其他项目的类似方法获得了社区认可。

Hudi支持多行多分区的原子性提交,Hudi维护一个特殊的文件夹.hoodie,在该文件夹中记录以单调递增的时间戳表示的操作,Hudi使用此文件夹以原子方式公开已提交的操作;发生的部分故障会透明地回滚,并且不会影响读者和后面的写入;Hudi使用MVCC模型将读取与并发摄取和压缩隔离开来;Hudi提交协议和DFS存储保证了数据的持久写入。

下面介绍Hudi在Uber的使用情况

Hudi管理了超过150PB数据湖,超过10000张表,每天摄入5000亿条记录。

接着看看Hudi如何替代分析架构。利用Hudi的upsert原语,可以在摄取到数据湖中时实现<5分钟的新鲜度,并且能继续获得列式数据的原始性能(parquet格式),同时使用Hudi还可以获得实时视图,以5-10分钟的延迟提供dashboard,此外HUDI支持的增量视图有助于长尾效应对数据集的突变。

为方便用户能快速使用Hudi,Hudi提供了一些开箱即用的工具,如HoodieDeltaStreamer,在Uber内部,HoodieDeltaStreamer用来对全球网络进行近实时分析,可用来消费DFS/Kafka中的数据。

除了DeltaStreamer,Hudi还集成了Spark Datasource,也提供了开箱即用的能力,基于Spark,可以快速构建ETL管道,同时也可无缝使用Hudi + PySpark。

接着介绍更高级的原语和特性。

如何从损坏的数据中恢复?例如线上由于bug导致写入了不正确的数据,或者上游系统将某一列的值标记为null,Hudi也可以很好的处理上述场景,可以将表恢复到最近的一次正确时间,如Hudi提供的savepoint就可以将不同的commit保存起来,用于后续恢复,注意MoR表暂时不支持savepoint;Hudi还提供了文件的版本号,即可以保存多个版本的文件,这对于CoW和MoR表都适用,但是会占用一些存储空间。

Hudi还提供便于增量ETL的高级特性,通过Spark/Spark便可以轻松增量拉取Hudi表的变更。

除了增量拉取,Hudi也提供了时间旅行特性,同样通过Spark/Hive便可以轻松查询指定版本的数据,其中对于Hive查询中指定hoodie.table_name.consume.end.timestamp也马上会得到支持。

下面看看对于线上的Hudi Spark作业如何调优。

下面列举了几个调优手段,设置Kryo序列化器,使用Shuffle Service,利用开源的profiler来进行内存调优,当然Hudi也提供了Hudi生产环境的调优配置,可参考【调优 | Apache Hudi应用调优指南】

下面介绍社区正在进行的工作,敬请期待。

即将发布的0.6.0版本,将企业中存量的parquet表高效导入Hudi中,与传统通过Spark读取Parquet表然后再写入Hudi方案相比,占用的资源和耗时都将大幅降低。以及对于查询计划的O(1)时间复杂度的处理,新增列索引及统一元数据管理以消除对DFS的文件list操作。

还有一些值得关注的特性,比如支持行级别的索引,该功能将极大降低upsert的延迟;异步数据clustering以优化存储和查询性能;支持Presto对MoR表的快照查询;Hudi集成Flink,通过Flink可将数据写入Hudi数据湖。

整个分享就介绍到这里,欢迎观看。

相关文章