kafka-jdbc-connector-sink实现kafka中的数据同步到mysql
kafka-jdbc-connector-sink实现kafka中的数据同步到mysql
接下来笔者要描述的是mysql的数据通过kafka,然后在实时进⼊其他mysql数据库的⽅案
有同学可能会问到为什么这么⿇烦,⽽不直接使⽤主从复制的⽅案来解决mysql的数据同步呢?原因是通过kafka connector可以做简单的数据过滤。
由于kakfa connctor只能做简单的数据过滤,之后可能会使⽤mysql + kafka + flink的形式实现数据同步
kafka只⽤dbz connector获取mysql中的数据,具体操作细节不是本⽂重点,在此不在赘述,后续会补上关于dbz操作mysql数据库的⽂章
kafka中数据同步到mysql
需求
1. 把kafka中dbz产⽣的数据同步到mysql中并修改表的名字
2. 将某⼀topic中的数据按着不同的字段组合以及定义不同字段为主键将数据同步到不同的表中青鸟 火影
* 暂时不考虑⽀持⾃动创建表和删除操作,因为⼀般⽣产环境中只有软删除
实现
需求1:
dbz中的数据是before-after的格式,需要将after中的数据提取出来同步到mysql中。
通过正则表达式读取topic-name, 将每个topic的名字添加上前缀作为mysql的表名字。
需求2:
topic中的value值不可以为null,对于过滤的字段必须存在topic的struct中。
下雨天的心情说说⼯具
创造营2020最新排名
kafka-jdbc-connector-sink.jar
配置
通过正则读取dbz的topic,替换表名后写⼊mysql(启动⼀个connector操作多个topic,⼀个topci对应⼀张表)
"connection.password": "password",
"tasks.max": "1",
"": "dbz_alpha_testmodity.(.*)",
"ate": false, # 是否⾃动创建表
"auto.evolve": false, # 是否⽀持alert语句
"de": "upsert", # 导⼊数据的模式(insert upsert update)
"batch.size": 3000, # 批量操作数据
发动机大修"abled": false, # 是否⽀持delete操作, 如果⽀持delete操作pk.mode必须为record_key    "pk.mode": "record_key",
"transforms": "dropPrefix, AddPrefix, ExtractField",
# 提取某个字段作为struct传输到下游
"pe": "org.t.transforms.ExtractField$Value",    "transforms.ExtractField.field": "after",
# 过滤掉topic名字的前缀
"pe":"org.t.transforms.RegexRouter",
"":"dbz_alpha_testmodity.(.*)",
"placement":"$1",
# 为topic名字添加前缀
"pe": "org.t.transforms.RegexRouter",
"" : ".*",
"placement" : "new-pre_$0"
他笑了作文500字}' localhost:8083/connectors/jdbc-sink-mysql-name/config
⼀个topic根据不同的列字段对应多个表(⼀个表对应⼀个connector)
"connection.password": "password",
"tasks.max": "1",
"topics": "dbz_alpha_testmoditymodity_txgjdbc",
"table.name.format": "table-name", # 指定表的名字
"ate": false,
"auto.evolve": false,
"de": "upsert",
"batch.size": 3000,
"abled": false,
"pk.mode": "record_key",
"transforms": "ExtractField, ValueToKey, ReplaceField, RenameField",
"pe": "org.t.transforms.ExtractField$Value",
"transforms.ExtractField.field": "after",
# 将value中的某些field作为key
"pe":"org.t.transforms.ValueToKey",
"transforms.ValueToKey.fields":"age",
# 过滤出value中需要的字段(whitelist,blacklist ⿊⽩名单)
"pe": "org.t.transforms.ReplaceField$Value",
"transforms.ReplaceField.whitelist": "name",
# 修改key中某些字段的名字
"pe": "org.t.transforms.ReplaceField$Key",6月份节日
"ames": "age:id,
# 修改value中某些字段的名字
"pe": "org.t.transforms.ReplaceField$Value",
"ames": "name:names"
}' localhost:8083/connectors/jdbc-sink-mysql_name/config
问题(注意)
使⽤"transforms": "tombstoneHandlerExample报不到t.transforms.TombstoneHandler的jar包使⽤"transforms":"dropPrefix"之后⾃动创建表的功能失效了
原始数据的after字段下的${topic}-Values在通过"connector.class":
"org.t.file.FileStreamSinkConnector"输出到控制后不见了
Struct{
before=Struct{id=240,create_time=Sun Oct 20 23: 11: 50 CST 2019,update_time=Sun Oct 20 23: 11: 50 CST 2019,is_deleted=0,origin_effect_id=0,cn_n ame=紧致,platform=3,effect_additional_fields=
},
after=Struct{id=240,create_time=Sun Oct 20 23: 11: 50 CST 2019,update_time=Sun Oct 20 23: 11: 50 CST 2019,is_deleted=0,origin_effect_id=45,cn_n ame=紧致,platform=3,effect_additional_fields=
},
source=Struct{version=0.10.0.CR1,connector=mysql,name=dbz_alpha_test,ts_ms=1574145235000,snapshot=false,db=commodity,table=commodity_eff ect,server_id=106507,gtid=d4832372-720c-11e9-92fa-6c0b84d610b3: 8022222,file=mysql-bin.000071,pos=107643337,row=0,thread=65745185
},op=u,ts_ms=1574145235978
}
当插⼊数据的时候,kafka中的key值会覆盖value中对应字段的值(“pk.mode”: "record_key"时)
transforms字段如果设置多个,那么他们之间的转换没有关系的即上游转换的结果不会传到下游
"transforms": "ExtractField", # [1]
"pe": "org.t.transforms.ExtractField$Value",    "transforms.ExtractField.field": "after",
"transforms": "ValueToKey", # [2]
"pe":"org.t.transforms.ValueToKey",
"transforms.ValueToKey.fields":"age",

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。