flink实时数仓(九):增量同步mysql中数据
flink实时数仓(九):增量同步mysql中数据⽂章⽬录
数据库中配置流表
CREATE TABLE `dbus_flow` (
`flowId` int(11) NOT NULL AUTO_INCREMENT COMMENT '⾃增ID',
`mode` int(11) NOT NULL COMMENT '存储类型(#PHOENIX  #NATIVE  #STRING,默认STRING)',
`databaseName` varchar(50) NOT NULL COMMENT 'database',
`tableName` varchar(50) NOT NULL COMMENT 'table',
`hbaseTable` varchar(50) NOT NULL COMMENT 'hbaseTable',
`family` varchar(50) NOT NULL COMMENT 'family',
`uppercaseQualifier` tinyint(1) NOT NULL COMMENT '字段名转⼤写, 默认为true',
`commitBatch` int(11) NOT NULL COMMENT '字段名转⼤写, 默认为true',
`rowKey` varchar(100) NOT NULL COMMENT '组成rowkey的字段名,必须⽤逗号分隔',
`status` int(11) NOT NULL COMMENT '状态:1-初始,2:就绪,3:运⾏',
PRIMARY KEY (`flowId`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
样例数据
INSERT INTO `dbus_flow` VALUES ('1','0','test','zyd_orders','learing_flink:zyd_orders','0','1','10','orderId','2'); jdbc⼯具类
package dbus.utils;
fig.GlobalConfig;
import java.sql.*;
/**
* jdbc通⽤的⽅法
关于教师节的对联
*
*/
public class JdbcUtil {
//url
private static String url = GlobalConfig.DB_URL;
//user
private static String user = GlobalConfig.USER_MAME;
//password
private static String password = GlobalConfig.PASSWORD;
//驱动程序类
private static String driverClass = GlobalConfig.DRIVER_CLASS;
/
**
* 只注册⼀次,静态代码块
*/
static{
try{
Class.forName(driverClass);
}catch(ClassNotFoundException e){
e.printStackTrace();
}
}
/**
/
**
* 获取连接⽅法
重生之回到七四年*/
public static Connection getConnection(){
try{
Connection conn = Connection(url, user, password);
return conn;
}catch(SQLException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
/**
* 释放资源的⽅法
*/
public static void close(Statement stmt,Connection conn){
if(stmt!=null){
try{
stmt.close();
}catch(SQLException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
if(conn!=null){
try{
conn.close();
}catch(SQLException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
/
**
* 释放资源的⽅法
*/
public static void close(ResultSet rs,Statement stmt,Connection conn){
if(rs!=null){
try{
rs.close();
}catch(SQLException e){
e.printStackTrace();银行从业人员资格证
throw new RuntimeException(e);
}
}
if(stmt!=null){
try{
stmt.close();
}catch(SQLException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
if(conn!=null){
try{
conn.close();
}catch(SQLException e){
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
public static void main(String[] args){
System.out.Connection());
}
}
枚举类
CodeEnum
package enums;
public interface CodeEnum {
/**
* 获取枚举的code值
*
* @return
*/
Integer getCode();
}
FlowStatusEnum
package enums;
import lombok.Getter;
@Getter
public enum FlowStatusEnum implements CodeEnum { /**
* 初始状态(新添加)
*/
FLOWSTATUS_INIT(0,"初始状态"),
/**
* 就绪状态,初始采集后,可以将状态改为就绪状态    */
FLOWSTATUS_READY(1,"就绪状态"),
/**
* 运⾏状态(增量采集正在运⾏)
*/
FLOWSTATUS_RUNNING(2,"运⾏状态");
private Integer code;
private String message;
FlowStatusEnum(Integer code, String message){
}
}
HBaseStorageModeEnum
package enums;
import lombok.Getter;
@Getter
public enum HBaseStorageModeEnum implements CodeEnum{ /**
* STRING
*/
STRING(0,"STRING"),
/**
* NATIVE
*/
NATIVE(1,"NATIVE"),
/**
* PHOENIX
*/
PHOENIX(2,"PHOENIX");
private Integer code;
private String message;
HBaseStorageModeEnum(Integer code, String message){
}
}
配置类bean对象
Flow
**del;
import enums.FlowStatusEnum;
import enums.HBaseStorageModeEnum;
import lombok.Data;
import lombok.ToString;
@Data
如何选择专业@ToString
public class Flow  implements Serializable{
private Integer flowId;
/**
* HBase中的存储类型, 默认统⼀存为String,
*/
private int mode= Code();
/**
* 数据库名/schema名
*/
private String databaseName;
/**
* mysql表名
*/
private String tableName;
/**
* hbase表名
那些年错过的大雨是什么歌*/
private String hbaseTable;
/**
* 默认统⼀Column Family名称
*/
private String family;
教师节句子简短
/
**
* 字段名转⼤写, 默认为true
*/
private boolean uppercaseQualifier=true;
/**
* 批量提交的⼤⼩, ETL中⽤到
*/
private int commitBatch;
/**
*  组成rowkey的字段名,必须⽤逗号分隔
*/
private String rowKey;
/**
* 状态
*/
private int status= FlowStatusEnum.Code(); }
flink状态类编程

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。