使用Canal ClientAdapter实现MySQL的DDL、DML同步到PostgreSQL 目录
1. 说明
各组件说明如下表:
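| 组件 | 版本 | 服务器 | 安装教程 | 作用 |
| --- | --- | --- | --- | --- |
| canal | 1.1.5 | canal1, canal2, canal3 | | |
| zookeeper | 3.6.3 | canal1, canal2, canal3 | | |
| mysql | 8.0.25 | canal1, canal2 | | 源数据库 |
| mysql | 8.0.25 | canal3 | | ClientAdapter远程配置数据库 |
| postgresql | 13 | canal3 | | 目标数据库 |

postgresql创建数据库db1和表db1.public.tb1_1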
2. canal.deploy配置
1. canal.properties配置
canal.serverMode = tcp
2. instance配置
example_rdb_db1/instance.properties设置只同步db1数据库的数据,内容如下
#################################################
# Canal instance configuration for example_rdb_db1: only tables of the
# MySQL database db1 are captured. canal2 is the binlog master and canal1
# is the standby used when heartbeat detection fails.
## mysql serverId , v1.0.26+ will autoGen
# on canal2 use 1262, on canal3 use 1263
canal.instance.mysql.slaveId=1261
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=canal2:3306
canal.instance.master.journal.name=mysql-bin.000026
canal.instance.master.position=4
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
canal.instance.standby.address = canal1:3306
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal_123
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=db1\\.tb\\d_\\d
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################
# heartbeat detection must be enabled
canal.instance.detecting.enable = true
# SQL statement used for the heartbeat check
canal.instance.detecting.sql = select 1
# heartbeat check interval
canal.instance.detecting.interval.time = 3
# number of failed heartbeats tolerated; once exceeded, the MySQL connection
# is switched, e.g. to the standby machine, to keep consuming the binlog
canal.instance.detecting.retry.threshold = 3
# whether to switch between master and standby once the threshold is exceeded
canal.instance.detecting.heartbeatHaEnable = true
3. 获取ClientAdapter
从GitHub下载的canal.adapter-1.1.5.tar.gz存在PostgreSQL兼容性问题,因此改用源码编译(编译过程请参考相关教程),编译完成后再上传到服务器并解压
[root@canal1 ~]#
[root@canal1 ~]# mkdir canal.adapter-1.1.5
[root@canal1 ~]#
[root@canal1 ~]# tar -zxvf canal.adapter-1.1.5.tar.gz -C canal.adapter-1.1.5
[root@canal1 ~]#
4. ClientAdapter配置
4.1 application.yml配置
# Canal ClientAdapter main configuration (conf/application.yml).
# Consumes the canal instance example_rdb_db1 over TCP and writes the
# changes to PostgreSQL through the rdb outer adapter keyed postgres1.
server:
  port: 8081
logging:
  level:
    # NOTE(review): logger name restored to the rdb adapter's full Java
    # package (the article showed only "canal.client.adapter.rdb") - confirm.
    com.alibaba.otter.canal.client.adapter.rdb: DEBUG
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp  # tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts: canal1:2181,canal2:2181,canal3:2181
  batchSize: 500
  syncBatchSize: 1000
  retries: -1  # -1 means retry indefinitely
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    # canal.tcp.server.host: 127.0.0.1:11111
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:

  srcDataSources:
    defaultDS:
      # the default_test database holds no data
      url: jdbc:mysql://canal1:3306/default_test?useUnicode=true
      username: root
      password: Root_123
  canalAdapters:
    - instance: example_rdb_db1  # canal instance name or mq topic name
      groups:  # one instance can be consumed in parallel by several groups
        - groupId: g1
          outerAdapters:
            - name: logger
            # - name: rdb
            #   key: mysql1
            #   properties:
            #     jdbc.driverClassName: com.mysql.jdbc.Driver
            #     jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
            #     jdbc.username: root
            #     jdbc.password: 121212
            # - name: rdb
            #   key: oracle1
            #   properties:
            #     jdbc.driverClassName: oracle.jdbc.OracleDriver
            #     jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
            #     jdbc.username: mytest
            #     jdbc.password: m121212
            - name: rdb
              # unique value; must match outerAdapterKey in the table-mapping file
              key: postgres1
              properties:
                jdbc.driverClassName: org.postgresql.Driver
                jdbc.url: jdbc:postgresql://canal3:5432/db1
                jdbc.username: postgres
                jdbc.password: postgres123
                threads: 2
                commitSize: 3000
            # - name: hbase
            #   properties:
            #     hbase.zookeeper.quorum: 127.0.0.1
            #     hbase.zookeeper.property.clientPort: 2181
            #     zookeeper.znode.parent: /hbase
            # - name: es
            #   hosts: 127.0.0.1:9300  # 127.0.0.1:9200 for rest mode
            #   properties:
            #     mode: transport  # or rest
            #     # security.auth: test:123456  # only used for rest mode
            #     cluster.name: elasticsearch
            # - name: kudu
            #   key: kudu
            #   properties:
            #     kudu.master.address: 127.0.0.1  # ',' split multi address
4.2 conf/rdb配置
adapter会自动加载conf/rdb下所有以.yml结尾的表映射配置文件。
1. 表映射配置文件(.yml)
# Table-mapping file loaded by the rdb adapter from conf/rdb.
# The commented-out section is the stock single-table example; the active
# section below mirrors the whole db1 schema to the postgres1 adapter.
# dataSourceKey: defaultDS
# destination: example
# groupId: g1
# outerAdapterKey: mysql1
# concurrent: true
# dbMapping:
#   database: mytest
#   table: user
#   targetTable: mytest2.user
#   targetPk:
#     id: id
#   mapAll: true
#   targetColumns:
#     id:
#     name:
#     role_id:
#     c_time:
#     test1:
#   etlCondition: "where c_time>={}"
#   commitBatch: 3000  # batch commit size

## Mirror schema synchronize config
dataSourceKey: defaultDS
destination: example_rdb_db1
groupId: g1
# must match the key of the rdb entry under outerAdapters in application.yml
outerAdapterKey: postgres1
# tables synced in parallel must have primary keys that never change and
# that are not used as foreign keys by other synced tables
concurrent: true
dbMapping:
  mirrorDb: true
  database: db1  # name shared by the source and target databases
2. l
手动运行curl ETL命令,对此文件中的表进行全量数据同步
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
Oracle数据库批量数据无损迁移技术
下一篇 »
发表评论