睿治

智能数据治理平台

睿治作为国内功能最全的数据治理产品之一,入选IDC企业数据治理实施部署指南。同时,在IDC发布的《中国数据治理市场份额,2022》报告中,蝉联数据治理解决方案市场份额第一。

阿里云万亿级数据集成架构实践

时间:2022-08-17来源:谁给的寂寞浏览数:184

当数据分为冷热数据时,需要把冷数据放到归档数据中,极大节约成本。在数仓场景中,也可以把云上计算好的一些结果数据回流到数据库中,服务于在线数据应用。

导读:阿里云DataWorks数据集成是DataX团队在阿里云上的商业化产品,致力于提供复杂网络环境下、丰富的异构数据源之间高速稳定的数据移动能力,以及繁杂业务背景下的数据同步解决方案。目前已经支持云上近3000家客户,单日同步数据超过3万亿条。DataWorks数据集成目前支持离线50+种数据源,可以进行整库迁移、批量上云、增量同步、分库分表等各类同步解决方案。2020年上线实时同步能力,支持10+种数据源的读写任意组合,提供MySQL,Oracle等多种数据源到阿里云MaxCompute,Hologres等大数据引擎的一键全增量同步解决方案。今天跟大家交流的内容是数据集成DataX的架构,以及在使用数据集成这款产品过程中遇到的一些问题和实践经验。

今天的介绍会围绕下面几点展开:

为什么需要数据集成

阿里云数据集成大事记

阿里云数据集成产品定位和业务支撑情况

阿里云数据集成DataX设计核心思路

阿里云数据集成DataX架构

阿里云数据集成数据同步-核心亮点

阿里云数据集成解决方案系统

阿里云 DataWorks & 数据集成关系

智能实时数仓解决方案示例

01为什么需要数据集成

1. 数据集成的应用场景

首先为什么需要数据集成,在大数据场景下,数据集成主要是解决哪些核心问题,我将其分为以下四个场景:

第一个场景就是搬站上云。搬站上云主要用来对接业务数据库,完成数仓初始化,比如用户需要把自己的云下数据库快速安全地迁移到云上存储并做进一步的业务分析,如线下MySQL、Oracle到云上MaxCompute或者是Hadoop中Hive上,这既包含了一次性全量数据迁移,还包含持续增量数据迁移。

第二个场景是构建实时数仓,做流式的数据汇聚。主要包含应用产生的日志数据,应用产生的消息数据,和应用对接数据库产生的日志数据(数据库的Binlog或者归档日志)。

第三个场景是平台融和,完成云上各个产品之间的数据同步和交换。既包含跨云平台之间的数据同步,比如从AWS、GCP到阿里云;还包含云平台内部之间的数据交换,比如阿里云内部RDS、MaxCompute、OSS、PolarDB、AnalyticDB、Datahub等之间的数据同步。

第四个场景是容灾备份。当数据分为冷热数据时,需要把冷数据放到归档数据中,极大节约成本。在数仓场景中,也可以把云上计算好的一些结果数据回流到数据库中,服务于在线数据应用。

以上是数据集成主要支撑的四个典型的数据场景。

2. 数据集成的角色和地位

数据集成在整个数仓建设中的角色和地位是什么呢?这里我们将数仓开发简单概括为6个模块或步骤,第一个是数据源,里面包含了各种关系型数据库、文件存储、大数据存储、消息队列等。这些异构的数据源可以通过统一的数据集成平台来将异构网络的数据统一抽取到数据仓库平台,在数据仓库平台中完成数据汇聚,进行进一步的数据分析。分析后的结果数据仍可以通过数据集成回流到在线数据库,为在线业务、在线应用提供数据查询。除此之外,还可以利用一个承上启下的产品-数据服务,对接数据应用,制作相关报表、大屏、和应用等。

综上所述,我们知道数据集成在整个数据仓库开发建设过程中起到了非常关键的作用,只有通过数据集成将异构数据源统一汇总到数据仓库中,才有后序的数据分析、数据服务、数据应用等。

02阿里云数据集成大事记

阿里云数据集成正是为了解决数据仓库中这些经典问题而出现的,它是一款有历史渊源的产品。2011年伴随着阿里巴巴数据平台事业部成立,当时推出了DataX的1.0和2.0版本,当时大家可能对于这款产品还没有太多了解。在2014年,伴随着DataX 3.0版本的发布,我们将服务于内部的数据集成产品推向了阿里云(公有云、专有云),开始正式的对外提供服务。并且于2016年,将数据同步引擎DataX开源到社区,助力大数据ETL生态建设。在2018年DataX有了一个比较大的架构升级,将公有云、专有云、阿里内部功能统一建立全新的Data Integration服务,进一步优化了开发效率,节约了运维成本。在2019年,将数据集成能力进行了商业化。在2020年,发布了数据集成的实时同步能力,可以支持数据实时传输,并且提供了一些实时同步场景解决方案。

总之,阿里云数据集成是大数据平台上下云的核心枢纽,可以将不同业务系统中数据相互打通,实现数据自由离线或实时的流动。其致力于提供复杂网络环境下,丰富的异构数据源之间高速稳定的数据移动能力,以及在繁杂业务背景下,提供数据同步解决方案。

03阿里云数据集成产品定位和业务支撑情况1. 阿里云数据集成产品定位

图中是阿里云数据集成产品的核心功能特性列表,大家在自己设计或选型ETL产品的时候,可以着重了解这些点,做选型参考。

首先,离线和实时全覆盖,这款产品既支持传统的离线同步,也支持及时高效的实时同步;

第二点,针对各种复杂网络均做了相应的解决方案和产品化能力,无论数据源在公网、IDC还是VPC等,数据集成都有对应的解决方案;

第三点,因为DataX是一个云上产品,同时需要具备一些安全的管控策略,我们将开发和生产环境隔离,进行数据源权限安全控制;

第四点,得益于这种可扩展性架构,DataX可以支持繁多的异构数据源,离线支持50+种数据源,实时支持10+种数据源读写任意组合,其中涵盖了关系型数据库、MPP、NoSQL、文件储存、消息流等各大种类;

第五点,在大数据场景中往往存在特定场景的通用需求,比如整库迁移、批量上云、增量同步、分库分表、一键实时全增量,我们将这些通用需求抽象为数据集成的解决方案,进一步降低用户使用数据集成门槛;

最后我们拥有一个非常完备的运维监控体系,能够进行流量控制、脏数据控制,能够对任务执行资源组进行监控,另外还可以设置任务告警,支持电话、短信、邮件、钉钉等。

2. 阿里云数据集成业务支撑情况

首先该产品现在拥有数千用户、数万名开发者,每天同步的数据量在PB级别,这些数据是由数千万个任务调度完成,而且该产品已经做到了全球地域部署覆盖,目前已经涵盖了政府、金融、保险、能源、电力、制造、互联网、零售等多个行业,几乎每一个行业都有典型的客户和场景。

04阿里云数据集成DataX设计核心思路

1. 核心设计思路

左侧图是传统的数据ETL工具制作的方法,是一种网状模型,每种储存之间如果交换数据,需分别设计一套数据传输软件。比如在MySQL和FTP之间交换数据,需要实现一个MySQL到FTP的数据同步工具,在MySQL和HDFS之间交换数据,还要实现另一个同步工具,这种方法扩展性非常差。

针对这种场景,DataX做了一个非常核心的抽象,就是右侧图中蓝色的线性模型,每一种数据源统一通过DataX框架做数据交换,当增加一种新的数据源类型同步插件,便天然的具备和DataX已有的其他数据源通道进行数据交换的能力。

下面着重介绍一下具体的实现思路,首先每一种数据存储,比如MySQL,会有一个MySQL的Reader插件和一个Writer插件,Reader插件主要是用来读取MySQL中的数据,读取的模式可以是离线的同步,也可以是实时的Binlog同步,Writer插件主要是用来写入数据。DataX做了两个核心抽象,第一个是接口抽象,不论Reader还是Writer插件均有一套接口规范,只要符合接口规范,就可以纳入平台调度体系,被平台调度和使用。另外一个抽象是数据类型规范,每增加一种新的数据源,只要符合数据交换规范,就可以和平台中已经支持的其他数据源进行交换,以上就是DataX极具扩展性的原因。我们在2016年将数据同步核心同步引擎的一部分DataX开源到了社区,地址为:

https://github.com/alibaba/DataX 。

DataX是数据集成的一个核心同步引擎,是我们同步能力的一部分,今天除了要介绍一些开源的架构之外,还会介绍一些商业版,还没有完全开源的部分功能的核心架构。

2. 离线数据同步原理

首先是离线数据的同步原理,每一个数据同步作业都是一个DataX程序进程,DataX可以连接数据源头读取数据,读取到数据以后写入到目标端。数据源头的数据读取可以基于数据库的JDBC协议,这些在一些关系型数据库或者一些MPP数据库中已经非常常见了;如果源数据系统是Kafka,MQ消息队列等储存形式的话,可以使用数据源对应的SDK进行读写。一个任务在读取的过程中,会发生切片、分发、执行,Reader和Writer之间是一个生产者消费者模型,Reader在读取数据以后会将数据发送到中间的缓存队列,Writer从中间缓存队列捕获数据,并最终由Writer写入到目标储存里面。

3. 实时数据同步原理

其次,介绍一下DataX实时数据同步原理。数据来源主要分为两类,关系型数据库和实时消息流。关系型数据库可以有对应日志,比如MySQL对应的Binlog日志,Oracle的归档日志,或者数据变更捕获cdc。这些日志会被日志抽取捕获到,捕获方式有API和SDK两种方式,然后框架会将捕获到的内容做消息分解处理,分解为insert、update、delete、alter等事件,然后并行地将其存放。我们支持关系型数据库、大数据、实时消息流等多种目标存储方式,如果数据源就是实时消息流的话,我们可以使用消息订阅模式订阅消息,并且将消息同样做分解,最终重放到目标端。其中第一类关系型数据库复杂度还是比较高的,因为包含了数据的增删改和DDL操作,实时消息流则主要是一些追加数据。

05阿里云数据集成DataX架构1. DataX架构模式 

Standalone模式

在了解基础原理之后,向大家介绍一下阿里云数据集成DataX的架构设计,其中最经典的是大家比较熟悉的Standalone模式,Github上开源的即为这种模式。

Standalone模式将整个过程抽象为3个部分:

第一部分是framework框架,框架主要用来提供一些核心的、共性的功能,比如数据的速度控制、脏数据、任务执行指标的收集、汇总和上报,除此之外,框架还具有一定的调度能力;

第二部分是Reader插件部分,Reader插件主要是用来对接数据源头,将源头数据读入其中;

第三部分是Writer写插件部分,每一个目标存储类型都对应一个Writer插件,可以将源头投递过来的数据,写入到不同的目标储存里面。

上面就是一个非常简单的模型,我们将一个数据同步作业称之为一个job,为了最大化的提高任务执行速度和效率,我们会将一个job拆分成多个task任务,每一个task都是一个完整数据同步作业的一个工作子任务,用来完成对应工作期间数据传输和投递,task任务可以被我们调度框架进行调度执行。task是分组的,即TaskGroup。一个TaskGroup可以有多个task子任务,TaskGroup会在worker节点中执行和处理,worker也是单机版本的,这也是Github中开源的经典模式。

② Distribute模式

除了Standalone模式以外,我们在商业版本里面还有一些更强大的能力,首先是分布式模式,刚刚我们看到一个数据传输作业可以被分为多个task子任务,task又可以分组成TaskGroup,这些task会被并发执行。我们将前面讲的调度部分换成分布式调度模型,我们可以将不同的TaskGroup分发到不同的worker节点上面,就可以突破系统的单机瓶颈,在面对海量数据同步的时候,极大地提升数据传输效率。分布式执行集群也可以做到线性扩展,只要数据系统吞吐可以保障,数据同步作业可以随着worker节点的增加而线性增加,并且我们还做了一些异常节点的隔离等对应的一些分布式机制。

③ On Hadoop模式

第三个模式是On Hadoop模式,也是商业化版本的一个能力。当用户已经拥有一个Hadoop执行集群,我们可以将DataX数据传输作业部署在已有的Hadoop集群里面,Hadoop中常见的编程模型是MapReduce,我们可以将DataX拆分的task寄宿在mapper节点和reducer节点中,通过Yarn进行统一调度和管理,通过这种方式我们可以复用已有的Hadoop计算和执行能力。这种模式和开源的Sqoop框架是有一点类似的。

2. 实时同步CheckPoint机制

下面介绍阿里云数据集成DataX关于实时同步的checkpoint机制。checkpoint机制可以保证实时数据传输的稳定和断点续传的能力。

如果你对Flink特别了解的话,这张图会非常熟悉。Flink是阿里团队开源出去的另外一个非常重要的实时计算引擎,DataX框架也借助了Flink的checkpoint机制,比如Flink会定期发送一些barrier事件和消息。我们的Reader Task其实是source,收到barrier以后,会产生snapShotState,并且barrier会传递到Transformer Task,Transformer Task可以用来做数据的转换,Transformer Task收到barrier以后,barrier进一步传递到Writer Task,这个时候的Writer Task其实就是sink,收到barrier之后,会再做一次snapShotState,Writer Task会将我们的数据flush到目标储存。我们会跟踪barrier进度情况,并且根据barrier进度情况,把数据流消费的点位cache缓存下来,并且可以进行持久化存储。当任务出现异常或者进程退出的时候,我们可以继续从上一个cache点继续消费数据,可以保证数据不会被丢弃,不过数据可能会有部分的重复,一般后序的计算引擎可以处理这种情况(幂等写出,最终一致)。

06阿里云数据集成数据同步-核心亮点

1. 阿里云数据集成离线同步-核心亮点

概况一下阿里云数据集成离线同步的核心亮点。主要分为以下四个部分:

第一部分是支持多种类的数据源,DataX支持50+常见数据源,涵盖各种关系型数据库、文件系统、大数据系统、消息系统;

第二部分是解决方案系统,我们为一些数据传输经典问题准备了对应的解决方案,比如支持全量和增量的数据同步,支持整库、批量数据同步、支持分库分表,我们将这些琐碎的功能整合成了产品化的解决方案,直接通过界面操作即可完整复杂的数据传输过程;

第三部分是精细化权限管控能力,可以对数据源权限进行安全控制,并且隔离开发和生产环境;

第四部分DataX支持复杂调度,数据集成与DataWorks深度融合,利用DataWorks强大的调度能力调度我们的数据传输任务。

2. 阿里云数据集成实时同步-核心亮点

再概况下阿里云数据集成实时同步的核心亮点。

DataX是借助插件化机制,对新的数据源支持扩展能力强。

DataX支持丰富多样的数据源,支持星型链路组合,任何一种输入源都可以和任何一种输出源搭配组成同步链路。

DataX支持断点续传,可以实时读取MySQL、Oracle、SQLSever、OceanBase、Kafka、LogHub、DataHub、PolarDB等的数据,可以将数据实时写入到MaxCompute、Hologres、Datahub、Kafka、ElasticSearch等储存系统。

DataX天然具有云原生基因,和阿里云产品融合度非常高。

DataX可以轻松监控运维告警,提供运维大盘、监控报警、FailOver等运维能力,可以监控业务延迟、Failover、脏数据、心跳检查、失败信息,并且支持邮件、电话、钉钉告警通知。

DataX支持一站式解决方案,支持常见数据源整库全增量到MaxCompute、Hologres、ElasticSearch、DataHub等,同时能够满足分库分表,单表、整库多表、DDL消息等复杂场景。

07阿里云数据集成解决方案系统

1. 离线数仓-整库迁移方案

下面将详细介绍一下阿里云数据集成解决方案系统,首先是离线数仓的整库迁移解决方案,我们将数据集成中的一些典型场景,抽象为数据产品解决方案,可以帮助提升用户效率,降低用户使用成本。上图展示出源头数据库中所有的表列表,直接选中需要的表,选择对应的同步方式,比如每日增量或者每日全量,选择分批上传或者整批上传的同步并发配置,就可以上传到MaxCompute中,这种可视化操作可以满足大多离线数据迁移场景。

2. 实时数仓-全增量解决方案

实时数仓的全增量解决方案,可以非常方便的将现有数据库通过简单的配置后,完成存量的全量迁移,以及后续增量的实时同步。支持在目标库中建表、自动建立离线同步任务、自动建立实时任务、自动启动离线任务、自动启动实时任务、自动建立和启动增量和全量的融合任务、全流程的监控和展示,支持子步骤异常重试。通过这种方案,可以让用户不用关注每个全量任务和实时任务的琐碎配置细节。通过这一套解决方案,可以完成整个数据的全量、增量实时数据的同步。

08阿里云DataWorks和数据集成的关系

前面介绍了阿里云数据集成开源和商业架构和能力,接下来介绍一下阿里云DataWorks和数据集成的关系。DataWorks是阿里云提供的一站式开发、数据治理的平台,融合了阿里云、阿里集团12年之久的数据中台、数据治理的实践经验。数据集成是阿里云DataWorks核心的一部分,DataWorks向下支持各种不同的计算和存储引擎,比如阿里大数据计算服务MaxCompute、开源大数据计算平台E-MapReduce、实时计算Realtime Compute、图计算引擎GraphCompute、交互式分析引擎MC-Hologres等,以及支持OSS、HDFS、DB等各种存储引擎。这些不同的计算存储引擎可以被阿里云DataWorks统一管理使用,后面可以基于这些引擎去做整个数据仓库。

DataWorks内部划分为7个模块,最下面是数据集成,可以完成各种模式的数据同步。数据集成之上,是元数据中心,提供统一的元数据服务。任务调度中心可以执行任务调度服务,数据开发方面,不同的存储引擎,比如实时计算和离线计算,其有着不同的开发模式,DataWorks支持离线开发和实时开发。同时DataWorks拥有一套综合数据治理的解决方案,会有一个数据服务模块,统一向上提供数据服务,对接各种数据应用。最后将DataWorks各种能力统一通过OpenAPI对外提供服务。

数据集成模块是可以单独对用户提供服务,单独使用的,并不需要了解和掌握所有DataWorks模块就可以将数据同步作业配置和运行起来。

09智能实时数仓解决方案示例

下面介绍一个智能实时数仓解决方案实例,可以应用在电商、游戏、社交等大数据实时场景中。数据源有结构化数据和非结构化数据,非结构化数据可以通过DataHub数据总线做实时数据采集,之后借助数据集成来实时写到Hologres中做交互式分析,也可以将数据实时写入到MaxCompute中,进行归档和离线数据计算,另外Flink也可以消费订阅数据,做实时数据计算。Flink计算结果同时又可以写入Hologres中,也可以将实时计算结果做实时大屏和实时预警。结构化数据也可以通过实时数据抽取或者批量数据采集方式,统一采集到DataWorks,实时数据可以写入到Hologres或者定期归档到MaxCompute,离线数据可以通过批量数据加工到MaxCompute中来,另外MaxCompute和Hologres可以结合使用,进行实时联邦查询。

上面这套解决方案可以将阿里云实时数仓全套链路与离线数据无缝衔接,满足一套存储、两种计算(实时计算和离线计算)的高性价比组合。

(部分内容来源网络,如有侵权请联系删除)
立即申请数据分析/数据治理产品免费试用 我要试用
customer

在线咨询