数仓学习几种常见的数据同步方式
数仓学习⼏种常见的数据同步⽅式
⽬录
前⾔
数据仓库的特性之⼀是集成,即⾸先把未经过加⼯处理的、不同来源的、不同形式的数据同步到ODS层,⼀般情况下,这些ODS层数据包括⽇志数据和业务DB数据。对于业务DB数据⽽⾔(⽐如存储在MySQL中),将数据采集并导⼊到数仓中(通常是Hive或者MaxCompute)是⾮常重要的⼀个环节。
那么,该如何将业务DB数据⾼效准确地同步到数仓中呢?
⼀般企业会使⽤两种⽅案:
直连同步
实时增量同步(数据库⽇志解析)
其中直连同步的基本思路是直连数据库进⾏SELECT,然后将查询的数据存储到本地⽂件作为中间存储,最后把⽂件Load到数仓中。这种⽅式⾮常的简单⽅便,但是随着业务的发展,会遇到⼀些瓶颈,具体见下⽂分析。
为了解决这些问题,⼀般会使⽤实时增量的⽅式进⾏数据同步,其基本原理是CDC (Change Data Capture) + Merge,即实时Binlog采集 + 离线处理Binlog还原业务数据这样⼀套解决⽅案。
(1)常见数据同步⽅式
(1.1)直连同步
直连同步是指通过定义好的规范接⼝API和基于动态链接库的⽅式直接连接业务库,⽐如ODBC/JDBC等规定了统⼀的标准接⼝,不同的数据库基于这套标准提供规范的驱动,从⽽⽀持完全相同的函数调⽤和SQL实现。⽐如经常使⽤的Sqoop就是采取这种⽅式进⾏批量数据同步的。
直连同步的⽅式配置⼗分简单,很容易上⼿操作,⽐较适合操作型业务系统的数据同步,但是会存在以下问题:数据同步时间:随着业务规模的增长,数据同步花费的时间会越来越长,⽆法满⾜下游数仓⽣产的时间要求。
性能瓶颈(关键):直连数据库查询数据,对数据库影响⾮常⼤,容易造成慢查询,如果业务库没有采取主备策略,则会影响业务线上的正常服务,如果采取了主备策略,虽然可以避免对业务系统的性能影响,但当数据量较⼤时,性能依然会很差。
(1.2)实时增量同步(⽇志解析)
所谓⽇志解析,即解析数据库的变更⽇志,⽐如MySQL的Binlog⽇志,Oracle的归档⽇志⽂件。通过读取这些⽇志信息,收集变化的数据并将其解析到⽬标存储中即可完成数据的实时同步。这种读操作是在操作系统层⾯完成的,不需要通过数据库,因此不会给源数据库带来性能上的瓶颈。
数据库⽇志解析的同步⽅式可以实现实时与准实时的同步,延迟可以控制在毫秒级别的,其最⼤的优势就是性能好、效率⾼,不会对源数据库造成影响,⽬前,从业务系统到数据仓库中的实时增量同步,⼴泛采取这种⽅式。当然,这种⽅式也会存在⼀些问题,⽐如批量补数时造成⼤量数据更新,⽇志解析会处理较慢,造成数据延迟。除此之外,这种⽅式⽐较复杂,投⼊也较⼤,因为需要⼀个实时的抽取系统去抽取并解析⽇志,下⽂会对此进⾏详细解释。
如上图所⽰架构,在直连同步基础之上增加了流式同步的链路,经过流式计算引擎把相应的 Binlog 采集到 Kafka,同时会经过⼀个 Kafka 2Hive 的程序把它导⼊到原始数据,再经过⼀层 Merge,产出下游需要的 ODS 数据。
上述的数据集成⽅式优势是⾮常明显的,把数据传输的时间放到了 T+0 这⼀天去做,在第⼆天的时候只需要去做⼀次 merge 就可以了。⾮常节省时间和计算资源。
新房税费两种数据同步⽅式⽐较:
(2)流式数据集成实现
实现思路
⾸先,采⽤Flink负责把Kafka上的Binlog数据拉取到HDFS上,⽣成增量表。
然后,对每张ODS表,⾸先需要⼀次性制作快照(Snapshot),把MySQL⾥的全量数据读取到Hive上,这⼀过程底层采⽤直连MySQL 去Select数据的⽅式,可以使⽤Sqoop进⾏⼀次性全量导⼊,⽣成⼀张全量表。
最后,对每张ODS表,每天基于全量数据和当天增量产⽣的Binlog做Merge,从⽽还原出业务数据。
Binlog是流式产⽣的,通过对Binlog的实时采集,把部分数据处理需求由每天⼀次的批处理分摊到实时流上。⽆论从性能上还是对MySQL 的访问压⼒上,都会有明显地改善。Binlog本⾝记录了数据变更的类型(Insert/Update/Delete),通过⼀些语义⽅⾯的处理,完全能够做到精准的数据还原。
关于Binlog解析部分,可以使⽤canal⼯具,采集到Kafka之后,可以使⽤Flink解析kafka数据并写⼊到HDFS上,解析kafka的数据可以使⽤Flink的DataStreamAPI,也可以使⽤FlinkSQL的canal-json数据源格式进⾏解析,使⽤FlinkSQL相对来说是⽐较简单的。下⾯是canal-json格式的kafka数据源。
CREATE TABLE region (通信工程就业方向及前景
id BIGINT,
region_name STRING
)WITH(
'connector'='kafka',
'topic'='mydw.base_region',
'properties.bootstrap.servers'='kms-3:9092',
'up.id'='testGroup',
'format'='canal-json',
'de'='earliest-offset'
);
数据解析完成之后,下⾯的就是合并还原完整数据的过程,关于合并还原数据,⼀种⽐较常见的⽅式就是全外连接(FULL OUTER JOIN)。具体如下:
⽣成增量表与全量表的Merge任务,当天的增量数据与昨天的全量数据进⾏全外连接,该Merge任务的基本逻辑是:
INSERT OVERWRITE TABLE user_order PARTITION(ds='20211012') SELECT CASE WHEN n.id IS NULL THEN o.id
ELSE n.id
END
,
广东旅游景点介绍
CASE WHEN n.id IS NULL ate_time
ate_time
END
,CASE WHEN n.id IS NULL dified_time
dified_time
END
,CASE WHEN n.id IS NULL THEN o.user_id
ELSE n.user_id
END
,CASE WHEN n.id IS NULL THEN o.sku_code
ELSE n.sku_code
佟丽娅演过的电视剧END
,CASE WHEN n.id IS NULL THEN o.pay_fee
ELSE n.pay_fee物化地可以报什么专业
END
FROM(
SELECT*
FROM    user_order_delta
WHERE  ds ='20211012'
AND    id IS NOT NULL
AND    user_id IS NOT NULL
) n
FULL OUTER JOIN(-- 全外连接进⾏数据merge
SELECT*
FROM    user_order
WHERE  ds ='20211011'
AND    id IS NOT NULL
AND    user_id IS NOT NULL
) o
ON      o.id = n.id
AND    o.user_id = n.user_id
;
办公耗材有那些
经过上述步骤,即可将数据还原完整。
以上内容仅供参考学习,如有侵权请联系我删除!
如果这篇⽂章对您有帮助,左下⾓的⼤拇指就是对博主最⼤的⿎励。您的⿎励就是博主最⼤的动⼒!

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。