
// Create the DataStream execution environment, then layer a streaming-mode
// table environment (Blink planner) on top of it.
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings
        .newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

// Set the job name.
tEnv.getConfig()
        .getConfiguration()
        .setString("pipeline.name", "FlinkCDCTest");
2、创建基于原mysql表的DWD表
connector = 'mysql-cdc'
// DDL for the DWD source table `dwd_order`, read through the 'mysql-cdc' connector.
// The hostname, port, username, password, database-name and table-name values are
// placeholders and must be replaced with the real MySQL connection settings.
String createSqlDwd = "CREATE TABLE dwd_order (" +
        " GUID VARCHAR," +
        " tenant_guid VARCHAR," +
        " order_no VARCHAR," +
        // ... remaining column definitions omitted here; add each one as its own
        // " column_name TYPE," + fragment. They must include account_month and
        // sales_territory_guid, which the INSERT statement in this file selects
        // from dwd_order.
        " primary key(GUID) NOT ENFORCED" +
        ") WITH (" +
        "'connector' = 'mysql-cdc'," +
        // Fixed quoting: the value was written as ''hostname, which is invalid SQL.
        "'hostname'='hostname'," +
        "'port'='port'," +
        "'username'='username'," +
        "'password'='password'," +
        "'database-name'='datebase'," +
        "'table-name'='table'" +
        ")";
// Register the source table in the table environment's catalog.
tEnv.executeSql(createSqlDwd);
3、创建聚合后的主题宽表DWS表,作为目标表
'connector' = 'jdbc'
// DDL for the DWS sink table `flink`, written through the 'jdbc' connector into
// the physical table 'flink_test'. The URL and credentials are placeholders.
String dwsColumns = String.join(", ",
        "account_month varchar(6)",
        "sales_territory_guid varchar(32)",
        "primary key(account_month) NOT ENFORCED");
String dwsOptions = String.join(",",
        "'connector' = 'jdbc'",
        "'url' = 'jdbc:mysql://ip:port/database'",
        "'username' = 'root'",
        "'password' = 'password'",
        "'table-name' = 'flink_test'",
        "'sink.buffer-flush.max-rows' = '1'",
        "'sink.buffer-flush.interval' = '1s'",
        "'sink.max-retries' = '3'");
String createSqlDws = "CREATE TABLE flink (" + dwsColumns + ") WITH (" + dwsOptions + ")";
// Register the sink table in the table environment's catalog.
tEnv.executeSql(createSqlDws);
3、实时监听DWD表处理数据到DWS表
// Continuous INSERT that copies account_month and sales_territory_guid from the
// dwd_order source table into the flink sink table.
String sinkClause = "INSERT INTO flink (account_month,sales_territory_guid) ";
String projectionClause = "select account_month, sales_territory_guid ";
String sourceClause = "from dwd_order ";
String insertSql = sinkClause + projectionClause + sourceClause;
tEnv.executeSql(insertSql);
4、pom.xml设置主程序入口
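<build>
    <finalName>flink-test-job</finalName>
    <sourceDirectory>src/main/java</sourceDirectory>
    <resources>
        <resource>
            <directory>src/main/resources</directory>
        </resource>
    </resources>
    <testSourceDirectory>src/test/java</testSourceDirectory>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.6</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <addClasspath>true</addClasspath>
                        <mainClass>com.csbr.cloud.flinkserver.task.Test</mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>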
项目目录:
Flink 使用之 MySQL CDC:
https://www.jianshu.com/p/0a47e387de51