睿治

智能数据治理平台

睿治作为国内功能最全的数据治理产品之一,入选IDC企业数据治理实施部署指南。同时,在IDC发布的《中国数据治理市场份额,2022》报告中,蝉联数据治理解决方案市场份额第一。

一文看懂数据中台接口数据采集

时间:2022-05-05来源:邂逅浏览数:818

要实现数据中台,一个最基本的要求就是同步交易系统接口数据。实现接口数据同步的方式主要有3种:全量同步、增量同步、流式数据同步,其中流式数据又分为业务流数据和日志流数据。

要实现数据中台,一个最基本的要求就是同步交易系统接口数据。实现接口数据同步的方式主要有3种:全量同步、增量同步、流式数据同步,其中流式数据又分为业务流数据和日志流数据。

接口数据同步是数据中台的一项重要工作,是在搭建数据中台的过程中需要投入很多精力完成的。虽然单个表的数据同步任务难度不大,但是我们需要在数据中台实现标准化配置,这样才可以提高工作效率,为后续的数据中台运维和持续扩充接口打下良好的基础。

全量接口同步

一般而言,全量接口同步是数据中台必不可少的功能模块。不管是增量数据同步还是流式数据同步,都是在全量接口同步的基础上进行的。

全量接口同步一般针对T+1的业务进行,选择晚上业务低峰和网络空闲时期,全量抽取交易系统的某些业务数据。一般来说,虽然全量接口同步占用时间长,耗费网络宽带高,但是数据抽取过程简单、准确度高,数据可靠性好,因此比较容易进行平台标准化配置。

根据目前的开源生态,我们主要推荐了两种数据同步工具,一个是Kettle,一个是DolphinScheduler集成的DataX。

1.Kettle

对于Kettle,我们一般按照系统+业务模块来划分Kettle数据抽取任务。

第一步,把对应数据库的JDBC驱动都加入到data-integration\lib目录下,然后重新打开Spoon.bat。

第二步,在新创建的转换里面创建DB连接。

在弹出的页面选择对应的数据库,填写相关信息并保存。

针对DB连接设置“共享”,可以在多个Kettle中共享相同的数据库链接信息。

第三步,在Kettle开发视图中拖入一个表输入组件和一个表输出组件。

在表输入组件和表输出组件中分别选择不同的数据库连接,表输入支持选择一张表自动生成SQL语句,也支持手写SQL语句。

表输出组件则支持自动获取表结构和自动生成目标表。通过点击获取字段,即可直接获取表输入查询到的字段信息。

图14-4 Kettle表输出界面

点击SQL,即可在弹出的窗口中看到工具自动生产的建表语句,再点击“执行”,Kettle会自动完成目标表的创建。当然,这个建表语句是比较粗糙的,我们一般需要按照指定的规范来手工创建,需要指定分布键。

第四步,将输入组件和输出组件用线连起来,就组成了一个数据同步任务。

第五步,将上述组件一起复制多份,修改来源表、目标表、刷新字段,即可完成大量的数据同步任务。

第六步,直接点“开始”图标运行数据同步任务或者通过Kettle的左右来调度数据同步任务。

2.DataX

由于DataX数据同步工具本身是没有界面化配置的,因此我们一般会配套安装DataX-web或者DolphinScheduler调度工具。DolphinScheduler集成DataX的配置也很简单,只需要在DolphinScheduler的配置文件中指定DATAX_HOME即可。

在DolphinScheduler后台配置datax任务,这里以MySQL数据源为例,数据流配置如下。

首先在数据源中心配置MySQL数据源。

然后在项目管理里面创建数据流任务,在画布上拉去DataX类型配置第一个任务,选择刚才配置的MySQL数据源。

保存以后,系统就会自动生成数据同步的工作量,将数据流上线,并配置定时调度策略,即可完成数据的定时同步。

增量接口同步

一般来说,数据仓库的接口都符合二八规律,即20%的表存储了80%的数据,因此这20%的表数据抽取特别耗费时间。此时,对于批处理来说,最好的方法是,对于80%数据量较小的表,采用流水线作业的方式,快速生成接口表、接口程序、接口任务,通过全量接口快速抽取、先清空后插入目标表;针对20%数据量较大的表,则需要精耕细作,确定一个具体可行的增量方案。

我认为一般满足以下条件之一就是较大的表:①抽取时间超过10分钟;②单表记录数超过或者接近100万;③接口数据超过1GB。之所以如此定义,是从数据接口的实际情况出发。第一,抽取时间超过10分钟,会影响整体调度任务的执行时间;第二,单表记录数超过100万,则插入数据占用数据库大量的资源,会影响其他任务的插入,降低系统的并发能力;第三,数据传输超过1GB,则需要耗费大量的网络宽带,每天重复一次会增加网络负担。

对于需要做增量的接口表,主要推荐以下两种批处理方案。

方案一:根据数据创建或者修改时间来实现增量

很多业务系统一般都会在表结构上增加创建和修改时间字段,并且存在主键或者唯一键(可以是一个字段,也可以是多个字段组合),同时确保数据不会被物理删除,这种表适合方案一。实际情况是,各大OLTP系统的数据库都可以满足记录创建和修改时间信息的,因此这种方式应用最广泛。

对于创建或者修改时间,MySQL数据库可以在建表时指定字段默认值的方式来生成。

`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间'

也可以在建表以后通过增加字段的方式来补充。

-- 修改create_time 设置默认时间 CURRENT_TIMESTAMP ALTER TABLE `tb_course`MODIFY COLUMN `create_time` datetime NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间' ;-- 添加update_time 设置 默认时间 CURRENT_TIMESTAMP 设置更新时间为 ON UPDATE CURRENT_TIMESTAMP ALTER TABLE `tb_course`ADD COLUMN `update_time` datetime NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间' ;Oracle数据库默认情况下只能记录创建时间,不能记录修改时间。 --先添加一个date类型的字段alter tabletb_courseaddcreate_timedate; --将该字段默认为系统时间alter tabletb_coursemodifycreate_timedefault sysdate;如果需要记录修改时间,则需要通过触发器或者修改更新语句来实现。触发器的脚本如下。 CREATE OR REPLACE TRIGGERtrig_tb_course afterINSERT OR UPDATE ON tb_course--新增和修改执行后出发,对象目标:tb_course表,执行后触发器对业务交易影响比较小FOR EACH ROW --行级触发器,每影响一行触发一次BEGIN IF INSERTING THEN --插入数据操作 :NEW.createtime := SYSDATE; ELSIF UPDATING then --修改数据操作 :NEW.createtime := SYSDATE; END IF; END;

有了创建或者修改时间以后,每次抽取最近几天(一般建议3天)的数据,则直接在where条件后面加上下面的过滤条件。

--取最近3天插入或者更新的记录where create_time >= cast(date_add(CURRENT_DATE,interval -3 day) as datetime)or update_time >= cast(date_add(CURRENT_DATE,interval -3 day) as datetime)

DataX或者Kettle在抽取数据时直接在SQL语句上加上上述条件即可,数据写入临时表,笔者一般以_incr作为临时表后缀。

抽取到变化的数据以后,将前后数据进行合并即可完成增量数据更新。一般情况下我们可能会采用MERGE INTO的方式进行数据合并,这里推荐先删除后插入的方式。首先,MERGE只有少数数据库支持,虽然Greenplum也支持,但是功能不够完善,语法比较复杂。其次对于大多数数据库而言,删除比更新更快,因此推荐先删除后插入的方式。如果变化的数据不大,可以直接采用删除再插入的方式;如果变化的数据太大,删除的效率太低,则需要借助第三张表来完成数据的合并。先删除后插入的语句示例如下,假设DRP系统的item_info表是一张商品主数据,数据量大,但是变化频率不高,则我们可以通过下面的语句来合并增量数据。

--先删除有过变化的数据delete from ods_drp.ods_drp_item_info twhere exists(select 1 from ods.ods_drp_item_info_incr bwhere t.item_id = b.item_id);--然后插入新抽取过来的数据insert into ods_drp.ods_drp_item_infoselect t.*,current_timestamp() insert_timefrom ods_drp.ods_drp_item_info_incr t;

方案二:增加触发器记录创建或者修改时间来实现增量

对于业务系统,我们经常遇到有些表要么没有创建、修改时间,要么存在记录物理删除的情况,因此无法通过方案一实现增量。结合HANA数据库的特点,我们最后采用了创建触发器来记录业务数据创建、修改时间的方案。

这种方案下,我们需要针对每一张增量接口表,创建一张日志表,包括接口表的主键字段、操作标志、操作时间。每次抽取数据需要用日志表关联业务数据,然后抽取一段时间内新增、修改、删除的记录到数据中台数据库,最后根据操作标志+操作时间对目标表数据进行更新。

本方案虽然看上去对交易系统的侵入性较高,很难被接受,但其实是一个非常好用的增量方案,适合任何场景。首先,触发器是Oracle、DB2、HANA等数据库系统标配的功能,在表上增加after触发器对业务交易影响微乎其微。其次,抽取数据的时间一般都在业务空闲时间,业务表和日志表的关联不会影响正常交易。第三,本方案可以捕捉数据的物理删除操作,可以保证数据同步100%的准确性。

下面,我们以 S/4 HANA的EKPO表为例进行方案解析。首先创建EKPO变更日志表。

--创建EKPO变更日志表,需要包含主键字段和变更标志、变更时间字段CREATE TABLE HANABI.DI_EKPO_TRIG_LOG ( EBELN CHAR(10) , EBELP CHAR(10), FLAG CHAR(5) , INSERT_TIME SECONDDATE );

然后给EKPO表添加触发器。

--INSERT触发器CREATE TRIGGER DI_TRIGGER_EKPO_I AFTER INSERT ON HANADB.EKPOREFERENCING NEW ROW MYNEWROWFOR EACH ROWBEGININSERT INTO HANABI.DI_EKPO_TRIG_LOG VALUES(:MYNEWROW.EBELN, :MYNEWROW.EBELP , 'I' , CURRENT_TIMESTAMP );END;--UPDATE触发器CREATE TRIGGER DI_TRIGGER_EKPO_U AFTER UPDATE ON HANADB.EKPOREFERENCING NEW ROW MYNEWROWFOR EACH ROWBEGININSERT INTOHANABI.DI_EKPO_TRIG_LOG VALUES (:MYNEWROW.EBELN, :MYNEWROW.EBELP , 'U' ,CURRENT_TIMESTAMP ) ;END;--DELETE触发器CREATE TRIGGER DI_TRIGGER_EKPO_D AFTER DELETE ON HANADB.EKPOREFERENCING OLD ROW MYOLDROWFOR EACH ROWBEGININSERT INTOHANABI.DI_EKPO_TRIG_LOG VALUES (:MYOLDROW.EBELN, :MYOLDROW.EBELP , 'D' ,CURRENT_TIMESTAMP );END ;

有了变更日志表以后,用变更日志表关联源表,就可以得到源表新发生的所有增、删、改记录时间。

#查询一段时间内EKPO表新增、修改、删除的记录信息select tr.flag op_flag,tr.insert_time op_time,tb.mandt,tr.ebeln,tr.ebelp,uniqueid,loekz,statu,aedat,matnr,--此处省略其余字段 from HANABI.DI_EKPO_TRIG_LOG tr left join HANADB.ekpo tb on tr.ebeln = tb.ebeln and tr.ebelp = tb.ebelpwhere tr.insert_time BETWEEN to_TIMESTAMP('${start_time}','YYYY-MM-DD-HH24:MI:SS') AND to_TIMESTAMP('${end_time}','YYYY-MM-DD HH24:MI:SS')

记录上次抽取时间的方案可以更加灵活地控制抽取数据的区间。为了抽取的数据不会遗漏,我们一般根据数据量预留10分钟的重叠区间。

首先,我们需要创建增量数据抽取的控制参数表ctl_ods_sync_incr。

字段名 字段类型 字段长度 小数位 是否主键 字段描述
schema_name varchar 40 模式名
table_name varchar 40 表名
last_sysn_time timestamp 6 上次同步时间

然后,我们在抽取脚本中读取和更新抽取日志表。

#!bin/bash#GP的用户名export gpuser="xxxx"#GP的密码export gppass="xxxx"#目标数据库模式名export gp_schema="ods_s4"# 目标数据库表名export gp_table="ods_s4_ekpo_i"# 数据源地址export datasource="s4"#为了避免丢失数据,从上次抽取时间的十分钟前开始抽取数据result=`psql -h gp-master -p 5432 -U ${gpuser} -d ${gppass} << EOF select to_char(last_sync_time + '-10 sec', 'yyyy-mm-dd-HH24:MI:SS') from cfg.ctl_ods_sync_incr where table_name ='ods_s4_ekpo'; EOF`start_time=`echo $result | awk -F' ' '{print $3}'`end_time=$(date "+%Y-%m-%d %H:%M:%S")#输出抽取时间日期echo "now sqoop data from ${start_time} to ${end_time}"export querySql="select tr.flag op_flag,tr.insert_time op_time,tb.mandt,tr.ebeln,tr.ebelp,uniqueid ,loekz,statu,aedat,txz01,matnr,#此处省略其余字段 from HANABI.DI_EKPO_TRIG_LOG tr inner join HANADB.ekpo tb on tr.ebeln = tb.ebeln and tr.ebelp = tb.ebelpwhere tr.insert_time BETWEEN to_TIMESTAMP('${start_time}','YYYY-MM-DD-HH24:MI:SS') AND to_TIMESTAMP('${end_time}','YYYY-MM-DD HH24:MI:SS')"cat>dataxjob.json<{ "job": { "name": "in-$db-$table", "content": [{ "reader": { "name": "hanareader", "parameter": { "dsDatasource" : "$datasource", "jsonLine" : true, "connection": [ {"querySql": ["$querySql"]}] } }, "writer": { "name": "gpdbwriter", "parameter": { "username": $gpuser, "password": $gppass, "preSql": [ "truncate table $gp_schema.$gp_table" ], "column": [{"name": "body","type": "string"}] "segment_reject_limit": 0, "copy_queue_size": 2000, "num_copy_processor": 1, "num_copy_writer": 1, "connection": [ { "jdbcUrl": "jdbc:postgresql://gp-master:5432/dp", "table": [ "$gp_schema.$gp_table" ] } ] } } } ], "setting": { "speed": { "channel": 1 } } }}EOFpython $DATAX_HOME/bin/datax.py --jobid ${system.taskId} dataxjob.json if [ $? -ne 0 ]; then echo "DataX failed,next time try again!"else echo "DataX succeed ,now update cfg.ctl_ods_sync_incr表" psql -h gp-master -p 5432 -U ${gpuser} -d ${gppass} << EOF update cfg.ctl_ods_sync_incr set last_sync_time = to_timestamp('${end_time}', 'YYYY-MM-DD HH24:MI:SS') where table_name ='ods_s4_ekpo'EOFfi

在保证数据抽取过程中不遗漏数据的前提下,我们需要对新抽取到的数据和历史数据进行合并。由于数据可能存在删除和多次修改的情况,我们的数据更新操作会比方案一更加复杂,需要在插入或者删除数据的过程中做一些开窗函数排序取最新的记录,操作语句如下。

--所有存在插入、删除、更新的数据全部从目标表删掉delete from ods_d4.ods_d4_ekpo t where exists (select 1 from ods_d4.ods_d4_ekpo_incr b where b.op_flag in ('I', 'D', 'U') and t.ebeln = b.ebeln and t.ebelp = b.ebelp);--插入最后一次操作不是删除的数据insert into ods_d4.ods_d4_ekposelect mandt,t.ebeln,t.ebelp,uniqueid,loekz,statu,aedat,txz01,matnr,--此处省略其余字段 from (select row_number() over(partition by b.ebeln,b.ebelp order by b.op_time desc,b.op_flag desc) rank_num, b.* from ods_d4.ods_d4_ekpo_incr b where b.op_flag in ('I', 'U' ,'D')) t where t.rank_num = 1 and t.op_flag <>'D';

为什么接口会有这么复杂的逻辑,这是我们多次实践总结出来的经验。这个增量接口方式我们核对和修复了不少于5次漏洞才最终实现准确快速又稳定的增量接口方案。

流式数据同步

通过上节的增量数据接口可以看出,不管是方案一还是方案二,都需要业务系统数据库做出一定的调整(当然,如果某些业务系统数据库已经在设计的时候考虑到创建和更新时间,则不需要修改),一次性抽取大量的数据会对交易数据库产生压力。基于上述原因,对于类MySQL数据库,我们推荐CDC日志的同步方式。其他数据库如果可以满足CDC日志的要求,也可以采用这种方式。

基于CDC日志同步的方案,也称作流式数据同步方案,一个典型的数据采集流程如下图。

第一步,需要对业务数据库进行分析,分析数据库是否支持CDC日志。一般来说,业务数据通常保存在关系型数据库中,从数据库的发展来看,MySQL对CDC日志的支持是最好的。

第二步,需要有DBA权限的管理员开启数据库的CDC日志功能。其中MySQL数据库的CDC功能开启过程如下。

使用命令行工具连接到MySQL数据库所在服务器,执行以下命令以root用户登录数据库。

mysql -uroot -ppassword

其中,password为数据库root用户的密码,可向数据库管理员获取。

执行以下命令,查询MySQL数据库是否开启了Binlog。

show variables like 'log_bin';

若变量log_bin的值为“OFF”,则说明Binlog未开启,继续执行下一步。

若变量log_bin的值为“ON”,则说明Binlog已开启,继续执行以下SQL命令,检查相关参数的配置是否符合要求。

show variables like '%binlog_format%';

show variables like '%binlog_row_image%';

变量binlog_format的值应该为ROW,变量binlog_row_image的值应该为FULL。如果满足要求,直接跳到2,否则继续执行下一步。

执行以下命令编辑MySQL配置文件,然后按“i”进入输入模式。在配置文件中增加如下配置,开启Binlog。

vi /etc/my.cnf

server-id = 123log_bin = mysql-binbinlog_format = rowbinlog_row_image = fullexpire_logs_days = 10gtid_mode = onenforce_gtid_consistency = on

其中,server-id的值应为大于1的整数,请根据实际规划设置,并且在创建数据集成任务时设置的Server Id值需要此处设置的值不同。

expire_logs_days为Binlog日志文件保留时间,超过保留时间的Binlog日志会被自动删除,应保留至少2天的日志文件。

“gtid_mode = on”和“enforce_gtid_consistency = on”仅当MySQL的版本大于、等于5.6.5时才需要添加,否则删除这两行内容。

按“ESC”键退出输入模式,然后输入“:wq”并回车,保存后退出。

执行以下命令重启MySQL数据库。

service mysqld restart

以root用户登录数据库,执行以下命令,查询变量log_bin的值是否为ON,即是否已开启Binlog。

show variables like 'log_bin';

在数据库中执行以下命令创建ROMA Connect连接数据库的用户并配置权限。

CREATE USER 'roma'@'%' IDENTIFIED BY 'password';GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'roma'@'%';

其中,roma为ROMA Connect连接用户名,请根据实际规划设置。password为ROMA Connect连接用户密码,请根据实际规划设置。

如果MySQL数据库版本为8.0,则需要执行以下命令,修改数据库连接用户的密码认证方式(可选)。

ALTER USER roma IDENTIFIED WITH mysql_native_password BY 'password';

其中,roma为2中创建的数据库连接用户名。password为数据库连接用户的密码。

执行以下命令退出数据库连接。

exit;

第三步,通过Canal、MaxWell、Debezium等开源组件读取数据库日志,将数据库转换成JSON对象写入Kafka队列。

第四步,通过Greenplum的GPSS读取Kafka消息或者通过Flink程序(Spark也可以)读取Kafka数据更新写入Greenplum数据库。

随着各个开源组件功能的完善,其中有些步骤已经可以省略了。例如Flink社区开发了flink-cdc-connectors组件,这是一个可以直接从MySQL、PostgreSQL等数据库直接读取全量数据和增量变更数据的source组件。

日志流数据同步

随着大数据技术的发展,日志数据的分析也成为数据中台必不可少的一个功能。一般来说,日志数据会以日志流的方式进入系统。

我们这里所说的日志数据,一般是指Web应用或者手机程序的埋点日志,一般包括用户点击、浏览、鼠标停留、收藏、加购、分享等操作信息。这些操作虽然未产生真正的业务价值,但是可以用来进行用户行为分析和业务促销活动。

典型的用户行为数据采集流程:

第一步,需要前端工程师或者App开发者在Web或者APP应用页面添加埋点信息,搜集用户的操作,并通过异步Post的方式发生给Ngnix服务器。

第二步,Ngnix服务器接收到用户的数据后,以文本格式记录请求参数等信息至access.log。

第三步,Flume实时监控Nginx日志变化,收集并过滤有用日志,之后发送至Kafka。

第四步,埋点数据到达Kafka后,通过Spark程序或者Flink程序完成日志解析,解析完成的数据格式化保存到Greenplum数据库的ODS层。

至此,日志数据采集和解析的工作完成了。

正如文章的章节分布一样,在数据中台实现过程中,最难做也是最费时间的就是增量数据接口。想要实现精准的增量数据快照,不仅有很多前置要求,也有很多数据合并的特殊处理。本文分析的内容适合绝大多数需要做增量接口的场景。而全量接口、实时接口、日志接口大多数情况下主要搭好平台做好配置即可。

(部分内容来源网络,如有侵权请联系删除)
立即申请数据分析/数据治理产品免费试用 我要试用
customer

在线咨询