CDC 的全称是 Change Data Capture ,在广义的概念上,只要是能捕获数据变更的技术,我们都可以称之为 CDC 。目前通常描述的 CDC 技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。
mysqlcdc需要mysql开启binlog,找到my.cnf,在[mysqld]
中加入如下信息
[mysqld]
server-id=1
log-bin=mysql-bin
binlog-format=row
重启数据库。
2.创建springboot项目,pom添加依赖
1.8
1.13.6
2.11
1.7.30org.apache.flink
flink-table-planner-blink_2.11
1.13.6org.apache.flink
flink-java
${flink.version}org.apache.flink
flink-clients_${scala.binary.version}
${flink.version}mysql
mysql-connector-java
8.0.17org.apache.flink
flink-table-api-java
${flink.version}org.apache.flink
flink-table-api-java-bridge_${scala.binary.version}
${flink.version}com.ververica
flink-connector-mysql-cdc
2.2.0org.apache.flink
flink-connector-jdbc_2.12
1.13.1
org.apache.maven.plugins
maven-shade-plugin
3.1.0package
shade
*:*
META-INF/*.SF
META-INF/*.DSA
META-INF/*.RSA
Flink cdc实现mysql到mysql代码
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;public class FlinkMysqlToMysql {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 5000));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 创建Table环境
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);// 注册源表和目标表
tEnv.executeSql(“create table sourceTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (n” +
//源表连接器一定得是mysql-cdc
“‘connector’ = ‘mysql-cdc’,” +
“‘hostname’ = ‘localhost’,n” +
” ‘port’ = ‘3306’,n” +
” ‘database-name’ = ‘quarant_db’,n” +
” ‘table-name’ = ‘organization_info’,n” +
” ‘username’ = ‘root’,n” +
” ‘password’ = ‘admin’n” +
“)”);
// Table result = tEnv.sqlQuery(“SELECT id, name,card_num,phone,address FROM quarantine”);
// tEnv.registerTable(“sourceTable”,result);
tEnv.executeSql(“create table targetTable(id bigint,organization_code VARCHAR, organization_name VARCHAR, parent_code VARCHAR, parent_name VARCHAR,PRIMARY KEY (id) NOT ENFORCED ) WITH (n” +
//目标表连接器是jdbc
“‘connector’ = ‘jdbc’,” +
“‘url’ = ‘jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false’,n” +
” ‘table-name’ = ‘organization_info’,n” +
” ‘username’ = ‘root’,n” +
” ‘driver’ = ‘com.mysql.cj.jdbc.Driver’,n” +
” ‘password’ = ‘admin’n” +
“)”);
// 执行CDC过程
String query = “INSERT INTO targetTable SELECT * FROM sourceTable”;
tEnv.executeSql(query).print();
}
}
运行Main方法
Flink会同步源表数据到目标表,后续源表的增删改都会实时同步至目标表中。
3.将程序打包成flink jar
idea使用快捷键control+alt+shift+s,点击Artifacts->JAR
选择Main class,点击ok
然后选择上面菜单栏Build Artifacts
点击build
生成的jar在项目目录下面有个out目录
至此,flink jar程序就写好了,可以把jar丢到flink上运行了