使⽤DataX增量同步数据(转)
关于 DataX
DataX 是阿⾥巴巴集团内被⼴泛使⽤的离线数据同步⼯具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间⾼效的数据同步功能。
如果想进⼀步了解 DataX ,请进⼀步查看 。
关于增量更新
DataX ⽀持多种数据库的读写, json 格式配置⽂件很容易编写, 同步性能很好, 通常可以达到每秒钟 1 万条记录或者更⾼, 可以说是相当优秀的产品, 但是缺乏对增量更新的内置⽀持。
其实增量更新⾮常简单, 只要从⽬标数据库读取⼀个最⼤值的记录, 可能是 DateTime 或者 RowVersion 类型, 然后根据这个最⼤值对源数据库要同步的表进⾏过滤, 然后再进⾏同步即可。
由于 DataX ⽀持多种数据库的读写, ⼀种相对简单并且可靠的思路就是:
1. 利⽤ DataX 的 DataReader 去⽬标数据库读取⼀个最⼤值;
2. 将这个最⼤值⽤ TextFileWriter 写⼊到⼀个 CSV ⽂件;
3. ⽤ Shell 脚本来读取 CSV ⽂件, 并动态修改全部同步的配置⽂件;
4. 执⾏修改后的配置⽂件, 进⾏增量同步。
接下来就⽤ shell 脚本来⼀步⼀步实现增量更新。
增量更新的 shell 实现
我的同步环境是从 SQLServer 同步到 PostgreSQL , 部分配置如下:
"reader": {
"name": "sqlserverreader",
"parameter": {
"username": "...",
"password": "...",
"connection": [
{
"jdbcUrl": [
"jdbc:sqlserver://[source_server];database=[source_db]"
],
"querySql": [
"SELECT DataTime, PointID, DataValue FROM dbo.Minutedata WHERE 1=1"
]
}
]
}五行属木的字大全
},
"writer": {
"name": "postgresqlwriter",
"parameter": {
"username": "...",
"password": "...",
"connection": [
{
"jdbcUrl": "jdbc:postgresql://[target_server]:5432/[target_db]",
"table": [
"public.minute_data"
]
}
],
"column": [
"data_time",
"point_id",
什么牌子的月饼最好吃"data_value"
],
"preSql": [
"TRUNCATE TABLE @table"
]
}
}
}
],
"setting": { }
}
}
更多的配置可以参考 以及 。
要实现增量更新, ⾸先要 PostgresqlReader 从⽬标数据库读取最⼤⽇期, 并⽤ TextFileWriter 写⼊到⼀个 csv ⽂件, 这⼀步我的配置如下所⽰:
"reader": {
"name": "postgresqlreader",
"parameter": {
"connection": [
{
"jdbcUrl": [
"jdbc:postgresql://[target_server]:5432/[target_db]"
],
"querySql": [
"SELECT max(data_time) FROM public.minute_data"
周记500]
}
],
"password": "...",
"username": "..."
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"dateFormat": "yyyy-MM-dd HH:mm:ss",
"fileName": "minute_data_max_time_result",
"fileFormat": "csv",
"path": "/scripts/",
"writeMode": "truncate"
}
}火炬之光2 联机补丁
}
],
"setting": { }
}
}
更多的配置可以看考 以及
有了这两个配置⽂件, 现在可以编写增量同步的 shell ⽂件, 内容如下:
#!/bin/bash
### every exit != 0 fails the script
set -e
# 获取⽬标数据库最⼤数据时间,并写⼊⼀个 csv ⽂件
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_max_time.json
if [ $? -ne 0 ]; then
echo"minute_data_sync.sh error, can not get max_time from target db!"
exit 1
fi
# 到 DataX 写⼊的⽂本⽂件,并将内容读取到⼀个变量中
RESULT_FILE=`ls minute_data_max_time_result_*`
MAX_TIME=`cat $RESULT_FILE`
# 如果最⼤时间不为null的话,修改全部同步的配置,进⾏增量更新;
if [ "$MAX_TIME" != "null" ]; then
教师成长计划# 设置增量更新过滤条件
WHERE="DataTime > '$MAX_TIME'"
sed"s/1=1/$WHERE/g" minute_data.json > minute_data_tmp.json
# 将第45⾏的 truncate 语句删除;
sed'45d' minute_data_tmp.json > minute_data_inc.json
梦见很多坟墓# 增量更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data_inc.json
# 删除临时⽂件
rm ./minute_data_tmp.json ./minute_data_inc.json
else
# 全部更新
docker run --interactive --tty --rm --network compose --volume $(pwd):/scripts \
beginor/datax:3.0 \
/scripts/minute_data.json
fi
在上⾯的 shell ⽂件中, 使⽤我制作的 DataX docker 镜像, 使⽤命令 docker pull beginor/datax:3.0 即可获取该镜像, 当也可以修改这个 shell 脚本直接使⽤ datax 命令来执⾏。
为什么⽤ shell 来实现
因为 DataX ⽀持多种数据库的读写, 充分利⽤ DataX 读取各种数据库的能⼒, 减少了很多开发⼯作, 毕竟 DataX 的可靠性是很好的。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论