DataX数据全量,增量同步方案
DataX数据全量,增量同步⽅案
关于DataX 增量更新实现
注:
增量更新总体思路:从⽬标数据库读取⼀个最⼤值的记录,可以是DataTime 或者 RowVersion 类型,然后根据这个最⼤值对源数据库要同步的表进⾏过滤,然后再进⾏同步即可。
由于DataX ⽀持多种数据库的读写,⼀种相对简单并且可靠的思路就是:
1. 从csv ⽂件读取⽬标数据库上次抽取数据的⼀个最⼤值;
2. 动态修改json配置⽂件(增加过滤条件);
3. 执⾏修改后的配置⽂件,进⾏增量同步;
4. 同步成功,将系统当前时间写⼊csv⽂件;
接下来就是shell 脚本来⼀步⼀步实现增量更新
增量更新shell 实现
图⽰如下:
我们的同步环境是从oracle 到 hive,由于dataX ⽆法从⽬的数据库(即hive)中读取值,因此改为从csv⽂件读取,
同样因为⽆法读取hive数据库内最⼤值,因此最⼤值,采⽤上次数据同步时间。我们要做的只是每次数据同步成功后把系统时间写⼊到对应的csv⽂件中(每个表对应⼀个csv⽂件)。
1. 数据同步json 配置⽂件
从oracle 同步到hive ,部分配置如下
例: oracle2hive_ods_s_jy_AB01.json
{
"job": {
"content": [{
买房须知"reader": {
"name": "oraclereader",
"parameter": {
"column": ["AAB001", "AAB004", "AAB041", "AAB043", "AAB013", "to_char(sysdate,'yyyymmddhh24miss') as D_UPDATE"],    "where":"1=1",
"connection": [{
"jdbcUrl": ["jdbc:oracle:thin:@【源数据库ip】:【port】/pdb1"],
"table": ["NBJYJZ_NB_DATA_INF.AB01"]
}],
"username": "***",
"password": "***"
}
},
"writer": {
"name": "hdfswriter",
教师节贺卡怎么做?"parameter": {
"column": [{
"name": "AAB001",
"type": "STRING"
}, {
"name": "AAB004",
"type": "STRING"
},
...
],
"defaultFS": "hdfs://【⽬的ip】:【port】",
"fieldDelimiter": "\t",
"fileName": "ods_s.ods_jy_ab01",
"fileType": "text",
"path": "/user/hive/warehouse/ods_s.db/ods_jy_nb_ab01",
搞笑qq名"writeMode": "append"
}
}
}],
"setting": {
"speed": {
"byte": 10485760,
"channel": "5"
}
}
}
}
数据同步的脚本有了,读取最⼤值的脚本可以写在shell ⾥。下⾯配置dataX增量同步的shell脚本。
2. 增量同步的shell脚本配置
#!/bin/bash
#datax 数据抽取
#create time:2018-11-27 10:00:00
### every exit != 0 fails the script
# 错误检查,有错误的时候退出
set -e
# 我们的dataX脚本⾥需要的参数,不需要的可以不加
systemDate=`date +%Y-%m-%d,%H:%M:%S`
# 获取当前时间,这个是同步后要写到csv⽂件的变量
current_time=`date +"%Y-%m-%d %H:%M:%S"`
echo ${current_time}
shell_array[0]= oracle2hive_ods_s_jy_AB01.json
# shell_array[1]= oracle2hive_ods_s_jy_AB02.json
如何自动关机
# shell_array[2]= oracle2hive_ods_s_jy_AB07.json
# ...
# 遍历⽂件名
for i in ${#shell_array[*]}
张敬轩 出柜do
# 循环执⾏命令
# ${shell_array[$i]} ⽂件名
# 通过⽂件名称截取表名,我们的脚本命名末尾标识的是表名
table_name=${${shell_array[$i]}:0-9:4}
# 1. 到 csv⽂本⽂件,并将内容读取到⼀个变量中
MAX_TIME=`cat /opt/datax/job/jy/result/${table_name}.csv`
# 如果最⼤时间不为 null 的话,修改全部同步的配置,进⾏增量更新;
# 如果最⼤时间为null ,进⾏全量更新;
if [ "$MAX_TIME" != "null" ]; then
# 设置增量更新过滤条件
WHERE="AAE036 > '$MAX_TIME'"
# 2.改写json配置⽂件
sed "s/1=1/$WHERE/g" /opt/datax/job/jy/ql/${shell_array[$i]} > /opt/datax/job/jy/zl/${${shell_array[$i]}/.json/_zl.json}
# 3.增量更新
python /opt/datax/bin/datax.py -p "-DsystemDate='$systemDate'" -j "-Xms4g -Xmx4g" /opt/datax/job/jy/zl/${${shell_array[$i]}/.json/_zl.json}        else
# 全量更新
python /opt/datax/bin/datax.py -p "-DsystemDate='$systemDate'" -j "-Xms4g -Xmx4g"  /opt/datax/job/jy/ql/${shell_array[$i]}
fi
if [ $? -ne 0 ]; then
#命令执⾏失败,不做任何操作
else
# 执⾏成功,将最⼤⽇期写⼊csv ⽂件,覆盖写⼊
table_name=${${shell_array[$i]}:0-9:4}
echo  ${current_time} > /opt/datax/job/jy/result/${table_name}.csv
fi
# 删除临时⽂件,增量⽂件
rm -rf /opt/datax/job/jy/zl/${${shell_array[$i]}/.json/_zl.json}
done
补充:上述脚本中⽤到shell关于字符串的操作(⼤神请略过)
1.shell字符串替换
var=oracle2hive_ods_s_jy_AB01.json
# echo ${var/查字符串/替换字符串}
echo ${$var/.json/_zl.json}
结果:oracle2hive_ods_s_jy_AB01_zl.json
2.shell 字符串截取
var=oracle2hive_ods_s_jy_AB01.json
国际品牌童装echo ${$var:0-9:4}
结果: AB01
>>>#
sed "s/查字符串/替换字符串/g" 源⽂件 > ⽬标⽂件,转载请注明出处。

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。

发表评论