睿治

智能数据治理平台

睿治作为国内功能最全的数据治理产品之一,入选IDC企业数据治理实施部署指南。同时,在IDC发布的《中国数据治理市场份额,2022》报告中,蝉联数据治理解决方案市场份额第一。

陈玉兆:基于Flink Hudi的增量ETL架构

时间:2022-05-24来源:薏米帮浏览数:193

大数据领域的数据类型主要分两块,第一块是没有 upsert 的 append 日志流,主要记录业务上的一些 record 或者从 log server 上发过来的数据。

分享嘉宾:陈玉兆 阿里巴巴 技术专家编辑整理:许友昌 中控集团出品平台:DataFunTalk导读:大家好,我是来自阿里巴巴计算平台事业部 SQL 引擎组的玉兆,我们团队之前主要负责 Apache Flink sql 模块的开发,过去半年我的主要工作是 Flink 与 Hudi 的集成,借此机会跟大家分享一下 Flink 与 Hudi 的集成工作,Hudi在数据湖方面的发展方向。今天的介绍包括以下几大方面内容:

数仓到数据湖

数据库入仓湖

HUDI 核心

Flink HUDI Inc ETL

01数仓到数据湖

图一 从数仓到数据湖的发展

1. 发展历史

近两年数据湖是一个比较火的技术,从传统的数仓到数据湖,在过去 5 年里架构演变得非常迅速。在 2015 年之前提到数仓我们想到的都是一些非常专业的数据公司,像 Teradata、Vertica 做的类似 MPP 架构的数据库,它的模型基本是存储与计算耦合在一起,format 是封闭的,后期的维护也处于比较封闭的状态,暴露给外界的接口也没有那么丰富。2015年到2018年随着云厂商的兴起,像 EMR、Amazon、Redshift 等云上数仓,特色是将传统的垂直架构改成了分层的存储计算分离的水平架构,尽量利用云上廉价存储的优势,利用对象存储 s3、oss来降低成本,同时支持海量数据的计算能力。但是它们的 format 仍然是封闭的,会定制自己的 format 来做一些深度的优化,下游查询引擎也比较单一。从 2018 年开始到目前,伴随着云服务的逐渐流行,数据湖技术渐渐兴起。数据湖技术目前主要有 Hudi、Iceberg、Dalta Lake。

2. 为什么需要数据湖?

数据湖相比原有的数仓更加灵活,它并不是一个 server,而是一个类似 table format 的概念。它定义了 table 的一些规范以及 format 的操作规范,可以操作云服务上底层的对象存储,所以可以和云服务很好地结合起来,下游对接的查询引擎也非常丰富,如 presto, sparkSQL, hive等。同时它的 format 本身也是非常开放的,像列式存储有 orc, parquet,行式存储有 avro 这些标准的数据格式,为下游生态的对接提供了丰富的可能性。这样以一种 table format 的形式暴露给下游,不管是运维还是开发,基本都是透明的,所以对于自建集群与开源生态来说,数据湖是很受欢迎的一种形式。

在云服务上去解决传统数仓处理的业务问题,那么在数据湖上也必须要具备事务、upsert 等能力。推动架构的演变的是我们希望把数仓上的操作原语能够在数据湖上支持起来,这样湖仓一体的架构才能支持后续业务的发展。所以数据湖需要解决的核心问题,第一是事务,第二是 upsert 能力。综合这两块,目前 hudi 在目前的数据湖框架里是做的最成熟的,提供的事务模型是快照级别,初步实现了海量数据 upsert 以及事务的管理能力。

02数据库入仓/湖

大数据领域的数据类型主要分两块,第一块是没有 upsert 的 append 日志流,主要记录业务上的一些 record 或者从 log server 上发过来的数据。第二块是业务库发过来的数据,如交易订单,这种数据有变更有状态。在没有流式框架入湖之前,我们通常会使用离线批量调度的架构,每天定时取数据。比如使用的是 mysql ,就会定时去 mysql 全量地拉一次数据,做一个小时级别的队列,每个小时通过 MySQL binlog 或其他的手段采集增量数据,然后做一个天级别的 merge,这样来实现数据库到数仓的同步。

图二 数据库数据写入仓湖

最近兴起的流批一体的架构,像debezium、canal 通过订阅 MySQL binlog 事件的方式将增量数据近实时地导入数仓之中,这就要求下游数据库本身有 upsert 语义,而 hudi 提供了这样的能力,并且是目前做得比较成熟的,因此 hudi 可以使用这两种途径至少在 ODS 层进行近实时的数据库数据入湖:先使用debezium 采集 binlog,在使用 flink cdc connector 直接对接,flink cdc connector 具有 snapshot 再加增量消费的能力,可以直接向下游拥有 upsert 的数据湖(如hudi)进行同步,不需要再去接一层 kafka 就可以做到分钟级别的入仓入湖。

不过目前的社区版的 flink cdc connector 在 snapshot 阶段,会 load 大量数据到内存,所以在数据量大的情况下会有一些瓶颈。如果是历史数据特别多,比如上亿级别,一次性的历史加增量导入数据,目前推荐图二下面的一种架构。先通过 debezium 等工具把数据库里的数据同步到 kafka,统一维护中间的增量加全量消息,再去接入 flink 实时的导入数据湖中。

这样的好处是留给 flink 一定的 buffer 能力,如果中间出现一下 failover 等状况断点续接起来比较方便。图二第一种架构在全量同步阶段如果想要做到断点续接,目前需要改一些代码,所以实施起来不是那么方便。相比之下第二种的扩展性会更好一些。当然在后续的商业版本中,flink cdc connector 会得到改进,像目前的单并发问题、断点续接问题,在商业版本都会解决。

通过这样的两条链路我们可以将数据库的快照一分钟级别同步到数仓,在下游 hudi 还可以向 hive 的 metastore 中同步一份 meta,这样我们就可以在 hive 中查询到比如 5 分钟、10分钟新鲜度的数据。当然也可以接一些 OLAP 引擎如 presto。

03HUDI 核心

1. Timeline

Hudi 是一个 table format,虽然它是一种没有 server 的数据格式,但内部是有状态的。它的第一个状态就是 Timeline,我们在 Hudi 数据湖上所做的所有的动作,比如写入compaction、clean、rollback等,都有一个唯一的事件时间戳,hudi 把这样的时间戳抽象为 Instant 代表某个时间上的一个动作,一次行为的记录,一个 instant 包括三个部分:

Action:动作的类型 commit, delta_commit, clean, compaction, rollback, savepoint;

Time:instant的事件时间,一个事件时间唯一标识了一个instant;

State:每个action的状态,包括requested, inflight, completed。

图三 Hudi Timeline

Action 代表动作的类型,包括:

commit (copy on write 模式下写入产生的动作);

delta_commit(merge on read 模式下写入产生的动作);

clean(对历史 commit 数据的定期清理)。

每一个最新的快照都记住了当前所有数据,因此没有必要记录所有commit ,这样一方面可以减少存储上小文件的压力,另一方面对 hudi 本身的视图,比如对文件的 scan 也是一种优化;compaction 是 merge on read 模式下面的一个优化动作,写入的 log 文件在查询时并不是那么高效,compaction 是一个定期触发的策略,会把 avro 格式的 log 文件定期的压缩成 parquet 格式;rollback 是对写入动作失败产生的脏文件进行清理,是对这些文件做一次回滚动作,即在下一个写入动作执行之前先执行一次 rollback,把上一次产生的脏文件清理掉;savepoint 是做一些历史快照,一旦 transaction 做了 savepoint 操作,它就会永远的记录在数据湖当中。clean 动作会忽略掉这些 savepoint。

Time 代表 instant 的事件时间,一个事件时间唯一标识了一个instant。

State 代表 action 的状态,每个 action 有三种状态,requested 代表向数据库发起 action 请求,action 真正执行时的状态是 inflight;action 已经完成的状态是 completed,标志着这个事务已经成功的结束,对于写入动作来说 completed 状态就意味着这份数据拥有一个完整的事务,就可以对下游的 reader 暴露了,即对下游 reader 可见。

通过唯一的 instant time 串联起来的 Timeline 视图,hudi 自己的 reader 可以暴露一个统一、连贯的视图,对文件、meta 的一些管理以及对下游可见性的维护也比较便捷。比如可以灵活的选择当前时间线上应该暴露哪些视图,是最新的快照或者是历史某个snapshot,或者 merge on read 模式下只暴露 parquet 等。

2. FileGrouping

通常我们的 writer 在写的过程中可能会同时写多个文件,这样就可能产生很多小文件,如果使用 hdfs 的话会造成 NameNode 的压力过大,下游 reader 的读取效率也不高。hudi 于是在写入就做了一些抽象,将一个 partition 下的文件按照逻辑上的组织关系,组成了多个 file group。file goup 在 hudi 类似于 hive 中 bucket 的概念,切换一个 file group 的依据是希望这个 file group 不会太小也不会太大,parquet 一般希望是 100 多兆,log 格式的可能会大一些,可能会配置 512MB或 1G。

hudi 划分 file group 的方式是通过文件名中的一段 UUID,一个 UUID 标志一个 file group,例如一个 parquet 文件与 3 个 log 文件有相同的一段 UUID 的话,则属于同一个 file group。file group 中不同时间的提交会生成不同版本的文件,为了减少小文件的问题,新版本的提交可能会对之前的文件进行合并操作。

例如在merge on read 模式下,新版本的 parquet 文件是旧版本的 parquet 文件与新增 log 文件的merge,直到 parquet 文件的大小达到设置的阈值,才会切换到新的 parquet 文件去写。log 的策略要灵活一些,因为像 avro 这种存储是支持 append 操作的,效率会高一些。hudi 在写入的过程中会实时监控当前 partition 中有多少 file group,以及每个 file group 的大小是多少,对于较小的 file group,insert 数据可以继续向里面追加,这样的话都会写到我们期望的大小,这在一定程度上缓解了小文件的问题。

划分 file group 的核心逻辑就是其中新版本的 file slice 的大小,超过了设定的阈值就会划分新的 file group。一个新的 file slice 对应 file group 中的一个新的版本,如果你想要读数据湖里最新的 snapshot 数据,hudi 会去读最新的 file slice。

3. Copy On Write

copy on write 模式会一直写列式的 parquet 文件,新数据过来会被缓存在内存中,构建一个内存的索引,新的数据会和老的 parquet 数据不断的 merge,每次 merge 会形成一个新的 parquet 文件,一个 parquet 文件对应一个 file slice,一个 file slice 对应一个新的数据版本。在hudi 中,你需要定义一个 primary key 和一个 precombine key,合并的逻辑就是根据primary key 和 precombine key(代表数据的版本)在不同的 record 之间做一个比较,默认的策略是 replace策略,新版本的 record 会替换老版本的 record,我们也可以定义自己的 precombine 策略而不使用 replace 策略。

图四 copy on write 模式

新增数据分为两部分,一个是对老数据的更新,就是 upsert 数据,另一部分是 insert 数据。对于upsert 数据写到哪个文件,哪个位置是固定的,因为在 hudi 中一个 file group 维护了一个 primary key 所有版本的记录,这是为了做高效的 upsert 操作,也就是一个主键的所有版本都会 merge 到一个 file group 里最新的 file slice 中,例如 id = 1 的 record,它后续所有的版本都会写入到这一个 file group 中,因为这是一个 merge 的操作,即使后续有多次的变更或删除,也不会改变最新的 file slice 的大小,在最新的 file slice 中一个 primary key 只会保留一条最新的 record。

对于 insert 数据,写到哪里其实无所谓,因为不破坏语义。hudi 写insert数据的依据是小文件策略,比如图中的 file group 还没写到 100 兆,这时就会继续往里写,一直把它写到我希望的大小。这就是 hudi 核心的 upsert 与 insert 数据写入模型。

4. Merge On Read

图五 merge on read 模式

merge on read 模式在 upsert 时首先会根据 primary key 找 file group,把数据写入 file group中,与 copy on write 不同的是,在 upsert 时不需要实时 merge 的过程,它会把增量的数据追加到最新的 file slice 的 log 中,在 hdfs 上可以直接在原文件上追加,这样它的写入效率会比 Copy On Write 高效得多。rollover 的逻辑则与 copy on write 类似,比如 log 写到了设定的大小,就会切换一个新的 file group,但在此之前的 upsert 和 insert 数据都会 append 到 log 文件中。

因为 merge on read 写入的是 avro 格式,在查询上就不会像 parquet 这么高效,后续需要依赖自动的 compaction 或离线定时的 compaction 任务,把 log 文件进一步压缩成 parquet 来实现高效查询。与 copy on write 相比,merge on read 的写入吞吐较高,但是查询效率较低。根据社区的反馈,在 flink 实时写时,由于会自动启动一个 compaction 的 pipeline,会比较吃内存,所以 merge on read 需要配置更多的内存。最近社区也在开发这种离线的 compaction,以独立的 job 来运行,这样就不会干扰写入流程。

5. Flink Write Pipeline( copy on write )

图六 flink hudi copy on write 模式 pipeline

上图是社区版 flink 写操作的一个 pipeline 模型,对应于 copy on write 模式的模型。数据源对接的是 flink sql 的 raw data,使用 raw data 的好处是:上游我们可以选择任意 table 支持的 format,比如 kafka 的 format。pipeline 会将上游的数据转为 raw data,再转成我们的 hudi record,record 会根据我们设置的 primary key 进行 shuffle,shuffle 的目的是为了让下游的一个 task 只看到一个 record 的所有版本,方便把一个 record 的所有版本写到一个 file group 里面。bucket assign 为接收到的 record 分配 file group id,即 uuid,这样 partition 加 uuid 的唯一位置,确定了一个 record 在文件系统上的位置。分配完 file group id 之后,会根据 file group id 进行 shuffle,目的是让一个 write function 只写一个文件,不会出现一个文件被多个 task 去写导致乱序的情况。write function 会有 buffer 的策略,数据 flash 的策略有三种,第一种是单个 buffer 的大小到达阈值(默认 64MB),数据会被 flash 到磁盘;第二种是总 buffer 的大小到达阈值,当前最大的 buffer 数据会被 flash 到磁盘;第三种策略是 checkpoint,一次 checkpoint 触发一次 flash。

最后有一些运维动作,比如清理动作,它们和 Stream Write 之间没有数据的传输,是通过 event 来进行通信的。

当前版本的 Stream Write task 的事务提交是依赖于 coordinator 来实现的,它们之间主要通过 RPC 来完成一些 metadata 的传输,这些 metadata 主要是描述了 Stream Write task 写入了哪个文件,写了多少条记录,coordinator 将 metadata 收集起来最终一起提交,提交成功则表示这次写入成功,下游即可以看到最新的记录。同时 coordinator 还会有一些 meta 的同步动作,比如向 hive 同步 partition、字段变更的一些信息。

04Flink HUDI Inc ETL

图七 hudi flink etl

为了构建经典的数仓模型,传统的方式是通过调度系统按照某种时间策略构建一个定期的 pipeline 任务,依据 pipeline 之间的依赖关系规定触发机制,整体的维护十分复杂。

hudi 因为具有 upsert 的能力,因此我们可以利用 debezium 等工具,通过 flink CDC 加 kafka 将数据库数据近实时的同步到 ODS 层。如果hudi 可以继续将上游数据的变更数据流传到下游,借助 flink CDC 的能力下游可以继续消费这种增量数据,然后在原有状态的基础上继续做增量计算,这就构建了一个分钟级别近实时的增量数仓模型,因此我们对 hudi table format 进行了改动。

对于 copy on write 模式,由于每次更新都是 merge 操作,所以不需要修改,而 merge on read 模式由于 log 是 append 写入的,我们会在 log format 下面增加一个 change flags,包括每次操作的 update before、update after、delete 等这些 change flags,上游数据中只要携带了 change flags,在下游会一直传播下去。flink 利用 change flags 可以将上游的操作在下游全部还原出来,成为当前最新视图。这样在 ADS 层就可以直接对接如 presto、es、mysql 等。该实现在 jira 的 id为 HUDI-1771,预计在 hudi 0.9 版本发布,为了应对更大的数据量和更复杂的模型,还会对 hudi 进行进一步的优化。

05答疑

Q:可以对比一下 hudi 与 iceberg、delta lake 的区别吗?

A:我的理解是,从社区的角度来看,是希望把 hudi 作为一个数据湖平台,而不仅仅是 table format,为此构建了数据湖周边的很多工具,比如数据的自动清理、数据的二次聚合、与其他生态的对接,比如 hive meta 的同步。从 table format 上来看,目前 hudi upsert 能力是最成熟的,更新是最高效的,并且对小文件友好。但是它的写入 pipeline 操作相对其它数据湖方案会比较重一些,在吞吐上有一些劣势。

Q:flink hudi 是比较推荐 copy on write 模式吗?

A:目前从很多公司的反馈来看,是比较推荐 copy on write 模式的,因为它的内存管理比较直观,只在写入那一步 buffer 的内存以及 merge 时的 merge map 使用的内存,因此我们在启动作业时配置内存是十分明确的,job 也更加稳定,如果对吞吐要求不高,推荐使用 copy on write 模式。merge on read 模式我们后面也会做一个离线的 compaction 方案,也会提升它的稳定性。

Q:flink hudi 0.9 版本稳定了吗?

A:copy on write 现在已经很稳定了,merge on read 模式我们把 compaction 剥离出来后也会很稳定。

Q:PrestoDB 支持 MOR 的快照读了吗?

A:最新版的 prestoDB 对两种模式都支持的。




(部分内容来源网络,如有侵权请联系删除)
立即申请数据分析/数据治理产品免费试用 我要试用
customer

在线咨询