基于canal实现mysql、oracle的数据库实时同步
基于canal实现mysql、oracle的数据库实时同步
1.  前⾔
产品⽣态链中有⼀块是数据库实时同步模块,⼀直以来使⽤的是数据库特定相关的⽅法(如触发器)实现数据库同步,随着产品越来越多,该设计⽅法逐渐显得不合理。于是想到是否有通⽤的数据库实时同步模块,问了度娘,到canal。
2.  需求
2.1.      oracle发送端⽀持
canal开源代码中发送端仅仅⽀持mysql,接收端由于采⽤jdbc,mysql、oracle等可以通吃。
2.2.      传输⽅式
公司所在⾏业对⽹络安全要求较⾼,不同安全分区使⽤纵向隔离装置⽽⾮防⽕墙进⾏安全隔离,纵向隔离装置原理与linux rsync相同,因此发送端与接收端不能采⽤⽹络⽅式传输,发送端将数据库记录写⼊⽂件,通过隔离装置主动将⽂件穿到对⽅服务器后,接收端加载⽂件,将记录⼊库。
⽽canal基于Google protobuf实现⽹络通信,因此这⼀块需要被替换。
2.3.      其他需求
同步时,对某些表可能需要带条件的同步,如某列=1的所有记录,同时需要将该记录对应的⽂件同步过去,该通⽤模块如何与⽂件打配合,需要好好考虑。
某些记录需要⼈⼯同步过去,⽆论是否已经同步过。
3.  oracle数据库同步原理
oracle基于logminer实现数据库同步。
3.1.      设置LogMiner字典⽂件路径
sqlplus /nolog
SQL>conn / as sysdba
SQL>create directory utlfile as ‘/home/logmnr’;
SQL>alter system set utl_file_dir='/home/logmnr' scope=spfile;
注意⽂件夹权限给oracle
查看LogMiner⽂件夹是否设置:
SQL>show parameter utl;
3.2.      创建数据库同步⽤户
-- create user username identified by password
SQL>create user logminer identified by logminer;
SQL> grant dba to logminer;
3.3.      设置追加⽇志
添加追加⽇志:
SQL>alter database add supplemental log data(primary key,unique index) columns;
检查追加⽇志:
SQL>select supplemental_log_data_min,supplemental_log_data_pk,supplemental_log_data_ui from v$database;
删除追加⽇志:
alter database drop supplemental log data (primary key ,unique index) columns
3.4.      重启数据库
SQL> shutdown abort
SQL> startup
3.5.      查看⽇志清单
SQL>select * from v$logfile
3.6.      程序实现
3.6.1.    创建数据库字典
exec dbms_logmnr_d.build(dictionary_filename=>'a',dictionary_location=>'/home/logmnr');
3.6.2.    添加⽇志
可先查看⽇志清单,然后根据⽇志清单动态⽣成添加⽇志语句
exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo01.log', options=>w); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo02.log', options=>dbms_logmnr.addfile); exec dbms_logmnr.add_logfile(logfilename=>'/u01/app/oracle/oradata/orcl/redo03.log', options=>dbms_logmnr.addfile);
3.6.3.    从某个scn序列号开始分析⽇志
exec dbms_logmnr.start_logmnr(startScn=>’0’, dictfilename=>'/home/a’,
options=>_rowid_in_stmt);
3.6.
4.    查询所有结果
SELECT scn,operation,timestamp,status,sql_redo FROM v$logmnr_contents WHERE seg_owner='ZH9000’ and
seg_type_name=’TABLE’;
演员刘涛简历3.6.5.    释放分析内存
exec d_logmnr;
3.7.      附件
3.7.1.    options定义
COMMITTED_DATA_ONLY顾名思义就是只显⽰已经提交了的,那些正在进⾏中的及Oracle内部操作都忽略掉了
DDL_DICT_TRACKING适⽤于在线⽇志存放LogMiner字典的情况,当表发⽣了添加字段等情况,字典⾃动更新。
NO_SQL_DELIMITER 去掉SQL_REDO及SQL_UNDO中SQL语句最后的分号,以CURSOR⽅式循环执⾏解析出的SQL会很⽅便和快捷。
NO_ROWID_IN_STMT在SQL_REDO和SQL_UNDO列语句中去掉ROWID
我只在乎你日语
4.  canal重构
canal.serverty.handler.SessionHandler负责处理⽹络版本接收端的订阅(subscription)、记录传输(get)、取消订阅(unsubscribe)、传输确认(ack),经模仿slave向master数据库请求后,回应给接收端。
改为⽂件传输后,canal.serverty.handler.SessionHandler不再继承SimpleChannelHandler,抽出subscription、get、unsubscribe、ack⽅法,在get完成时不再⽹络传输,翻译成sql语句后写本地⽂件。mysql版本的SessionHandler 修改后的代码如下:
public MysqlSessionHandler(CanalServerWithEmbedded embeddedServer){
}
public void subscription(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
// 尝试启动,如果已经启动,忽略
if (!embeddedServer.Destination())) {
ServerRunningMonitor runningMonitor =
if (!runningMonitor.isStart()) {
runningMonitor.start();
}
}
embeddedServer.subscribe(clientIdentity);
}
public void unsubscribe(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
embeddedServer.unsubscribe(clientIdentity);
stopCanalInstanceIfNecessary(clientIdentity);// 尝试关闭
//NettyUtils.Channel(), null);
}
public long get(){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
int batchSize = 1000;
Message message = WithoutAck(clientIdentity, batchSize);
Entries(), Id());
初中家长意见怎么写Id();
}
public void ack(long batchId){
ClientIdentity clientIdentity = new ClientIdentity(destination, clientId, filter);
MDC.put("destination", Destination());
embeddedServer.ack(clientIdentity, batchId);
日本护肤品哪个牌子好
}
private void stopCanalInstanceIfNecessary(ClientIdentity clientIdentity) {
List<ClientIdentity> clientIdentitys = embeddedServer.Destination());
if (clientIdentitys != null && clientIdentitys.size() == 1 && ains(clientIdentity)) {
ServerRunningMonitor runningMonitor =
if (runningMonitor.isStart()) {
}
}
}
private void printEntry(List<Entry> entrys, long batchId) {
for (Entry entry : entrys) {
if (EntryType() == EntryType.TRANSACTIONBEGIN || EntryType() ==
EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.StoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + String(),e);            }
EventType eventType = EventType();
/*System.out.println(String.format("==> binlog[%s:%s] , name[%s,%s] , eventType : %s",
查询商标是否被注册eventType));
*/
if(dbName.Header().getSchemaName()))
{
String Header().getTableName();
List<String> sqls=new LinkedList<String>();
for (RowData rowData : RowDatasList()) {
String sql=buildSql(tableName, rowData, eventType);
if(sql!=null){
sqls.add(sql);
}
}
try {
toLocal(sqls, batchId);
明朝历代帝王排序
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
final static String DELETE_SQL="delete from _tn_ where _cn_";
final static String INSERT_SQL="insert into _tn_(_cn_) values(_vn_)";
final static String UPDATE_SQL="update _tn_ set _vn_ where _cn_";
//创建SQL
private String buildSql(String tableName,RowData rowData,EventType eventType){
tableName="`"+tableName+"`";
if (eventType == EventType.DELETE) {
StringBuffer cn=new StringBuffer();
StringBuffer cn2=new StringBuffer();
for (Column column : BeforeColumnsList()) {
IsKey()){
if(cn2.length()>0){
cn2.append(" and ");
}
cn2.append("`"+Name()+"`");
cn2.append("=");
if(!MysqlType())){

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。