FlinkCDC 入门之数据同步和故障恢复
创始人
2025-05-31 00:24:01
0

前言

FlinkCDC 是一款基于 Change Data Capture(CDC)技术的数据同步工具,可以用于将关系型数据库中的数据实时同步到 Flink 流处理中进行实时计算和分析,下图来自官网的介绍。img

下图1是 FlinkCDC 与其它常见 开源 CDC 方案的对比:

2

可以看见的是相比于其它开源产品,FlinkCDC 不仅支持增量同步,还支持全量/全量+增量的同步,同时 FlinkCDC 还支持故障恢复(基于检查点机制实现),能够快速恢复数据同步的进度,并且支持的数据源也很丰富2(在 2.3 版本已支持 MongoDB、MySQL、OceanBase、Oracle、PostgressSQL、SQLServer、TiDB、Db2 等数据源)。

本文将介绍 FlinkCDC 在数据同步和故障恢复等方面的内容(以 MySQL 和 Oracle 为例),同时完整代码也已上传到GitHub。

效果展示

MySQL

动画

Oracle(相比 MySQL 延迟会稍高)

动画

数据库配置

MySQL(5.7)

修改my.cnf配置文件(Windows 下是 my.ini 文件),增加以下配置内容:

[mysqld]
# 开启 binlog
log-bin=mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 对于 MySQL 集群, 不同节点的 server_id 必须不同
server_id=1
# 过期时间
expire_logs_days=30

Tips: 修改完成后需要重启 MySQL 服务

建库建表:

# 建库
create database flink;
# 建表
create table flink.`user` (`id` bigint(20) not null,`username` varchar(20) default null,`password` varchar(63) default null,`status` int(2) default null,`create_time` datetime default null,primary key (`id`)
) ENGINE = InnoDB default CHARSET = utf8mb4;

创建用户并授权:

# 创建用户 flink
CREATE USER flink IDENTIFIED BY 'flink';
# 授权
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flink'@'%';
# 将 flink 库的所有权限授权给 flink 用户
GRANT ALL PRIVILEGES ON flink.* TO 'flink'@'%';
# 刷新权限
FLUSH PRIVILEGES;

Oracle(11g)

以 DBA 身份连接:

# SID 需要根据实际情况进行设置, 比如: XE.
export ORACLE_SID=SID
sqlplus /nolog
CONNECT sys/manager AS SYSDBA

配置日志:

alter system set db_recovery_file_dest_size = 20G;
# 日志文件的地址可以根据自己的情况进行设置
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

确认是否配置成功:

archive log list;

image-20230319172813540

创建用户并授权:

CREATE USER flink IDENTIFIED BY flink;
GRANT CREATE SESSION TO flink;
GRANT FLASHBACK ANY TABLE TO flink;
GRANT SELECT ANY TABLE TO flink;
GRANT SELECT_CATALOG_ROLE TO flink;
GRANT EXECUTE_CATALOG_ROLE TO flink;
GRANT SELECT ANY TRANSACTION TO flink;
GRANT CREATE TABLE TO flink;

建表并增加日志记录:

# 建表
CREATE TABLE flink."user" (id NUMBER NOT NULL,username VARCHAR2(20),password VARCHAR2(63),status INTEGER,create_time TIMESTAMP,PRIMARY KEY(id)
);
# 日志配置
ALTER TABLE flink."user" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

代码配置

运行环境

依赖版本
Java17
flink-connector2.1.0
flink1.13.0
maven3.6.2

连接配置

flinkcdc:data-source:# 默认类型为 MySQLaddr: localhost:3306database: flinkusername: flinkpassword: flinktable-list:- user

Tips: 关于数据源的连接完整配置属性可参考 DataSourceProperties.java 文件,关于检查点的配置可参考 CheckPointProperties.java 文件

恢复点配置

为了实现故障恢复(应用停止运行过程中数据库有增删改操作的情况)的情况,需要在代码中进行恢复点的相关配置:

// 获取配置的恢复点路径, 首次运行不存在会默认进行创建
var saveDir = checkPointProperties.getSaveDir();
var folder = new File(saveDir);
if (!folder.exists() && !folder.isDirectory()) {if (!folder.mkdirs()) {throw new IllegalStateException("文件夹创建失败");}
}
var dataSourceType = dataSourceProperties.getType().name().toLowerCase();
var dataSourceSaveDir = saveDir + File.separator + dataSourceType;
var savepointDir = SavepointUtils.getSavepointRestore(dataSourceSaveDir);
var configuration = new Configuration();
if (savepointDir != null) {// 设置恢复点路径var savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointDir);SavepointRestoreSettings.toConfiguration(savepointRestoreSettings, configuration);
}
// 启用检查点并设置检查点的保存路径
var env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
env.enableCheckpointing(checkPointProperties.getInterval(), CheckpointingMode.EXACTLY_ONCE);
var checkpointConfig = env.getCheckpointConfig();
checkpointConfig.setCheckpointStorage(checkPointProperties.getStorageType().getPrefix() + dataSourceSaveDir);

通用注意点

为了避免数值类型显示是一堆字符串,需要增加以下配置:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#%E9%80%9A%E7%94%A8-faq Q5
prop.setProperty("bigint.unsigned.handling.mode","long");
prop.setProperty("decimal.handling.mode","double");

ORACLE 配置注意点

为了避免日志增长过快以及读取日志满的问题,需要增加以下配置:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q1
prop.setProperty("log.mining.strategy", "online_catalog");
prop.setProperty("log.mining.continuous.mine", "true");

对于 Oracle 11g,连接配置中需要增加:

// 详见 https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH)#oracle-cdc-faq Q2
prop.setProperty("database.tablename.case.insensitive", "false");

项目运行及使用介绍

下载代码

由于本人将博客相关的示例代码都集中到了一个仓库,因此如果不想拉取整个仓库,推荐使用GitZip for github这个插件,就可以只下载部分的文件(选中指定文件后点击右下角的下载按钮):

image-20230319183238608

使用介绍

对于需要监控的表,只需要创建相应的实体类,并新建一个类继承AbstractMessageListener(可重写其中的 create、delete、update、read等方法处理相应的事件)即可,其中 FlickCdcMessageListener 注解内的参数填相应的表名即可监听相应的表变更事件(同时需要在 yaml 文件中 tableList 中增加要监听的表,如果是 Oracle 数据库还需要增加日志配置):

import cn.butterfly.flinkcdc.annotation.FlickCdcMessageListener;
import cn.butterfly.flinkcdc.pojo.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 用户表消息监听器** @author zjw* @date 2023-03-14*/
@Slf4j
@Component
@FlickCdcMessageListener("user")
public class UserMessageListener extends AbstractMessageListener {@Overridepublic void create(User user) {log.info("新增用户: {}", user);}}

其它注意点

  1. FlinkCDC 默认的同步策略是第一次运行先进行全量同步,后续即可进行增量读取,因此表数据量比较大的时候,重写 AbstractMessageListener#read 方法时需要特别注意处理大量数据的情况。
  2. 由于 Flink CDC 是根据数据库的事务日志来获取数据更改的,如果恢复点之后发生了数据更改,那么在恢复点之后的数据将被重复读取,因此需要考虑重复读取的情况。

总结

本文简单介绍了 FlinkCDC 的数据同步和故障恢复方面的内容,对相关基础知识进行了省略(例如检查点),如果是第一次接触和使用 FlinkCDC,建议先结合官网的示例进行学习,同时建议先通读一篇官方的FAQ。

参考文献

  1. 基于 Flink CDC 实现海量数据的实时同步和转换
  2. https://github.com/ververica/flink-cdc-connectors#supported-tested-databases

相关内容

热门资讯

走进小城看消费丨江西资溪:低碳...   夏日时节下午4点,江西省抚州市资溪县大觉山景区漂流终点依然热闹。来自南昌的游客余鑫漂流结束后没有...
【中原晨会0625】市场分析专... 来源:市场资讯 (来源:中原证券研究所) 本期重点研报目录 【中原策略】市场分析:电子半导体领涨 ...
南向资金连买4日!低费率+可月... 6月25日早盘,港股红利资产震荡整理。截至11时14分,港股红利低波ETF招商(520550)下跌0...
618成交破百万!紫荆花用一套... 一年一度的618年中大促,是消费市场的晴雨表,也是品牌间最激烈的角力场。当各大品牌在直播间里铆足了劲...
原创 黄... 2026年6月25日的国际金价已经从前期的5500美元高点跌到4200美元下方,累计跌幅超过22%,...
英伟达CEO:Vera Rub... 截至9:38,中证半导体材料设备主题指数(931743)涨2.36%创新高;权重股中,中微公司涨3....
再被催债16亿!“钢铁大王”戴... 澎湃新闻记者 贺梨萍 因“铁本事件”入狱五年的戴国芳重返钢铁行业,但他并没有完成从阶下囚再到“钢铁大...
周三原油价格下跌 随着美国和伊朗在和平谈判中取得进展,越来越多的油轮公开穿越霍尔木兹海峡,原油在战时的价格上涨已经蒸发...
这种蛋白是大脑衰老的开关 这种蛋白是大脑衰老的开关 清晨,假设一位五十岁左右的王女士发现自己常常把手机放在熟悉的抽屉里又找不到...
信通院牵头算力Token出海生... 盘面上,截至11:04,中证科创创业50指数(931643)涨1.68%,创历史新高;权重股中,芯原...
海外 774 亿营收背后:日本... 文 | 游戏价值论 6月23日,彭博社报道了腾讯正在围绕出售多家日本游戏工作室少数股权开展谈判,包...
餐饮“抢人”大战:把店开到公交... 作者 |餐饮老板内参 内参君 医院、公交站、演唱会…餐饮品牌,正在无孔不入 在北京儿童医院,肯德基...
快讯 | 外资扫货!陈翊庭:港... 港交所行政总裁陈翊庭在接受《中国证券报》专访时指出,国际资本对中国资产的看法已彻底扭转,布局中国市场...
2777.77元!A股“股王”... 25日早盘,昨天创下历史新高的A股“股王”联讯仪器,今天上午继续走强,盘中股价再度刷新历史新高。 截...
原创 今... 欧洲自己的媒体直接下结论,欧盟衰退躲不掉,内部分裂拦不住,现在就连欧洲顶尖工业巨头,都偷偷在用中国的...
黄仁勋股东大会放言:本轮AI基... 在当地时间6月24日的英伟达(NVDA.O)2026年度股东大会上,股东批准了该公司全部10名董事会...
国际油价大跌 新华社消息, 纽约原油期货主力合约价格24日盘中跌破每桶70美元,为伊朗战事爆发以来首次。 市场分析...
马云带队插秧,什么信号? 一场别开生面的“务农”,让外界看到了一个不一样的阿里巴巴。 近日,阿里巴巴合伙人、高德董事长刘振飞在...
全球最大产能,最高丰度达99.... 本文转自【科技日报】; 6月23日,高丰度硼-10同位素技术暨产业化成果发布会在山东省东营市举办,全...
黄金大跳水!金饰克价年内暴跌近... 25日,现货黄金盘中震荡,截至发稿,报3985.070美元/盎司,跌0.17%。 当地时间24日,...