⼤数据时代,数据实时同步解决⽅案的思考—最全的数据同步总结
1、早期关系型数据库之间的数据同步
1)、全量同步
⽐如从oracle数据库中同步⼀张表的数据到Mysql中,通常的做法就是分页查询源端的表,然后通过 jdbc的batch ⽅式插⼊到⽬标表,这个地⽅需要注意的是,分页查询时,⼀定要按照主键id来排序分页,避免重复插⼊。
2)、基于数据⽂件导出和导⼊的全量同步,这种同步⽅式⼀般只适⽤于同种数据库之间的同步,如果是不同的数据库,这种⽅式可能会存在问题。
3)、基于触发器的增量同步
增量同步⼀般是做实时的同步,早期很多数据同步都是基于关系型数据库的触发器trigger来做的。
使⽤触发器实时同步数据的步骤:
A、基于原表创触发器,触发器包含insert,modify,delete 三种类型的操作,数据库的触发器分Before
和After两种情况,⼀种是在insert,modify,delete 三种类型的操作发⽣之前触发(⽐如记录⽇志操作,⼀般是Before),⼀种是在insert,modify,delete 三种类型的操作之后触发。
B、创建增量表,增量表中的字段和原表中的字段完全⼀样,但是需要多⼀个操作类型字段(分表代表insert,modify,delete 三种类型的操作),并且需要⼀个唯⼀⾃增ID,代表数据原表中数据操作的顺序,这个⾃增id⾮常重要,不然数据同步就会错乱。
C、原表中出现insert,modify,delete 三种类型的操作时,通过触发器⾃动产⽣增量数据,插⼊增量表中。
D、处理增量表中的数据,处理时,⼀定是按照⾃增id的顺序来处理,这种效率会⾮常低,没办法做批量操作,不然数据会错乱。有⼈可能会说,是不是可以把insert操作合并在⼀起,modify合并在⼀起,delete操作合并在⼀起,然后批量处理,我给的答案是不⾏,因为数据的增删改是有顺序的,合并后,就没有顺序了,同⼀条数据的增删改顺序⼀旦错了,那数据同步就肯定错了。
市⾯上很多数据etl数据交换产品都是基于这种思想来做的。
4)、基于时间戳的增量同步
A、⾸先我们需要⼀张临时temp表,⽤来存取每次读取的待同步的数据,也就是把每次从原表中根据
时间戳读取到数据先插⼊到临时表中,每次在插⼊前,先清空临时表的数据
B、我们还需要创建⼀个时间戳配置表,⽤于存放每次读取的处理完的数据的最后的时间戳。
C、每次从原表中读取数据时,先查询时间戳配置表,然后就知道了查询原表时的开始时间戳。
D、根据时间戳读取到原表的数据,插⼊到临时表中,然后再将临时表中的数据插⼊到⽬标表中。
E、从缓存表中读取出数据的最⼤时间戳,并且更新到时间戳配置表中。缓存表的作⽤就是使⽤sql获取每次读取到的数据的最⼤的时间戳,当然这些都是完全基于sql语句在kettle中来配置,才需要这样的⼀张临时表。
2、⼤数据时代下的数据同步
1)、基于数据库⽇志(⽐如mysql的binlog)的同步
我们都知道很多数据库都⽀持了主从⾃动同步,尤其是mysql,可以⽀持多主多从的模式。那么我们是不是可以利⽤这种思想呢,答案当然是肯定的,mysql的主从同步的过程是这样的。
A、master将改变记录到⼆进制⽇志(binary log)中(这些记录叫做⼆进制⽇志事件,binary log events,可以通过show binlog events进⾏查看);
B、slave将master的binary log events拷贝到它的中继⽇志(relay log);
C、slave重做中继⽇志中的事件,将改变反映它⾃⼰的数据。
阿⾥巴巴开源的canal就完美的使⽤这种⽅式,canal 伪装了⼀个Slave 去喝Master进⾏同步。
A、 canal模拟mysql slave的交互协议,伪装⾃⼰为mysql slave,向mysql master发送dump协议
B、 mysql master收到dump请求,开始推送binary log给slave(也就是canal)
C、 canal解析binary log对象(原始为byte流)
另外canal 在设计时,特别设计了 client-server 模式,交互协议使⽤ protobuf 3.0 , client 端可采⽤不同语⾔实现不同的消费逻辑。
D、在使⽤canal时,mysql需要开启binlog,并且binlog-format必须为row,可以在mysql的myf⽂件中增加如下配置退休年龄
log-bin=E:/mysql5.5/bin_log/mysql-bin.log
binlog-format=ROW
server-id=123、
E、部署canal的服务端,配置canal.properties⽂件,然后 启动 bin/startup.sh 或bin/startup.bat
#设置要监听的mysql服务器的地址和端⼝
canal.instance.master.address = 127.0.0.1:3306
#设置⼀个可访问mysql的⽤户名和密码并具有相应的权限,本⽰例⽤户名、密码都为canal
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
#连接的数据库
canal.instance.defaultDatabaseName =test
#订阅实例中所有的数据库和表
canal. = .*\\..*
#连接canal的端⼝
canal.port= 11111
#监听到的数据变更发送的队列
canal.destinations= example
F、客户端开发,在maven中引⼊canal的依赖
<dependency>
<groupId></groupId>
<artifactId>canal.client</artifactId>
<version>1.0.21</version>
</dependency>
代码⽰例:
ample;
import canal.client.CanalConnector;
import canal.client.CanalConnectors;
import canalmon.utils.AddressUtils;
import canal.protocol.CanalEntry;
import canal.protocol.Message;
le.protobuf.InvalidProtocolBufferException;
import java.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class CanalClientExample {
public static void main(String[] args) {
while (true) {
//连接canal
CanalConnector connector = wSingleConnector(new HostIp(), 11111), "example", "canal", "canal"); t();
//订阅监控的数据库.表
connector.subscribe("demo_db.user_tab");
//⼀次取10条
Message msg = WithoutAck(10);
long batchId = Id();
int size = Entries().size();
if (batchId < 0 || size == 0) {
System.out.println("没有消息,休眠5秒");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
//
CanalEntry.RowChange row = null;
for (CanalEntry.Entry entry : Entries()) {
try {
母亲节送妈妈什么礼物比较好row = CanalEntry.RowChange.StoreValue());
List<CanalEntry.RowData> rowDatasList = RowDatasList();
for (CanalEntry.RowData rowdata : rowDatasList) {
List<CanalEntry.Column> afterColumnsList = AfterColumnsList();
Map<String, Object> dataMap = transforListToMap(afterColumnsList);
if (EventType() == CanalEntry.EventType.INSERT) {
//具体业务操作
System.out.println(dataMap);
} else if (EventType() == CanalEntry.EventType.UPDATE) {
/
/具体业务操作
System.out.println(dataMap);
} else if (EventType() == CanalEntry.EventType.DELETE) {
List<CanalEntry.Column> beforeColumnsList = BeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
if ("id".Name())) {
//具体业务操作
System.out.println("删除的id:" + Value());
}
}
} else {
System.out.println("其他操作类型不做处理");
}
}
} catch (InvalidProtocolBufferException e) {
e.printStackTrace();
}
}
//确认消息
connector.ack(batchId);
}
}
}
public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
出国签证办理流程Map map = new HashMap();
if (afterColumnsList != null && afterColumnsList.size() > 0) {
for (CanalEntry.Column column : afterColumnsList) {
map.Name(), Value());
}
}
return map;
}
}
2)、基于BulkLoad的数据同步,⽐如从hive同步数据到hbase
我们有两种⽅式可以实现,
A、使⽤spark任务,通过HQl读取数据,然后再通过hbase的Api插⼊到hbase中。
但是这种做法,效率很低,⽽且⼤批量的数据同时插⼊Hbase,对Hbase的性能影响很⼤。
在⼤数据量的情况下,使⽤BulkLoad可以快速导⼊,BulkLoad主要是借⽤了hbase的存储设计思想,因为hbase本质是存储在hdfs上的⼀个⽂件夹,然后底层是以⼀个个的Hfile 存在的。HFile的形式存在。Hfile的路径格式⼀般是这样的:
/hbase/data/default(默认是这个,如果hbase的表没有指定命名空间的话,如果指定了,这个就是命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>
B、 BulkLoad实现的原理就是按照HFile格式存储数据到HDFS上,⽣成Hfile可以使⽤hadoop的MapReduce来实现。如果不是hive中的数据,⽐如外部的数据,那么我们可以将外部的数据⽣成⽂件,然后上传到hdfs中,组装RowKey,然后将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的⽬录中。
当然我们也可以不事先⽣成hfile,可以使⽤spark任务直接从hive中读取数据转换成RDD,然后使⽤HbaseContext的⾃动⽣成Hfile⽂件,部分关键代码如下:
…
//将DataFrame转换bulkload需要的RDD格式
val rddnew = datahiveDF.rdd.map(row => {
val rowKey = As[String](rowKeyField)
fields.map(field => {
val fieldValue = As[String](field)
(Bytes(rowKey), Array((Bytes("info"), Bytes(field), Bytes(fieldValue))))
})
}).flatMap(array => {
(array)
})
…
//使⽤HBaseContext的bulkload⽣成HFile⽂件
hbaseContext.bulkLoad[Put](rddnew.map(record => {
val put = new Put(record._1)
record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
put姓金的名人
}), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
val conn = ateConnection(hBaseConf)
val hbTableName = TableName.Bytes())
val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
val realTable = Table(hbTableName)
// bulk load start
val loader = new LoadIncrementalHFiles(hBaseConf)
val admin = Admin()
loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
sc.stop()
}
…
def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
llection.JavaConversions._
for (cells <- Set().iterator()) {
val family = Key
for (value <- Value) {
val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
ret.+=((kfq, CellUtil.cloneValue(value)))
}
}
ret.iterator
}
}
…
C、pg_bulkload的使⽤
3)、基于sqoop的全量导⼊
Sqoop 是hadoop⽣态中的⼀个⼯具,专门⽤于外部数据导⼊进⼊到hdfs中,外部数据导出时,⽀持很多常见的关系型数据库,也是在⼤数据中常⽤的⼀个数据导出导⼊的交换⼯具。
Sqoop从外部导⼊数据的流程图如下:
Sqoop将hdfs中的数据导出的流程如下:
本质都是⽤了⼤数据的数据分布式处理来快速的导⼊和导出数据。
4)、HBase中建表,然后Hive中建⼀个外部表,这样当Hive中写⼊数据后,HBase中也会同时更新,但是需要注意
A、hbase中的空cell在hive中会补null
B、hive和hbase中不匹配的字段会补null
C、hive的外部表是通过hbase handle 来加载数据,在hbase的数据量⾮常⼤时,性能并不好。hive的外部表在数据量⼤时,不管是通过HQL计算查询还是通过spark sql,外部表的性能都⾮常差,因为在加载数据时,会使⽤hbase的scan等,产⽣全表扫描。
我们可以在hbase的shell 交互模式下,创建⼀张hbse表
create 'bokeyuan','zhangyongqing'
使⽤这个命令,我们可以创建⼀张叫bokeyuan的表,并且⾥⾯有⼀个列族zhangyongqing,hbase创建表时,可以不⽤指定字段,但是需要指定表名以及列族
我们可以使⽤的hbase的put命令插⼊⼀些数据
put 'bokeyuan','001','zhangyongqing:name','robot'
put 'bokeyuan','001','zhangyongqing:age','20'
put 'bokeyuan','002','zhangyongqing:name','spring'
put 'bokeyuan','002','zhangyongqing:age','18'
可以通过hbase的scan 全表扫描的⽅式查看我们插⼊的数据
scan ' bokeyuan'
我们继续创建⼀张hive外部表
create external table bokeyuan (id int, name string, age int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("lumns.mapping" = ":key,zhangyongqing:name,zhangyongqing:age")
TBLPROPERTIES("hbase.table.name" = " bokeyuan");
外部表创建好了后,我们可以使⽤HQL语句来查询hive中的数据了
select * from bokeyuan ;
OK
1 robot 20
2 spring 18
5)、Debezium+bireme:Debezium for PostgreSQL to Kafka Debezium也是⼀个通过监控数据库的⽇志变化,通过对⾏级⽇志的处理来达到数据同步,⽽且Debezium 可以通过把数据放⼊到kafka,这样就可以通过消费kafka的数据来达到数据同步的⽬的。⽽且还可以给多个地⽅进⾏
消费使⽤。
Debezium是⼀个开源项⽬,为捕获数据更改(change data capture,CDC)提供了⼀个低延迟的流式处理平台。你可以安装并且配置Debezium去监控你的数据库,然后你的应⽤就可以消费对数据库的每⼀个⾏级别(row-level)的更改。只有已提交的更改才是可见的,所以你的应⽤不⽤担⼼事务(transaction)或者更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了⼀个统⼀的模型,所以你的应⽤不⽤担⼼每⼀种数据库管理系统的错综复杂性。另外,由于Debezium⽤持久化的、有副本备份的⽇志来记录数据库数据变化的历史,因此,你的应⽤可以随时停⽌再重启,⽽不会错过它停⽌运⾏时发⽣的事件,保证了所有的事件都能被正确地、完全地处理掉。
本来监控数据库,并且在数据变动的时候获得通知其实⼀直是⼀件很复杂的事情。关系型数据库的触
发器可以做到,但是只对特定的数据库有效,⽽且通常只能更新数据库内的状态(⽆法和外部的进程通信)。⼀些数据库提供了监控数据变动的API或者框架,但是没有⼀个标准,每种数据库的实现⽅式都是不同的,并且需要⼤量特定的知识和理解特定的代码才能运⽤。确保以相同的顺序查看和处理所有更改,同时最⼩化影响数据库仍然⾮常具有挑战性。
Debezium正好提供了模块为你做这些复杂的⼯作。⼀些模块是通⽤的,并且能够适⽤多种数据库管理系统,但在功能和性能⽅⾯仍有⼀些限制。另⼀些模块是为特定的数据库管理系统定制的,所以他们通常可以更多地利⽤数据库系统本⾝的特性来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的⽀持。
Debezium是⼀个捕获数据更改(CDC)平台,并且利⽤Kafka和Kafka Connect实现了⾃⼰的持久性、可靠性和容错性。每⼀个部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控⼀个上游数据库服务器,捕获所有的数据库更改,然后记录到⼀个或者多个Kafka topic(通常⼀个数据库表对应⼀个kafka topic)。Kafka确保所有这些数据更改事件都能够多副本并且总体上有序(Kafka只能保证⼀个topic的单个分区内有序),这样,更多的客户端可以独⽴消费同样的数据更改事件⽽对上游数据库系统造成的影响降到很⼩(如果N个应⽤都直接去监控数据库更改,对数据库的压⼒为N,⽽⽤debezium汇报数据库更改事件到kafka,所有的应⽤都去消费kafka中的消息,可以把对数据库的压⼒降到1)。另外,客户端可以随时停⽌消费,然后重启,从上次停⽌消费的地⽅接着消费。每个客户端
可以⾃⾏决定他们是否需要exactly-once或者at-least-once消息交付语义保证,并且所有的数据库或者表的更改事件是按照上游数据库发⽣的顺序被交付的。
对于不需要或者不想要这种容错级别、性能、可扩展性、可靠性的应⽤,他们可以使⽤内嵌的Debezium connector引擎来直接在应⽤内部运⾏connector。这种应⽤仍需要消费数据库更改事件,但更希望connector直接传递给它,⽽不是持久化到Kafka⾥。
另外Maxwell也是可以实现MySQL到Kafka的消息中间件,消息格式采⽤Json:
6)、datax
datax 是阿⾥开源的etl ⼯具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间⾼效的数据同步功能,采⽤java+python进⾏开发,核⼼是java语⾔实现。
A、设计架构:
数据交换通过DataX进⾏中转,任何数据源只要和DataX连接上即可以和已实现的任意数据源同步
B、框架
核⼼模块介绍:
1. DataX完成单个数据同步的作业,我们称之为Job,DataX接受到⼀个Job之后,将启动⼀个进程来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承
担了数据清理、⼦任务切分(将单⼀作业计算转化为多个⼦Task)、TaskGroup管理等功能。
蚯蚓吃什么2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个⼩的Task(⼦任务),以便于并发执⾏。Task便是DataX作业的最⼩单元,每⼀个Task都会负责⼀部分数
据的同步⼯作。
3. 切分多个Task之后,DataX Job会调⽤Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每⼀个TaskGroup负责以⼀定的
并发运⾏完毕分配好的所有Task,默认单个任务组的并发数量为5。
4. 每⼀个Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的线程来完成任务同步⼯作。
5.
全面预算
DataX作业运⾏起来之后, Job监控并等待多个TaskGroup模块任务完成,等待所有TaskGroup任务完成后Job成功退出。否则,异常退出,进程退出值⾮0
DataX调度流程:
举例来说,⽤户提交了⼀个DataX作业,并且配置了20个并发,⽬的是将⼀个100张分表的mysql数据同步到odps⾥⾯。 DataX的调度决策思路是:
1. DataXJob根据分库分表切分成了100个Task。
2. 根据20个并发,DataX计算共需要分配4个TaskGroup。
3. 4个TaskGroup平分切分好的100个Task,每⼀个TaskGroup负责以5个并发共计运⾏25个Task。
优势:
每种插件都有⾃⼰的数据转换策略,放置数据失真;
提供作业全链路的流量以及数据量运⾏时监控,包括作业本⾝状态、数据流量、数据速度、执⾏进度等。
由于各种原因导致传输报错的脏数据,DataX可以实现精确的过滤、识别、采集、展⽰,为⽤户提过多种脏数据处理模式;
精确的速度控制
健壮的容错机制,包括线程内部重试、线程级别重试;
从插件视⾓看框架
Job:是DataX⽤来描述从⼀个源头到⽬的的同步作业,是DataX数据同步的最⼩业务单元;
Task:为最⼤化⽽把Job拆分得到最⼩的执⾏单元,进⾏并发执⾏;
TaskGroup:⼀组Task集合,在同⼀个TaskGroupContainer执⾏下的Task集合称为TaskGroup;
JobContainer:Job执⾏器,负责Job全局拆分、调度、前置语句和后置语句等⼯作的⼯作单元。类似Yarn中的JobTracker;
TaskGroupContainer:TaskGroup执⾏器,负责执⾏⼀组Task的⼯作单元,类似Yarn中的TAskTacker。
总之,Job拆分为Task,分别在框架提供的容器中执⾏,插件只需要实现Job和Task两部分逻辑。
物理执⾏有三种运⾏模式:
Standalone:单进程运⾏,没有外部依赖;
Local:单进程运⾏,统计信息,错误信息汇报到集中存储;
Distrubuted:分布式多线程运⾏,依赖DataX Service服务;
总体来说,当JobContainer和TaskGroupContainer运⾏在同⼀个进程内的时候就是单机模式,在不同进程执⾏就是分布式模式。
数据源⽀持情况:
类型数据源Reader(读)Writer(写)⽂档
RDBMS 关系型数据库MySQL√√、
Oracle √ √ 、
SQLServer√√、
PostgreSQL√√、
DRDS√√、
通⽤RDBMS(⽀持所有关系型数据库)√√、
阿⾥云数仓数据存储ODPS√√、
ADS√
OSS√√、
OCS√√、
NoSQL数据存储OTS√√、
Hbase0.94√√、
Hbase1.1√√、
Phoenix4.x√√、
Phoenix5.x√√、
MongoDB√√、
Hive√√、
⽆结构化数据存储TxtFile√√、
FTP√√、
HDFS√√、
Elasticsearch√
时间序列数据库OpenTSDB√
TSDB√
7)、OGG
OGG ⼀般主要⽤于Oracle数据库。即Oracle GoldenGate是Oracle的同步⼯具,可以实现两个Oracle数据库之间的数据的同步,也可以实现Oracle数据同步到Kafka,相关的配置操作可以参考如下:
8)、databus
Databus是⼀个实时的、可靠的、⽀持事务的、保持⼀致性的数据变更抓取系统。 2011年在LinkedIn正式进⼊⽣产系统,2013年开源。
Databus通过挖掘数据库⽇志的⽅式,将数据库变更实时、可靠的从数据库拉取出来,业务可以通过定制化client实时获取变更。
Databus的传输层端到端延迟是微秒级的,每台服务器每秒可以处理数千次数据吞吐变更事件,同时还⽀持⽆限回溯能⼒和丰富的变更订阅功能。
databus架构设计:
来源独⽴:Databus⽀持多种数据来源的变更抓取,包括Oracle和MySQL。
可扩展、⾼度可⽤:Databus能扩展到⽀持数千消费者和事务数据来源,同时保持⾼度可⽤性。
事务按序提交:Databus能保持来源数据库中的事务完整性,并按照事务分组和来源的提交顺寻交付变更事件。
低延迟、⽀持多种订阅机制:数据源变更完成后,Databus能在微秒级内将事务提交给消费者。同时,消费者使⽤Databus中的服务器端过滤功能,可以只获取⾃⼰需要的特定数据。
⽆限回溯:这是Databus最具创新性的组件之⼀,对消费者⽀持⽆限回溯能⼒。当消费者需要产⽣数据的完整拷贝时(⽐如新的搜索索引),它不会对数据库产⽣任何额外
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论