DebeziumSQLserver2016同步到kafka
DebeziumSQLserver2016同步到kafka
写这篇⽂章的主要⽬的是因为在⾃⼰使⽤Debezium的时候遇见的问题,⽹上的案例⼜⽐较少,⾃⼰决定写⼀篇⽂章,希望可以帮助到其他⼈,因为是第⼀次写博客,有什么不⾜的地⽅希望见谅或提出宝贵意见。
⼀、需求和软件的版本
(1)需求:监控sellInfo表的部分列,当这些列新增或更新或删除的时候,kafka⽣成⼀条变更信息到消费端,业务代码拿到这些信息来更新Elasticsearch
(2)软件版本:sqlServer版本,Microsoft SQL Server 2016 (SP1) ;
kafka版本,kafka2.12-2.5.0;
Debezium,JAR版本,debezium-debezium-connector-sqlserver-1.2.1
⼆、开启CDC
(1)为数据库开启CDC(需要sysadmin权限)
EXEC sys.sp_cdc_enable_db
(2)为数据库表开启CDC
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',      --数据库名
@source_name = 'SellInfo',        --数据库表名
@capture_instance = default,    --默认实例
@role_name = NULL,        --⾓⾊权限,null表⽰不设置权限
@captured_column_list = 'ArticleId,HtmlID,Title'  --为哪些列开启监控
sellInfo开启cdc成功的标志,数据库出现cdc库,如图所⽰,这是发⽣update操作时,cdc会做记录。
⼆、安装kafka和开启kafka-connect
(1)下载kafka,并解压,重命名
当前路径  /root
tar zxvf kafka_2.12-2. -C ./
mv kafka_2.12-2.5.0 kafka
(2)修改kafka/config/server.properties
listeners = PLAINTEXT://192.168.16.2:9092
advertised.listeners=PLAINTEXT://192.168.16.2:9092
(3)开启zookeeper,开启kafka
当前路径  /root/kafka
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
bin/kafka-server-start.sh -daemon config/server.properties
(4)下载Debezium JAR包
JAR下载地址,下载jar,并解压到 /kafka/kafka_connect_plugins(⾃⼰创建⼀个⽂件夹)
(5)修改kafka/config/connect-distribute.properties
bootstrap.servers=192.168.16.2:9092
plugin.path= /root/kafka/kafka_connect_plugins
(6)启动kafka-connect
玻璃杯什么牌子好当前路径 /root/kafka
bin/connect-distributed.sh -daemon config/connect-distributed.properties
开启成功的标志,postman可以使⽤访问端⼝(默认的端⼝号,可以在connect-distributed.properties 更改)
(7)编辑connect的属性
当前路径 /root/kafka
创建⼀个json⽂件,vi sellInfo.json
{
"name": "sqlserver-cdc-dbo-sellInfo",
"config": {
"connector.class": "tor.sqlserver.SqlServerConnector",
"database.hostname": "",    ---数据库ip
"database.port": "1433",                        ----端⼝号
wmv转mp4
"database.user": "xxxx",                        ----数据库⽤户名
"database.password": "xxxx",                ----数据库密码
"database.dbname": "idea_test",            -----数据库名
"database.server.name": "fullfillment",
"table.whitelist": "dbo.SellInfo",              -----数据库表⽩名单
"de":"schema_only",          ----快照⽅式,inital--表数据全量更新,schema_only--表数据开启cdc后的更新        "column.blacklist":","  ----不需要监控的表的字段(下⾯有我⽤的时候遇见的问题)
"database.history.kafka.bootstrap.servers": "192.168.16.2:9092",
"database.pic": "dbhistory.fullfillment",
"able":"false",
"verter":"org.t.json.JsonConverter",
"event.processing.de":"warn"
防晒隔离霜
}
}
更多数据设置可以参考Debezium的官⽅⽂档:
⽤于SQL Server的Debezium连接器(⾥⾯还有⼀些其他的连接器,mysql等)
(8)执⾏连接器
开启成功的标志:
更多postman关于连接器的操作可以参考这篇⽂章,帮助我很多
sqlserver增量订阅&消费实时同步kafka,最新解决⽅案,看完不会你打我!
三、验证成果
1、update [dbo].SellInfo set Audit = 2 where ArticleId = 1 (可以是新增,可以是删除)
2、查看当前的有哪些topic
此时路径 /root/kafka
bin/kafka-topics.sh --zookeeper localhost --list
__consumer_offsets  ----默认创建
connect-configs        ----默认创建
connect-offsets          ----默认创建
connect-status          ----默认创建
dbhistory.fullfillment  ----记录表的结构信息印度暴雨致近200人遇难
fullfillment                  ----记录表的结构信息
fullfillment.dbo.SellInfo  -----SellInfo表改变的记录
查看fullfillment.dbo.SellInfo的信息:
当前路径  /root/kafka
bin/kafka-console-consumer.sh --bootstrap-server 192.168.16.2:9092 --topic fullfillment.dbo.SellInfo --from-beginning
四、总结
(1)第⼀次搞这种冷门的东西,刚开始确实有点头疼,后来明⽩,⼀定要学会参考别⼈的东西,查看官⽅⽂档,制定流程分阶段进⾏等,这也是我为什么要写这篇⽂章的原因
(2)可能出现的问题,CDC的开启,如果在执⾏命令的时候出现错误、按照给出的错误信息去baidu;kafka的安装与运⾏ --daemon是可以让程序离开shell界⾯后台运⾏,但是不会显⽰开启时的⽇
志;kafka-connect的配置⽂件sellInfo.json中的'column.blacklist'字段出现问题,⽐如⼀张表a,id,name,age,phone,我CDC监控id,age ,’column.blacklist‘剩下的字段,运⾏connect同步数据时会出错,我看了源码感觉应该是⼀个bug,当然很有可能是有些配置不知道怎么使⽤导致的,最后我通过分析源码将表的结构变成id,age,name,phone避免了这种异常,错误代码我贴出来,希望有⼈可以解答这个问题。
yy礼包08-03 12:41:56,160] ERROR Error requesting a row value, row: 3, requested index: 15 at position 3
(lational.TableSchemaBuilder:221) [2020-08-03 12:41:56,160] ERROR Producer failure (io.debezium.pipeline.ErrorHandler:31) org.t.errors.ConnectException: Error while processing event at offset {transaction_id=null, event_serial_no=2,
commit_lsn=000003cb:000ec557:0003, change_lsn=000003cb:000ec557:0002} at
io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:220) at
tor.sqlserver.SqlServerStreamingChangeEventSource.lambda$execute$1(SqlServerStre amingChangeEventSource.java:247) at
io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:524) at
tor.ChangesForTables(SqlServerConnection.java:190 ) at
tor.ute(SqlServerStreamingChan geEventSource.java:162) at
io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0(ChangeEventSourceCoordinator.java: 108) at urrent.Executors$RunnableAdapter.call(Executors.java:511) at
urrent.FutureTask.run(FutureTask.java:266) at
urrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at
urrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at
java.lang.Thread.run(Thread.java:745) Caused by: org.t.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real
database schema at
lational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.jav a:222) at
lational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:251) at lational.TableSchema.valueFromColumnData(TableSchema.java:143) at
itUpdateRecord(RelationalChangeRecordEmitter.ja va:97) at
itChangeRecords(RelationalChangeRecordEmitter.热爱祖国演讲稿
java:51) at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:193) (10)
more

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。