FlinkCDC同步MySQL分库分表分⽚数据
mysql-cdc⽀撑正则表达式的库名表名来匹配多个库多个表来获取分库分表情况下的mysql数据。只需要在创建flink源表时在数据库和表名上使⽤正则匹配即可。僵尸片大全林正英
建表语句:
DROP TABLE IF EXISTS `2person`;
CREATE TABLE `2person` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2003 DEFAULT CHARSET=utf8;
-
- ----------------------------
-- Records of 1person
-- ----------------------------
INSERT INTO `2person` VALUES ('2001', '2001name');
INSERT INTO `2person` VALUES ('2002', '2name');
DROP TABLE IF EXISTS `3person`;
CREATE TABLE `3person` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;
-
- ----------------------------
-- Records of 1person
-- ----------------------------
INSERT INTO `3person` VALUES ('3001', '3001name');
INSERT INTO `3person` VALUES ('3002', '3name');
CREATE TABLE `person_sum` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(20) NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3003 DEFAULT CHARSET=utf8;
java调⽤sql(也可以直接在flinksql客户端执⾏其中的sql):
import org.apache.flink.vironment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class Mysql2MysqlRemote {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = ExecutionEnvironment(); env.setParallelism(1);
StreamTableEnvironment tableEnv = ate(env);
String sourceDDL =
"CREATE TABLE mysql_binlog (\n" +
" id Int,\n" +
" name STRING,\n" +
" primary key (id) not enforced\n" +
") WITH (\n" +
" 'connector' = 'mysql-cdc',\n" +
" 'hostname' = '192.168.128.1',\n" +
" 'port' = '3306',\n" +
" 'username' = 'root',\n" +
" 'password' = '123456',\n" +
" 'database-name' = 'db_[0-9]?',\n" +
" 'table-name' = '[0-9]?persion[0-9]?'\n" +
// ", 'de' = 'latest-offset'\n" +
")";
String sinkDDL =
"CREATE TABLE test_cdc (" +
" id Int," +
" name STRING," +
" primary key (id) not enforced" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'driver' = 'sql.cj.jdbc.Driver'," +
" 'url' = 'jdbc:mysql://192.168.128.1:3306/db0?serverTimezone=UTC&useSSL=false'," + " 'username' = 'root'," +
" 'password' = '123456'," +
" 'table-name' = 'person_sum'" +
")";
// 简单的聚合处理
String transformDmlSQL = "insert into test_cdc select * from mysql_binlog";
System.out.println(sourceDDL);
System.out.println(sinkDDL);
国际服刺激战场System.out.println(transformDmlSQL);
TableResult tableResult = uteSql(sourceDDL);
TableResult sinkResult = uteSql(sinkDDL);
TableResult result = uteSql(transformDmlSQL);
// 等待flink-cdc完成快照
result.print();
}
}
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="/POM/4.0.0"
xmlns:xsi="/2001/XMLSchema-instance"
xmlns:xsi="/2001/XMLSchema-instance"
xsi:schemaLocation="/POM/4.0.0 /xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>
<groupId&le</groupId>
<artifactId>flinkCdcMysql</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<mavenpiler.source>8</mavenpiler.source>
<mavenpiler.target>8</mavenpiler.target>
<flink.version>1.13.3</flink.version>
<hive.version>1.1.0</hive.version>
<scala.binary.version>2.12</scala.binary.version>
<mysql.version>5.1.49</mysql.version>
<flinkcdc.version>2.0.1</flinkcdc.version>
<fastjson.version>1.2.75</fastjson.version>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>2.16.0</log4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
第一次世界经济危机<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
云南梯田<version>${flinkcdc.version}</version>
</dependency>
<dependency>
己所不欲 勿施于人是什么意思<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId> <version>${flink.version}</version>
</dependency>
中国最厉害武器是什么<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.22</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/java/</sourceDirectory>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
<configuration>
<includes>
<include>**/*.java</include>
</includes>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
执⾏后,flink会启动任务将存量数据同步到⽬标表,并且如果增量修改数据也会被同步过去,可以修改源表数据后再查看⽬标表中的数据库是否变化。
其他问题:
如果各表中的主键有相同的可以通过拼接数据库名和表名来组成联合主键。
在源表建表语句中中增加
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
在⽬标表建表语句中增加
database_name STRING,
table_name STRING,
并设置联合主键
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
如果分库不在⼀个机器上,可以使⽤union all来解决。
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。
发表评论