睿治

智能数据治理平台

睿治作为国内功能最全的数据治理产品之一,入选IDC企业数据治理实施部署指南。同时,在IDC发布的《中国数据治理市场份额,2022》报告中,蝉联数据治理解决方案市场份额第一。

变革?数据湖替代数据仓库

时间:2022-01-14来源:玳瑁浏览数:352

目录

一.数据湖的起源

1.1.提出背景

1.2.出现问题

1.3.解决问题

1.4.提出者说

二. 开源厂商解决方案

2.1.背景

2.2.Delta

2.2Apache Hudi

2.3.Apache Iceberg

2.4.痛点小结

三.开源厂商维度对比

3.1.ACID和隔离级别支持

3.2.Schema变更支持和设计

3.3.流批接口支持

3.4.接口抽象程度和插件化

3.5.查询性能优化

3.6.其他功能

3.7.社区现状

3.8.总结

3.9.开源组件差异

3.9.1.delta特性

3.9.2.Iceberg特性

3.9.3.Hudi特性

3.9.4.hive特性

3.10.数据湖选型

四.数据仓库和数据湖

4.1.数据仓库的痛点

4.1.1资源浪费

4.1.2字段变更

4.1.3数据结构

4.1.4字段限制

4.2数据湖的理念

4.2.1功能

4.2.2理念

4.3特性对比

五.实时数仓和数据湖

5.1.实时数仓痛点

5.1.1没有加工逻辑

5.2.2.分层逻辑

5.2.3.合并成本高

5.2数据湖解决

六. 架构和数据湖

6.1数据仓库架构

6.2实时数仓架构

七.大数据平台与数据湖

八. Apache Hudi原理介绍

8.1使用场景

8.1.1近实时写入

8.1.2近实时分析

8.1.3增量 pipeline

8.1.4增量导出

8.2概念/术语

8.2.1Timeline

8.2.2Time

8.2.3文件管理

8.2.4 File Format

8.2.5 Table 类型

8.2.6数据写

8.2.7工具

8.2.8 Key 生成策略

8.2.9删除策略

8.2.10数据读

8.2.11 Compaction

8.3总结

      一.数据湖的起源 

      1.1.提出背景

      当我们学一样东西,不深究其源头,可能永远也掌握不了最本质的东西,那我们看看一看数据湖。数据湖最早是由Pentaho的创始人兼CTO, James Dixon于2010年10月纽约Hadoop World大会上提出来。在Pentaho刚刚发布了Hadoop的第一个版本这样的一个大背景下,当时James Dixon提出数据湖的概念,是为了推广自家Pentaho产品以及Hadoop生态相关组件。

      1.2.出现问题

      了解Pentaho的小伙伴都知道,他是一家做BI产品的。当时的BI分析主要是基于数据市场(Data Mart)的。数据市场的建立需要事先识别出感兴趣的字段、属性,并对数据进行聚合处理。这样BI分析面临两个问题:

      1.只使用一部分属性,这些数据只能回答预先定义好(pre-determined)的结构对应的数据分析问题。

      2.数据集市的数据被聚合,低层级的明细数据丢失了,能回答和探索分析的问题被限制。

      1.3.解决问题

       这个时候,James Dixon提出来把所有原始数据都存在Hadoop中,后面根据需要再来取用,就可以解决这个问题,所以数据湖由此诞生。有人这样说,如果说数据集市、数据仓库里面是瓶装的水,那么它就是清洁的、打包好的、摆放整齐方便取用的;如果数据湖里面就是原生态的水,那么它就是未经处理的,原汁原味的。数据湖中的水从源头流入湖中,所有用户都可以来湖里获取、蒸馏提纯这些水(数据),获取想要的数据,探索数据价值。由此,并引起了国内外数据行业的小伙伴的普遍关注和研究。

      1.4提出者说

      如下原话:

      一个数据湖不是建立在hadoop上的一个数据仓库。它可以建立在很多存储系统上,意味着有很多数据湖,这些数据间是可以相互关联的。(英语好,可以访问https://jamesdixon.wordpress.com/2014/09/25/data-lakes-revisited/)所以,这里提到,也是我们经常说的,企业有很多信息系统,将数据统一采集到数据仓库解决数据孤岛的问题。那么数据湖一提出来,我们来看看各个开源厂商的动作。

      二.开源厂商解决方案 

      2.1背景

      三大开源数据湖方案分别为:Delta、Apache Iceberg、Apache Hudi。Apache Spark背后商业公司Databricks在商业化上取得巨大成功,所以推出的Delta也显得格外亮眼,分为企业版和开源版本。Apache Hudi是由Uber的工程师为满足其内部数据分析的需求而设计的数据湖项目,它提供的fast upsert/delete以及compaction等功能可以说是精准命中广大人民群众的痛点,加上项目各成员积极地社区建设,包括技术细节分享、国内社区推广等等,也在逐步地吸引潜在用户的目光,开源。Apache Iceberg当前看则会显得相对平庸一些,简单说社区关注度暂时比不上Delta,功能也不如Hudi丰富,但却是一个野心勃勃的项目,因为它具有高度抽象和非常优雅的设计,为成为一个通用的数据湖方案奠定了良好基础,开源项目。其实我们更关心的是他们解决什么问题和痛点,怎么使用的,为什么会有出现这些开源的组件(特别强调:数据湖不是一个技术,而是一个理念,类似数据仓库)。

      2.2.Delta

      以Databricks推出的delta为例,它要解决的核心问题基本上集中在下原如图 :

      在没有delta数据湖之前,Databricks的客户一般会采用经典的lambda架构来构建他们的流批处理场景。

      以用户点击行为分析为例,点击事件经Kafka被下游的Spark Streaming作业消费,分析处理(业务层面聚合等)后得到一个实时的分析结果,这个实时结果只是当前时间所看到的一个状态,无法反应时间轴上的所有点击事件。所以为了保存全量点击行为,Kafka还会被另外一个Spark Batch作业分析处理,导入到文件系统上(一般就是parquet格式写HDFS或者S3,可以认为这个文件系统是一个简配版的数据湖),供下游的Batch作业做全量的数据分析以及AI处理等。

      第一、批量导入到文件系统的数据一般都缺乏全局的严格schema规范,下游的Spark作业做分析时碰到格式混乱的数据会很麻烦,每一个分析作业都要过滤处理错乱缺失的数据,成本较大。

      第二、数据写入文件系统这个过程没有ACID保证,用户可能读到导入中间状态的数据。所以上层的批处理作业为了躲开这个坑,只能调度避开数据导入时间段,可以想象这对业务方是多么不友好;同时也无法保证多次导入的快照版本,例如业务方想读最近5次导入的数据版本,其实是做不到的。

      第三、用户无法高效upsert/delete历史数据,parquet文件一旦写入HDFS文件,要想改数据,就只能全量重新写一份的数据,成本很高。事实上,这种需求是广泛存在的,例如由于程序问题,导致错误地写入一些数据到文件系统,现在业务方想要把这些数据纠正过来;线上的MySQL binlog不断地导入update/delete增量更新到下游数据湖中;某些数据审查规范要求做强制数据删除,例如欧洲出台的GDPR隐私保护等等。

      第四、频繁地数据导入会在文件系统上产生大量的小文件,导致文件系统不堪重负,尤其是HDFS这种对文件数有限制的文件系统。

      所以,在Databricks看来,以下四个点是数据湖必备的:

      事实上,  Databricks在设计delta时,希望做到流批作业在数据层面做到进一步的统一(如下图)。业务数据经过Kafka导入到统一的数据湖中(无论批处理,还是流处理),上层业务可以借助各种分析引擎做进一步的商业报表分析、流式计算以及AI分析等等。

      现在的基于数据湖的架构图:

      所以,总结起来,我认为databricks设计delta时主要考虑实现以下核心功能特性:

      2.2Apache Hudi

      Uber的业务场景主要为:将线上产生的行程订单数据,同步到一个统一的数据中心,然后供上层各个城市运营同事用来做分析和处理。

      在2014年的时候,Uber的数据湖架构相对比较简单,业务日志经由Kafka同步到S3上,上层用EMR做数据分析;线上的关系型数据库以及NoSQL则会通过ETL(ETL任务也会拉去一些Kakfa同步到S3的数据)任务同步到闭源的Vertica分析型数据库,城市运营同学主要通过Vertica SQL实现数据聚合。当时也碰到数据格式混乱、系统扩展成本高(依赖收Vertica商业收费软件)、数据回填麻烦等问题。后续迁移到开源的Hadoop生态,解决了扩展性问题等问题,但依然碰到Databricks上述的一些问题,其中最核心的问题是无法快速upsert存量数据。

      如上图所示,ETL任务每隔30分钟定期地把增量更新数据同步到分析表中,全部改写已存在的全量旧数据文件,导致数据延迟和资源消耗都很高。

      此外,在数据湖的下游,还存在流式作业会增量地消费新写入的数据,数据湖的流式消费对他们来说也是必备的功能。所以,他们就希望设计一种合适的数据湖方案,在解决通用数据湖需求的前提下,还能实现快速的upsert以及流式增量消费。

     Uber团队在Hudi上同时实现了Copy On Write和Merge On Read的两种数据格式,其中Merge On Read就是为了解决他们的fast upsert而设计的。简单来说,就是每次把增量更新的数据都写入到一批独立的delta文件集,定期地通过compaction合并delta文件和存量的data文件。同时给上层分析引擎提供三种不同的读取视角:仅读取delta增量文件、仅读取data文件、合并读取delta和data文件。满足各种业务方对数据湖的流批数据分析需求。

      最终,我们可以提炼出Uber的数据湖需求为如下图,这也正好是Hudi所侧重的核心特性:

      2.3.Apache Iceberg

      Netflix的数据湖原先是借助Hive来构建,但发现Hive在设计上的诸多缺陷之后,开始转为自研Iceberg,并最终演化成Apache下一个高度抽象通用的开源数据湖方案。Netflix用内部的一个时序数据业务的案例来说明Hive的这些问题,采用Hive时按照时间字段做partition,他们发现仅一个月会产生2688个partition和270万个数据文件。他们执行一个简单的select查询,发现仅在分区裁剪阶段就耗费数十分钟。

      他们发现Hive的元数据依赖一个外部的MySQL和HDFS文件系统,通过MySQL找到相关的parition之后,需要为每个partition去HDFS文件系统上按照分区做目录的list操作。在文件量大的情况下,这是一个非常耗时的操作。

      同时,由于元数据分属MySQL和HDFS管理,写入操作本身的原子性难以保证。即使在开启Hive ACID情况下,仍有很多细小场景无法保证原子性。另外,Hive Metastore没有文件级别的统计信息,这使得filter只能下推到partition级别,而无法下推到文件级别,对上层分析性能损耗无可避免。

      最后,Hive对底层文件系统的复杂语义依赖,使得数据湖难以构建在成本更低的S3上。于是,Netflix为了解决这些痛点,设计了自己的轻量级数据湖Iceberg。在设计之初,作者们将其定位为一个通用的数据湖项目,所以在实现上做了高度的抽象。

虽然目前从功能上看不如前面两者丰富,但由于它牢固坚实的底层设计,一旦功能补齐,将成为一个非常有潜力的开源数据湖方案。

      总体来说,Netflix设计Iceberg的核心诉求可以归纳为如下

      2.4.痛点小结

      我们可以把上述三个项目针对的痛点,放到一张图上来看。可以发现标红的功能点,基本上是一个好的数据湖方案应该去做到的功能点。

      三.开源厂商维度对比

      在理解了上述三大方案各自设计的初衷和面向的痛点之后,接下来我们从7个维度来对比评估三大项目的差异。通常人们在考虑数据湖方案选型时,Hive ACID也是一个强有力的候选人,因为它提供了人们需要的较为完善功能集合,所以这里我们把Hive ACID纳入到对比行列中。

      3.1.ACID和隔离级别支持

      这里主要解释下,对数据湖来说三种隔离分别代表的含义:Serialization是说所有的reader和writer都必须串行执行;Write Serialization: 是说多个writer必须严格串行,reader和writer之间则可以同时跑;Snapshot Isolation: 是说如果多个writer写的数据无交集,则可以并发执行;否则只能串行。Reader和writer可以同时跑。综合起来看,Snapshot Isolation隔离级别的并发性是相对比较好的。

       3.2.Schema变更支持和设计

       这里有两个对比项,一个是schema变更的支持情况,我的理解是hudi仅支持添加可选列和删除列这种向后兼容的DDL操作,而其他方案则没有这个限制。另外一个是数据湖是否自定义schema接口,以期跟计算引擎的schema解耦。这里iceberg是做的比较好的,抽象了自己的schema,不绑定任何计算引擎层面的schema。

      3.3.流批接口支持

      目前Iceberg和Hive暂时不支持流式消费,不过Iceberg社区正在issue 179上开发支持。

      3.4.接口抽象程度和插件化

      这里主要从计算引擎的写入和读取路径、底层存储可插拔、文件格式四个方面来做对比。这里Iceberg是抽象程度做得最好的数据湖方案,四个方面都做了非常干净的解耦。delta是databricks背后主推的,必须天然绑定spark;hudi的代码跟delta类似,也是强绑定spark。

      存储可插拔的意思是说,是否方便迁移到其他分布式文件系统上(例如S3),这需要数据湖对文件系统API接口有最少的语义依赖,例如若数据湖的ACID强依赖文件系统rename接口原子性的话,就难以迁移到S3这样廉价存储上,目前来看只有Hive没有太考虑这方面的设计;文件格式指的是在不依赖数据湖工具的情况下,是否能读取和分析文件数据,这就要求数据湖不额外设计自己的文件格式,统一用开源的parquet和avro等格式。这里,有一个好处就是,迁移的成本很低,不会被某一个数据湖方案给绑死。

      3.5.查询性能优化

      3.6.其他功能

      这里One line demo指的是,示例demo是否足够简单,体现了方案的易用性,Iceberg稍微复杂一点(我认为主要是Iceberg自己抽象出了schema,所以操作前需要定义好表的schema)。做得最好的其实是delta,因为它深度跟随spark易用性的脚步。

Python支持其实是很多基于数据湖之上做机器学习的开发者会考虑的问题,可以看到Iceberg和Delta是做的很好的两个方案。出于数据安全的考虑,Iceberg还提供了文件级别的加密解密功能,这是其他方案未曾考虑到的一个比较重要的点。

      3.7.社区现状

      这里需要说明的是,Delta和Hudi两个项目在开源社区的建设和推动方面,做的比较好。Delta的开源版和商业版本,提供了详细的内部设计文档,用户非常容易理解这个方案的内部设计和核心功能,同时Databricks还提供了大量对外分享的技术视频和演讲, 甚至邀请了他们的企业用户来分享Delta的线上经验。

      Uber的工程师也分享了大量Hudi的技术细节和内部方案落地,研究官网的近10个PPT已经能较为轻松理解内部细节,此外国内的小伙伴们也在积极地推动社区建设,提供了官方的技术公众号和邮件列表周报。Iceberg相对会平静一些,社区的大部分讨论都在Github的issues和pull request上,邮件列表的讨论会少一点,很多有价值的技术文档要仔细跟踪issues和PR才能看到,这也许跟社区核心开发者的风格有关。

      3.8.总结

      我们把三个产品(其中delta分为databricks的开源版和商业版)总结成如下图:

      3.9开源组件差异

      如果用一个比喻来说明delta、iceberg、hudi、hive-acid四者差异的话,可以把四个项目比做建房子。

      3.9.1.delta特性

      由于开源的delta是databricks闭源delta的一个简化版本,它主要为用户提供一个table format的技术标准,闭源版本的delta基于这个标准实现了诸多优化,这里我们主要用闭源的delta来做对比。



      Delta的房子底座相对结实,功能楼层也建得相对比较高,但这个房子其实可以说是databricks的,本质上是为了更好地壮大Spark生态,在delta上其他的计算引擎难以替换Spark的位置,尤其是写入路径层面。

      3.9.2.Iceberg特性

      Iceberg的建筑基础非常扎实,扩展到新的计算引擎或者文件系统都非常的方便,但是现在功能楼层相对低一点,目前最缺的功能就是upsert和compaction两个,Iceberg社区正在以最高优先级推动这两个功能的实现。

      3.9.3.Hudi特性

      Hudi的情况要相对不一样,它的建筑基础设计不如iceberg结实,举个例子,如果要接入Flink作为Sink的话,需要把整个房子从底向上翻一遍,把接口抽象出来,同时还要考虑不影响其他功能,当然Hudi的功能楼层还是比较完善的,提供的upsert和compaction功能直接命中广大群众的痛点。

      3.9.4.hive特性

      Hive的房子,看起来是一栋豪宅,绝大部分功能都有,把它做为数据湖有点像靠着豪宅的一堵墙建房子,显得相对重量级一点,另外正如Netflix上述的分析,细看这个豪宅的墙面是其实是有一些问题的。

      3.10.数据湖选型

      根据以上对比,可能更愿意选择Hudi,功能齐全,并且还有各大厂商巨头使用,社区非常活跃,具体问题也可以及时的结局。长远考虑,如果iceberg如果功能慢慢完善,会选择这个。

      四.数据仓库和数据湖。 

      4.1数据仓库的痛点 

      4.1.1 资源浪费

      TB级数据T+1离线数据仓库跑批失败,重跑之后资源浪费

      4.1.2字段变更

      写时模型,字段变更怎么办,重跑耗时,要跑很长的依赖,一旦有一个表错误,后面所有表都会报错。

      4.1.3数据结构

      数据仓库适合,结构化存储;

      而非结构化的图片,音频,视频,媒体等,工业物联网等产生的数据,半结构化的数据存储后处理非常复杂,要写大量udf等做特殊处理。

      4.1.4字段限制

      数据仓库开发经过以上流程,提需求,指标分析,模型设计,到etl开发。如果数据分析师做探索性业务分析,无法随时获取对应字段。

      4.2数据湖的理念 

      4.2.1功能


  • 存储原始数据
  • 灵活的底层存储
  • 完善的数据管理
  • 多种计算模型


      4.2.2理念

      能够存储海量的原始数据, 能够支持任意的数据格式, 有较好的分析和处理能力

      4.3特性对比

      数据湖的数据可以来自所有结构化,半结构化和非结构化数据;表结构也是在用的时候定义等等。

      五.实时数仓和数据湖 

      5.1.实时数仓痛点 

      5.1.1没有加工逻辑

      没有加工逻辑,直接加工入库。优点:简单, 容易开发 缺点:没有模型数据不能复用,浪费资源。

      5.2.2.分层逻辑

      多分层,中间结果基于mq,深度加工入库。


  • 优点:数据模型可以 复用,整体数仓延迟低
  • 缺点:1、Kafka无法支持海量数据存储。2、Kafka无法进行中间模型层的OLAP分析


      5.2.3.合并成本高

      做过数据处理的都有做过这种实时数据和批处理合并,带来的痛苦。

      5.2.数据湖解决

       借助数据湖的思想进行演进,多分层,所有数据落地到数据湖,进行深度加工,不再有这种kafka数据存储量不足的问题,可以随意获取分析等。

      六.架构和数据湖 

      6.1数据仓库架构

      提出数据湖和数据仓库一体。标准主题的数据存放数据仓库,原始结构化数据存放数据湖,有如下特点:

      1.开放性 使用的存储格式是开放式和标准化的(如parquet),并且为各类工具和引擎,包括机器学习和 Python/R库,提供API,以便它们可以直接有效地访问数据

      2.支持从非结构化数据到结构化数据的多种数据类型

      3.BI支持 Lakehouse可以直接在源数据上使用BI工具

      4.支持多种工作负载 包括数据科学、机器学习以及SQL和分析

      5.Schema enforcement and governance(模式实施和治理) 未来能更好的管理元数据,schema管理和治理,不让数据湖变成沼泽地

      6.事务支持 企业内部许多数据管道通常会并发读写数据。对ACID事务的支持确保了多方并发读写数据时的一致性问题

      7.端到端流 为了构建Lakehouse,需要一个增量数据处理框架,例如Apache Hudi。

      那么现在架构:

      6.2实时数仓架构

      基于flink生态构建,可以进行增量消费,批流读写,还可以基于flinkCDC数据统计等。

      七.大数据平台与数据湖

       那么现在的大数据平台是什么样子呢?存储和计算分离。各自评估各自的资源。

      1.计算层可以有flink,spark,hive,persto等;

      2.数据湖加速层是一个存储和计算彻底分离的架构,增加加速层,自然的实现冷热分离,提高读取性能,节省远程访问带宽,如开源组件alluxio;

      3.表的格式化层,把数据文件封装有业务含义的表,如快照,表结构,分区,等事物一致性;

      4.存储层,可以有很多种,如oss,hdfs,s3等等

      八.Apache Hudi原理介绍 

      8.1使用场景 

      8.1.1近实时写入

      减少碎片化工具的使用

      CDC 增量导入 RDBMS 数据

      限制小文件的大小和数量

      8.1.2近实时分析

      相对于秒级存储 (Druid, OpenTSDB) ,节省资源

      提供分钟级别时效性,支撑更高效的查询

      Hudi 作为 lib,非常轻量

      8.1.3增量 pipeline

      区分 arrivetime 和 event time 处理延迟数据

      更短的调度 interval 减少端到端延迟 (小时 -> 分钟) => Incremental Processing

      8.1.4增量导出

      替代部分 Kafka 的场景,数据导出到在线服务存储 e.g. ES

      8.2概念/术语 

      8.2.1Timeline

      Timeline 是 HUDI 用来管理提交(commit)的抽象,每个 commit 都绑定一个固定时间戳,分散到时间线上。在 Timeline 上,每个 commit 被抽象为一个 HoodieInstant,一个 instant 记录了一次提交 (commit) 的行为、时间戳、和状态。HUDI 的读写 API 通过 Timeline 的接口可以方便的在 commits 上进行条件筛选,对 history 和 on-going 的 commits 应用各种策略,快速筛选出需要操作的目标 commit。

      8.2.2Time

      Arrival time: 数据到达 Hudi 的时间,commit time

      Event time: record 中记录的时间

      上图中采用时间(小时)作为分区字段,从 10:00 开始陆续产生各种 commits,10:20 来了一条 9:00 的数据,该数据仍然可以落到 9:00 对应的分区,通过 timeline 直接消费 10:00 之后的增量更新(只消费有新 commits 的 group),那么这条延迟的数据仍然可以被消费到。

      8.2.3文件管理

      8.2.3.1文件版本

      一个新的 base commit time 对应一个新的 FileSlice,实际就是一个新的数据版本。HUDI 通过 TableFileSystemView 抽象来管理 table 对应的文件,比如找到所有最新版本 FileSlice 中的 base file (Copy On Write Snapshot 读)或者 base + log files(Merge On Read 读)。通过 Timeline 和 TableFileSystemView 抽象,HUDI 实现了非常便捷和高效的表文件查找。

      8.2.3.2文件格式

      Hoodie 的每个 FileSlice 中包含一个 base file (merge on read 模式可能没有)和多个 log file (copy on write 模式没有)。每个文件的文件名都带有其归属的 FileID(即 FileGroup Identifier)和 base commit time(即 InstanceTime)。通过文件名的 group id 组织 FileGroup 的 logical 关系;通过文件名的 base commit time 组织 FileSlice 的逻辑关系。HUDI 的 base file (parquet 文件) 在 footer 的 meta 去记录了 record key 组成的 BloomFilter,用于在 file based index 的实现中实现高效率的 key contains 检测。只有不在 BloomFilter 的 key 才需要扫描整个文件消灭假阳。HUDI 的 log (avro 文件)是自己编码的,通过积攒数据 buffer 以 LogBlock 为单位写出,每个 LogBlock 包含 magic number、size、content、footer 等信息,用于数据读、校验和过滤。

      8.2.4 File Format

      8.2.4.1 Index

      Hoodie key (record key + partition path) 和 file id (FileGroup) 之间的映射关系,数据第一次写入文件后保持不变,所以,一个 FileGroup 包含了一批 record 的所有版本记录。Index 用于区分消息是 INSERT 还是 UPDATE。

      8.2.4.2 Index 的创建过程 

      BloomFilter Index

      新增 records 找到映射关系:record key => target partition

      当前最新的数据 找到映射关系:partition => (fileID, minRecordKey, maxRecordKey) LIST (如果是 base files 可加速)

      新增 records 找到需要搜索的映射关系:fileID => HoodieKey(record key + partition path) LIST,key 是候选的 fileID

      通过 HoodieKeyLookupHandle 查找目标文件(通过 BloomFilter 加速)

      Flink State-based Index

      HUDI 在 0.8.0 版本中实现的 Flink witer,采用了 Flink 的 state 作为底层的 index 存储,每个 records 在写入之前都会先计算目标 bucket ID,不同于 BloomFilter Index,避免了每次重复的文件 index 查找。

      8.2.5 Table 类型

Table Type

Supported Query types

Copy On Write

Snapshot Queries + Incremental Queries

Merge On Read

Snapshot Queries + Incremental Queries + Read Optimized Queries

      Copy On Write

      Copy On Write 类型表每次写入都会生成一个新的持有 base file

      (对应写入的 instant time ) 的 FileSlice。

      用户在 snapshot 读取的时候会扫描所有最新的 FileSlice 下的 base file。

      Merge On Read

      Merge On Read 表的写入行为,依据 index 的不同会有细微的差别:

      对于 BloomFilter 这种无法对 log file 生成 index 的索引方案,对于 INSERT 消息仍然会写 base file (parquet format),只有 UPDATE 消息会 append log 文件(因为 base file 总已经记录了该 UPDATE 消息的 FileGroup ID)。

      对于可以对 log file 生成 index 的索引方案,例如 Flink writer 中基于 state 的索引,每次写入都是 log format,并且会不断追加和 roll over。

      Merge On Read 表的读在 READ OPTIMIZED 模式下,只会读最近的经过 compaction 的 commit。

      8.2.6数据写 

      8.2.6.1写操作

      1.UPSERT:默认行为,数据先通过 index 打标(INSERT/UPDATE),有一些启发式算法决定消息的组织以优化文件的大小 => CDC 导入

      2.INSERT:跳过 index,写入效率更高 => Log Deduplication

      1.BULK_INSERT:写排序,对大数据量的 Hudi 表初始化友好,对文件大小的限制 best effort(写 HFile)

      8.2.6.2写流程(UPSERT) 

      Copy On Write

      先对 records 按照 record key 去重

      首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)

      对于 update 消息,会直接找到对应 key 所在的最新 FileSlice 的 base 文件,并做 merge 后写新的 base file (新的 FileSlice)

      对于 insert 消息,会扫描当前 partition 的所有 SmallFile(小于一定大小的 base file),然后 merge 写新的 FileSlice;如果没有 SmallFile,直接写新的 FileGroup + FileSlice

     Merge On Read

      先对 records 按照 record key 去重(可选)

      首先对这批数据创建索引 (HoodieKey => HoodieRecordLocation);通过索引区分哪些 records 是 update,哪些 records 是 insert(key 第一次写入)

      如果是 insert 消息,如果 log file 不可建索引(默认),会尝试 merge 分区内最小的 base file (不包含 log file 的 FileSlice),生成新的 FileSlice;如果没有 base file 就新写一个 FileGroup + FileSlice + base file;如果 log file 可建索引,尝试 append 小的 log file,如果没有就新写一个 FileGroup

     + FileSlice + base file

      如果是 update 消息,写对应的 file group + file slice,直接 append 最新的 log file(如果碰巧是当前最小的小文件,会 merge base file,生成新的 file slice)

      log file 大小达到阈值会 roll over 一个新的

      8.2.6.3写流程(INSERT)

      Copy On Write

      先对 records 按照 record key 去重(可选)

      不会创建 Index

      如果有小的 base file 文件,merge base file,生成新的 FileSlice + base file,否则直接写新的 FileSlice + base file

      Merge On Read

      先对 records 按照 record key 去重(可选)

      不会创建 Index

      如果 log file 可索引,并且有小的 FileSlice,尝试追加或写最新的 log file;如果 log file 不可索引,写一个新的 FileSlice + base file

      8.2.7工具

      DeltaStreamer

      Datasource Writer

      Flink SQL API

      8.2.8 Key 生成策略

      用来生成 HoodieKey(record key + partition path),目前支持以下策略:


  • 支持多个字段组合 record keys
  • 支持多个字段组合的 parition path (可定制时间格式,Hive style path name)非分区表


      8.2.9删除策略

      逻辑删:将 value 字段全部标记为 null

       物理删:

      1.通过 OPERATION_OPT_KEY  删除所有的输入记录

      2.配置 PAYLOAD_CLASS_OPT_KEY = org.apache.hudi.EmptyHoodieRecordPayload 删除所有的输入记录

      3.在输入记录添加字段:_hoodie_is_deleted

      8.2.10数据读 

      8.2.10.1 Snapshot 读

      读取所有 partiiton 下每个 FileGroup 最新的 FileSlice 中的文件,Copy On Write 表读 parquet 文件,Merge On Read 表读 parquet + log 文件

      8.2.10.2 Incremantal 读:

      当前的 Spark data source 可以指定消费的起始和结束 commit 时间,读取 commit 增量的数据集。但是内部的实现不够高效:拉取每个 commit 的全部目标文件再按照系统字段 _hoodie_commit_time_ apply 过滤条件。

      8.2.10.3Streaming 读

      0.8.0 版本的 HUDI Flink writer 支持实时的增量订阅,可用于同步 CDC 数据,日常的数据同步 ETL pipeline。Flink 的 streaming 读做到了真正的流式读取,source 定期监控新增的改动文件,将读取任务下派给读 task。

      8.2.11 Compaction

      没有 base file:走 copy on write insert 流程,直接 merge 所有的 log file 并写 base file

      有 base file:走 copy on write upsert 流程,先读 log file 建 index,再读 base file,最后读 log file 写新的 base file

      Flink 和 Spark streaming 的 writer 都可以 apply 异步的 compaction 策略,按照间隔 commits 数或者时间来触发 compaction 任务,在独立的 pipeline 中执行。

      8.3总结

       通过对写流程的梳理我们了解到 Apache Hudi 相对于其他数据湖方案的核心

      优势:写入过程充分优化了文件存储的小文件问题,Copy On Write 写会一直将一个 bucket (FileGroup)的 base 文件写到设定的阈值大小才会划分新的 bucket;Merge On Read 写在同一个 bucket 中,log file 也是一直 append 直到大小超过设定的阈值 roll over。对 UPDATE 和 DELETE 的支持非常高效,一条 record 的整个生命周期操作都发生在同一个 bucket,不仅减少小文件数量,也提升了数据读取的效率(不必要的 join 和 merge)。0.8.0 的 HUDI Flink 支持了 streaming 消费 HUDI 表,在后续版本还会支持 watermark 机制,让 HUDI Flink 承担 streaming ETL pipeline 的中间层,成为数据湖/仓建设中流批一体的中间计算层。

(部分内容来源网络,如有侵权请联系删除)
立即申请数据分析/数据治理产品免费试用 我要试用
customer

在线咨询