栏目分类:
子分类:
返回
终身学习网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
终身学习网 > IT > 前沿技术 > 大数据 > 大数据系统

Flink CDC 实时同步mysql

大数据系统 更新时间:发布时间: 百科书网 趣学号
1、创建环境
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
//设置job名称
tEnv.getConfig().getConfiguration().setString("pipeline.name", "FlinkCDCTest");    
2、创建基于原mysql表的DWD表

connector = 'mysql-cdc’

 String createSqlDwd = "CREATE TABLE dwd_order (" +
            "  GUID VARCHAR," +
            "  tenant_guid VARCHAR," +
            "  order_no VARCHAR," +
		    ....
            "  primary key(GUID)  NOT ENFORCED" +
            ") WITH (" +
            "'connector' = 'mysql-cdc'," +
            "'hostname'=''hostname," +
            "'port'='port'," +
            "'username'='username'," +
            "'password'='password'," +
            "'database-name'='datebase'," +
            "'table-name'='table'" +
            ")";
    tEnv.executeSql(createSqlDwd);
3、创建聚合后的主题宽表DWS表,作为目标表

connector’ = 'jdbc’

String createSqlDws = "CREATE TABLE flink (" +
                "account_month varchar(6), " +
                "sales_territory_guid varchar(32), " +
                "primary key(account_month)  NOT ENFORCED" +
                ") WITH (" +
                "'connector' = 'jdbc'," +
                "'url' = 'jdbc:mysql://ip:port/database'," +
                "'username' = 'root'," +
                "'password' = 'password'," +
                "'table-name' = 'flink_test'," +
                "'sink.buffer-flush.max-rows' = '1'," +
                "'sink.buffer-flush.interval' = '1s'," +
                "'sink.max-retries' = '3'" +
                ")";
                
                tEnv.executeSql(createSqlDws);
3、实时监听DWD表处理数据到DWS表
String insertSql = "INSERT INTO flink (account_month,sales_territory_guid) " +
                "select account_month, " +
                "sales_territory_guid " +
                "from dwd_order ";

        tEnv.executeSql(insertSql);
4、pom.xml设置主程序入口

        flink-test-job
        
            
                src/main/java
            
            
                src/main/resources
            
        
        
            
                src/test/java
            
        
        
            
                maven-compiler-plugin
                3.1
                
                    1.8
                    1.8
                
            
            
                org.apache.maven.plugins
                maven-assembly-plugin
                2.6
                
                    
                        jar-with-dependencies
                    
                    
                        
                            true
                            
                            com.csbr.cloud.flinkserver.task.Test
                        
                    

                
                
                    
                        make-assembly
                        package
                        
                            single
                        
                    
                
            
        
    

项目目录:

Flink 使用之 MySQL CDC:
https://www.jianshu.com/p/0a47e387de51

转载请注明:文章转载自 www.051e.com
本文地址:http://www.051e.com/it/460515.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 ©2023-2025 051e.com

ICP备案号:京ICP备12030808号