一种基于flink实现分布式实时同步SqlServer数据库的方法[发明专利]
(19)中华人民共和国国家知识产权局
(12)发明专利申请
(10)申请公布号 (43)申请公布日 (21)申请号 202011493344.2
(22)申请日 2020.12.17
(71)申请人 杭州玳数科技有限公司
地址 310030 浙江省杭州市西湖区紫霞街
176号杭州互联网创新创业园2号楼8F
(72)发明人 李珞 
(74)专利代理机构 常州佰业腾飞专利代理事务
所(普通合伙) 32231
代理人 毛姗
(51)Int.Cl.
G06F  16/22(2019.01)
G06F  16/2458(2019.01)
G06F  16/27(2019.01)
(54)发明名称
一种基于f l i n k 实现分布式实时同步
SqlServer数据库的方法
(57)摘要
本发明公开了一种基于flink实现分布式实
时同步SqlServer数据库的方法,包括如下步骤:
刘震云语录1)配置数据源信息及任务相关参数信息;2)插件
端校验数据源及任务相关信息接口;3)构建数据
库监听接口;4)数据捕捉及解析接口;5)基于
flink实现分布式数据同步。本发明对于现有技
术而言,通过简单的任务参数配置,可以快速实
现对SqlServer数据库表数据的实时采集功能。权利要求书2页  说明书4页  附图3页CN 112527799 A 2021.03.19
C N  112527799
A
1.一种基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,包括如下步骤:
1)配置数据源信息及任务相关参数信息;
2)插件端校验数据源及任务相关信息接口;
3)构建数据库监听接口;
4)数据捕捉及解析接口;
5)基于flink实现分布式数据同步。
2.如权利要求1所述的基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,所述步骤2)包括如下步骤:
2.1)解析提交的json,内容包括任务的数据源链接、账号、密码、采集的类型以及表信息;
2.2)校验username,若为空则抛出IllegalArgumentException异常;校验password,若为空则抛出I l l e g a l A r g u m e n t E x c e p t i o n异常;校验u r l,若为空则抛出I l l e g a l A r g u m e n t E x c e p t i o n异常;校验d a t a b a s e N a m e,若为空则抛出I l l e g a l A r g u m e n t E x c e p t i o n异常;校验t a b l e L i s t,若为空则抛出IllegalArgumentException异常;校验cat,若为空则抛出IllegalArgumentException异常;
2.3)通过执行命令,检测SqlServer数据库是否开启了变更数据捕获功能,若未开启则任务终止并提示用户数据库未开启变更数据捕获功能。
2.4)通过执行命令,去数据库中查询所有已经开启变更数据捕获功能的表,然后与配置的tableList的表逐个进行对比,检测用户配置的表在SqlServer数据库是否启用了变更数据捕获功能,若存在未启用变更数据捕获功能的表则任务终止并提示用户具体哪张表未开启变更数据捕获功能。
3.如权利要求2所述的基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,所述步骤3)包括如下步骤:
3.1)获取上一步中校验通过的参数信息,将传入的cat字符串数组转化为对应的编号,注册采集数据类型的实例;
3.2)通过Duration.of方法,将传入的间隔参数构建出微秒级别的时间时钟;
3.3)查询数据库中启动变更数据捕获功能的表信息,并与用户配置的表进行对比,过滤出需要采集的表信息储存在白名单中;
3.4)根据白名单中的表的数据库名称、schema名称和表名称,计算出源表id以及源表的指针数组,存储在缓存中。
4.如权利要求3所述的基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,所述步骤4)包括如下步骤:
4.1)插件端启动一个线程,该线程会在每一次循环结束休眠指定的时间;
4.2)当线程第一次启动的时候会执行SELECT sys.fn_cdc_get_max_lsn()语句,从数据库中查询并记录当前数据库中最大的日志序列号lsn;
4.3)通过判断lsn二进制字段binary是否为null来中的校验当前最大日志序列号是否可用;若不可用则说明数据库代理服务没有启动。任务将每隔一段时间等待并输出警告日
志信息直到代理服务启动;
4.4)比较当前最大日志序列号与记录的序列号,若相等,则说明数据库中无数据变更。线程休眠并进入下一次的循环;
4.5)根据当前最大日志序列号与记录的日志序列号,使用SELECT*FROM.cdc语句查询出区间内所有变更的数据信息;
4.6)根据缓存中的源表指针ChangeTablePointer及id来获取具体的changeTable,通过对比lsn和table id来判断当前changeTable是否为需要捕获的表。如果是,则从当前ChangeTablePointer中判断当前数据的类型是insert、update或者delete,并与采集类型进行对比,如果符合配置的采集类型,则通过getData方法从resultSet中获取具体的数据;
4.7)逐条解析变更的数据信息。
5.如权利要求3所述的基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,所述步骤5)包括如下步骤:
5.1)构建jobgraph,将任务提交至jobmanager;
5.2)jobmanager初始化分片分发器,并根据配置的任务并行度在不同的服务器中启动对应数量的taskmanager,构建任务分布式运行的基础运行环境;
5.3)jobmanager初始化数据分片,调用数据捕捉及解析接口,开始执行数据同步任务。
一种基于flink实现分布式实时同步SqlServer数据库的方法
技术领域
[0001]本发明涉及一种基于flink实现分布式实时同步SqlServer数据库的方法。
背景技术
[0002]目前,现有的采集SqlServer数据库数据的技术是基于数据库增量字段,每隔一段时间查询数据
库的数据,因此该技术不能实时捕捉SqlServer数据库变更的数据。
发明内容
[0003]为了实时捕捉SqlServer数据库变更的数据,本发明提供了一种基于flink实现分布式实时同步SqlServer数据库的方法。
[0004]一种基于flink实现分布式实时同步SqlServer数据库的方法,其特征在于,包括如下步骤:
[0005]1)配置数据源信息及任务相关参数信息;
[0006]2)插件端校验数据源及任务相关信息接口;
[0007]3)构建数据库监听接口;
同舟共济是什么意思
[0008]4)数据捕捉及解析接口;成也萧何败萧何典故
[0009]5)基于flink实现分布式数据同步。
[0010]进一步的,所述步骤2)包括如下步骤:
[0011]  2.1)解析提交的json,内容包括任务的数据源链接、账号、密码、采集的类型以及表信息;
[0012]  2.2)校验username,若为空则抛出IllegalArgumentException异常;校验password,若为空则抛出IllegalArgumentException异常;校验url,若为空则抛出I l l e g a l A r g u m e n t E x c e p t i o n异常;校验d a t a b a s e N a m e,若为空则抛出I l l e g a l A r g u m e n t E x c e p t i o n异常;校验t a b l e L i s t,若为空则抛出IllegalArgumentException异常;校验cat,若为空则抛出IllegalArgumentException异常;
[0013]  2.3)通过执行命令,检测SqlServer数据库是否开启了变更数据捕获功能,若未开启则任务终止并提示用户数据库未开启变更数据捕获功能。
[0014]  2.4)通过执行命令,去数据库中查询所有已经开启变更数据捕获功能的表,然后与配置的tableList的表逐个进行对比,检测用户配置的表在SqlServer数据库是否启用了变更数据捕获功能,若存在未启用变更数据捕获功能的表则任务终止并提示用户具体哪张表未开启变更数据捕获功能。
[0015]进一步的,所述步骤3)包括如下步骤:
[0016]  3.1)获取上一步中校验通过的参数信息,将传入的cat字符串数组转化为对应的编号,注册采集数据类型的实例;
[0017]  3.2)通过Duration.of方法,将传入的间隔参数构建出微秒级别的时间时钟;
[0018]  3.3)查询数据库中启动变更数据捕获功能的表信息,并与用户配置的表进行对比,过滤出需要采集的表信息储存在白名单中;
浮尘层
[0019]  3.4)根据白名单中的表的数据库名称、schema名称和表名称,计算出源表id以及源表的指针数组,存储在缓存中。
[0020]进一步的,所述步骤4)包括如下步骤:
[0021]  4.1)插件端启动一个线程,该线程会在每一次循环结束休眠指定的时间;[0022]  4.2)当线程第一次启动的时候会执行SELECT sys.fn_cdc_get_max_lsn()语句,从数据库中查询并记录当前数据库中最大的日志序列号lsn;
[0023]  4.3)通过判断lsn二进制字段binary是否为null来中的校验当前最大日志序列号是否可用;若不可用则说明数据库代理服务没有启动。任务将每隔一段时间等待并输出警告日志信息直到代理服务启动;
[0024]  4.4)比较当前最大日志序列号与记录的序列号,若相等,则说明数据库中无数据变更。线程休眠并进入下一次的循环;
[0025]  4.5)根据当前最大日志序列号与记录的日志序列号,使用SELECT*FROM.cdc语句查询出区间
全世界最帅的车
内所有变更的数据信息;
[0026]4.6)根据缓存中的源表指针C h a n g e T a b l e P o i n t e r及i d来获取具体的changeTable,通过对比lsn和table id来判断当前changeTable是否为需要捕获的表。如果是,则从当前ChangeTablePointer中判断当前数据的类型是insert、update或者delete,并与采集类型进行对比,如果符合配置的采集类型,则通过getData方法从resultSet中获取具体的数据;
[0027]  4.7)逐条解析变更的数据信息。
[0028]进一步的,所述步骤5)包括如下步骤:
[0029]  5.1)构建jobgraph,将任务提交至jobmanager;
[0030]  5.2)jobmanager初始化分片分发器,并根据配置的任务并行度在不同的服务器中启动对应数量的taskmanager,构建任务分布式运行的基础运行环境;
[0031]  5.3)jobmanager初始化数据分片,调用数据捕捉及解析接口,开始执行数据同步任务。
[0032]本发明的有益效果是:本发明对于现有技术而言,通过简单的任务参数配置,可以快速实现对SqlServer数据库表数据的实时采集功能。
附图说明
[0033]结合附图,并通过参考下面的详细描述,将会更容易地对本发明有更完整的理解并且更容易地理解其伴随的优点和特征,其中:
[0034]图1是本发明中步骤2)的流程示意图;
[0035]图2是本发明中步骤3)的流程示意图;
[0036]图3为本发明中步骤4)的流程示意图。
形态各异的近义词具体实施方式
[0037]为使本发明的内容更加清楚易懂,以下结合说明书附图,对本发明的内容作进一

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系QQ:729038198,我们将在24小时内删除。