背景 用Dinky数据平台 FlinkCDC收集Mysql BinLog 至 Doris 搭建实时数仓
问题 用Dinky CDCSOURCE 字段模式演变 整库同步Mysql到Doris 字段新增删除不生效
组件信息
Flink 1.17
FlinkCDC 3.1
dinky 1.1
Doris 2.1.6
Mysql 8.0
Dinky MySQLCDC 整库到 Doris需要的依赖
Flink/lib 和 dinky/extends 目录下放置
Doris 的 Flink connector jar和 MySQL CDC 的 Flink connector jar
有前三个依赖就可以完成dinky到doris的整库同步 但是字段新增删除不会生效 下面三个依赖用在flinkcdc pipeline方式的同步 下面也会截图Flink/lib 和 dinky/extends 完整的所有依赖
flink-sql-connector-mysql-cdc-3.1.0.jar
flink-doris-connector-1.17-1.6.0.jar
mysql-connector-java-8.0.27.jar
flink-cdc-pipeline-connector-doris-3.1.0.jar
flink-cdc-pipeline-connector-mysql-3.1.0.jar
flink-cdc-dist-3.1.0.jar 这个包需要自己重新编译下 参考问题四
FlinkCDC PIPELINE 样例
这个是基础的单表 整库修改tables 参考flinkcdc3.0+ 官网 还有更多Route和transform功能大家都可以去看下
Route模块提供了表名映射的能力。通过为每一个源表中的数据设置其写入的目标表,通过一对一以及多对一的映射配置,我们能够实现整库同步和简单的分库分表同步功能
简单来说 就是整库同步的时候可以自定义库名 因为Dlink的CDCSOURCE做整库的同步的时候 库表名需要与源库相同 这快感觉相当于做了个补充 挺好的
transform 就是数据转换 应该是可以在Sink之前 对数据做一些基础的转换处理
哈哈哈哈哈 理解的比较浅显 没有去试过这个功能 有不对的地方 请大家多多指教 下面会列举一些过程中遇到的问题 也请教了dinky官方人员 感谢dinky官网的帮助
EXECUTE PIPELINE WITHYAML (
source:
type: mysql
hostname: 152.136.51.49
port: 3306
username: root
password: 'xx-12345'
tables: test.teachers
server-id: 5400-5404
sink:
type: doris
fenodes: 152.136.51.49:8030
username: root
password: '123456'
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 2
)
问题一 Caused by: org.apache.flink.table.api.ValidationException: The MySQL server has a timezone offset (0 seconds ahead of UTC) which does not match the configured timezone Asia/Shanghai. Specify the right server-time-zone to avoid inconsistencies for time-related fields.
其实就是Mysql时区的问题 在 my.cnf中添加default-time-zone=‘+08:00’ 然后重启Mysql就可以了
不要直接 SET GLOBAl time_zone = ‘Asia/Shanghai’; 重启之后不会生效
查看下 时区是东八区就可以了
SHOW GLOBAL VARIABLES LIKE 'time_zone';
问题二 ERROR org.dinky.trans.ddl.CreateCDCSourceOperation 197 execute – connection disabled org.dinky.data.exception.BusException: connection disabled
这个官方给的回答是 Dinky CDCSOURCE的接口实现较老,在新版本 Doris 上支持可能存在问题 建议使用FlinkCDC的pipeline或者doris连接器内的整库同步脚本 所以后续我走上了FlinkCDC的pipeline的道路
问题三 Caused by: java.lang.ClassNotFoundException: org.apache.flink.cdc.runtime.typeutils.EventTypeInfo.EventTypeInfo
EventTypeInfo这个类是在flink-cdc-dist-3.1.0.jar包里面 我将这个包解压缩时候 是可以看到EventTypeInfo的 但是就感觉很奇怪 根本上的原因就是 flinklib下面也需要添加pipeline的依赖 当时我只在dlink下面添加了 因为他任务底层实际上走的还是flink 所以我这边猜测dinky可能不需要这个依赖
flink-cdc-pipeline-connector-doris-3.1.0.jar
flink-cdc-pipeline-connector-mysql-3.1.0.ja
问题四 Caused by: java.io.InvalidClassException: org.apache.doris.flink.sink.batch.DorisBatchSink; local class incompatible: stream classdesc serialVersionUID = -1727597565303701005, local class serialVersionUID = -6424802353855033470
原因 flin Doris 连接器版本高了 我原先使用的时flink-doris-connector-1.17-24.0.0.jar的版本 将这个降到 flink-doris-connector-1.17-1.6.0.jar 就可以了
问题四 java.lang.NoSuchMethodError: org.apache.calcite.tools.FrameworkConfig.getTraitDefs()Lorg/apache/flink/calcite/shaded/com/google/common/collect/ImmutableList
这个主要是参考这个博主的 也是我看到dinky 使用flinkcdc pipeline的方式只有一篇文章 才想起来还是记录下 避免后面还有人遇到这种问题
这个看这个博主说是CDC依赖冲突的问题 需要将flink-cdc-dist-3.1.0.jar里面删除一部分依赖 然后重新打包下
博主文章链接
# 解压 flink-cdc-3.1.0-bin.tar.gz
tar -zxvf flink-cdc-3.1.0-bin.tar.gz
cd flink-cdc-3.1.0/lib/
# 解压jar文件·
jar -xvf flink-cdc-dist-3.1.0.jar
# 删除冲突包
rm -rf org/apache/calcite
# 重新打包
jar -cvf flink-cdc-dist-3.1.0-new.jar
自此就大功告成了 Dinky使用FlinkCDC pipeline的方式实现 Mysql到Doris的整库同步 并且可以捕获 表结构变更自动步(Schema Evolution)