
| 组件 | 版本 |
| --- | --- |
| Java | 1.8.0_251 |
| Scala | 2.12.13 |
| Flink | 1.12.2 |
| Hudi | 0.9.0 |
| Hadoop | 2.9.2 |
| Hive | 2.3.6 |
测试代码需将 hdfs-site.xml、core-site.xml、hive-site.xml 放入 resources 目录下。
/**
 * Flink SQL + Hudi walkthrough (Flink 1.12 / Hudi 0.9).
 *
 * Only step 4 (streaming read of `hudi_test_table`) is active. The other steps are kept as
 * commented-out code; each uses its own `val` names, so any of them can be uncommented without
 * redeclaring a name inside `main`.
 *
 * Expects hdfs-site.xml, core-site.xml and hive-site.xml under the resources directory.
 */
object TestFlinkSQLOptHudi {

  // Initialised once together with the object (instead of a null `var` assigned in `main`).
  // Not referenced elsewhere in this object.
  private val logger: org.slf4j.Logger = LoggerFactory.getLogger(this.getClass.getSimpleName)

  /**
   * Re-registers `hudi_test_table` as a streaming-read MERGE_ON_READ Hudi table and prints a
   * running `count(distinct id)` for a single `date` partition.
   *
   * @param args unused
   */
  def main(args: Array[String]): Unit = {
    // Reduce framework log noise.
    Logger.getLogger("org.apache").setLevel(Level.WARN)
    Logger.getLogger("hive.metastore").setLevel(Level.WARN)
    Logger.getLogger("akka").setLevel(Level.INFO)

    val tableEnv = FlinkUtils.initStreamTableEnvironment()

    // ---- datagen source: random rows ----
    // val sourceDDL =
    //   """
    //     |create table datagen_source (
    //     |  id int,
    //     |  data string,
    //     |  ts as localtimestamp,
    //     |  watermark for ts as ts
    //     |) with (
    //     |  'connector' = 'datagen',
    //     |  'rows-per-second' = '10',
    //     |  'fields.id.kind' = 'sequence',
    //     |  'fields.id.start' = '1',
    //     |  'fields.id.end' = '100000',
    //     |  'fields.data.length' = '5'
    //     |)
    //     |""".stripMargin
    // tableEnv.executeSql(sourceDDL)

    // ---- kafka sink ----
    // val kafkaSinkDDL =
    //   """
    //     |create table kafka_sink (
    //     |  id int,
    //     |  data string,
    //     |  ts timestamp
    //     |) with (
    //     |  'connector' = 'kafka',
    //     |  'topic' = 'kafka_sink',
    //     |  'properties.bootstrap.servers' = 'test-lakehouse:9092',
    //     |  'properties.group.id' = 'lakehouse',
    //     |  'scan.startup.mode' = 'earliest-offset',
    //     |  'format' = 'json',
    //     |  'json.fail-on-missing-field' = 'false',
    //     |  'json.ignore-parse-errors' = 'true'
    //     |)
    //     |""".stripMargin
    // tableEnv.executeSql(kafkaSinkDDL)

    // ---- datagen -> kafka ----
    // val kafkaInsertDML =
    //   """
    //     |insert into kafka_sink
    //     |select * from datagen_source
    //     |""".stripMargin
    // tableEnv.executeSql(kafkaInsertDML)

    // ---- 1. Create the Hudi table for writing ----
    // For a local run, 'path' can instead point at a local directory, e.g.
    //   'path' = 'file:///F:/workspace/lakehouse-hudi/test/src/main/resources/hudi_test_table'
    // val writeTableDDL =
    //   """
    //     |create table if not exists hudi_test_table (
    //     |  id int,
    //     |  data string,
    //     |  ts timestamp(3),
    //     |  `time` string,
    //     |  `date` string
    //     |) partitioned by (`date`)
    //     |with (
    //     |  'connector' = 'hudi',
    //     |  'table.type' = 'MERGE_ON_READ',
    //     |  'path' = 'hdfs://test-lakehouse:9000/lakehouse/hudi_test_table',
    //     |  'write.tasks' = '1',
    //     |  'hoodie.datasource.write.recordkey.field' = 'id',
    //     |  'write.precombine.field' = 'ts',
    //     |  'compaction.tasks' = '1',
    //     |  'compaction.trigger.strategy' = 'num_or_time',
    //     |  'compaction.delta_commits' = '2',
    //     |  'compaction.delta_seconds' = '300'
    //     |)
    //     |""".stripMargin
    // tableEnv.executeSql(writeTableDDL)

    // ---- 2. Insert literal rows ----
    // val valuesInsertDML =
    //   """
    //     |insert into hudi_test_table values
    //     |(1, 'a', timestamp '2021-09-26 00:00:01', '2021-09-26 00:00:01', '2021-09-26'),
    //     |(2, 'b', timestamp '2021-09-26 00:00:02', '2021-09-26 00:00:02', '2021-09-26'),
    //     |(3, 'c', timestamp '2021-09-26 00:00:03', '2021-09-26 00:00:03', '2021-09-26'),
    //     |(4, 'd', timestamp '2021-09-27 00:00:04', '2021-09-27 00:00:04', '2021-09-27'),
    //     |(5, 'e', timestamp '2021-09-27 00:00:05', '2021-09-27 00:00:05', '2021-09-27'),
    //     |(6, 'f', timestamp '2021-09-27 00:00:06', '2021-09-27 00:00:06', '2021-09-27')
    //     |""".stripMargin
    // tableEnv.executeSql(valuesInsertDML)
    //
    // Query result:
    // +----+----+--------+---------------------+---------------------+------------+
    // | op | id | data   | ts                  | time                | date       |
    // +----+----+--------+---------------------+---------------------+------------+
    // | +I |  1 | a      | 2021-09-26T00:00:01 | 2021-09-26 00:00:01 | 2021-09-26 |
    // | +I |  2 | b      | 2021-09-26T00:00:02 | 2021-09-26 00:00:02 | 2021-09-26 |
    // | +I |  3 | c      | 2021-09-26T00:00:03 | 2021-09-26 00:00:03 | 2021-09-26 |
    // | +I |  4 | d      | 2021-09-27T00:00:04 | 2021-09-27 00:00:04 | 2021-09-27 |
    // | +I |  5 | e      | 2021-09-27T00:00:05 | 2021-09-27 00:00:05 | 2021-09-27 |
    // | +I |  6 | f      | 2021-09-27T00:00:06 | 2021-09-27 00:00:06 | 2021-09-27 |
    // +----+----+--------+---------------------+---------------------+------------+

    // ---- Upsert: a row with an existing record key (id = 1) ----
    // val updateDML =
    //   """
    //     |insert into hudi_test_table values
    //     |(1, 'update', timestamp '2021-09-26 12:12:12', '2021-09-26 12:12:12', '2021-09-26')
    //     |""".stripMargin
    // tableEnv.executeSql(updateDML)
    //
    // Query result (row with id = 1 replaced):
    // +----+----+--------+---------------------+---------------------+------------+
    // | op | id | data   | ts                  | time                | date       |
    // +----+----+--------+---------------------+---------------------+------------+
    // | +I |  1 | update | 2021-09-26T12:12:12 | 2021-09-26 12:12:12 | 2021-09-26 |
    // | +I |  2 | b      | 2021-09-26T00:00:02 | 2021-09-26 00:00:02 | 2021-09-26 |
    // | +I |  3 | c      | 2021-09-26T00:00:03 | 2021-09-26 00:00:03 | 2021-09-26 |
    // | +I |  4 | d      | 2021-09-27T00:00:04 | 2021-09-27 00:00:04 | 2021-09-27 |
    // | +I |  5 | e      | 2021-09-27T00:00:05 | 2021-09-27 00:00:05 | 2021-09-27 |
    // | +I |  6 | f      | 2021-09-27T00:00:06 | 2021-09-27 00:00:06 | 2021-09-27 |
    // +----+----+--------+---------------------+---------------------+------------+

    // ---- 3. Continuous write from the datagen source into Hudi ----
    // val sourceInsertDML =
    //   """
    //     |insert into hudi_test_table
    //     |select
    //     |  id,
    //     |  data,
    //     |  ts,
    //     |  date_format(ts, 'yyyy-MM-dd HH:mm:ss') as `time`,
    //     |  date_format(ts, 'yyyy-MM-dd') as `date`
    //     |from datagen_source
    //     |""".stripMargin
    // tableEnv.executeSql(sourceInsertDML)

    // ---- 4. Streaming read ----
    // Drop and re-create the table definition so DDL changes take effect on every run.
    // NOTE(review): presumably only the catalog entry is dropped and the Hudi files under
    // 'path' are kept; this depends on the catalog FlinkUtils configures, so confirm.
    tableEnv.executeSql("drop table if exists hudi_test_table")

    // Optional extra option for the DDL below: 'hoodie.datasource.query.type' = 'snapshot'
    val readTableDDL =
      """
        |create table if not exists hudi_test_table (
        |  id int,
        |  data string,
        |  ts timestamp(3),
        |  `time` string,
        |  `date` string
        |) partitioned by (`date`)
        |with (
        |  'connector' = 'hudi',
        |  'table.type' = 'MERGE_ON_READ',
        |  'path' = 'hdfs://test-lakehouse:9000/lakehouse/hudi_test_table',
        |  'hoodie.datasource.write.recordkey.field' = 'id',
        |  'read.tasks' = '1',
        |  'read.streaming.enabled' = 'true',
        |  'read.streaming.check-interval' = '5',
        |  'read.streaming.start-commit' = '000',
        |  'read.utc-timezone' = 'false'
        |)
        |""".stripMargin
    tableEnv.executeSql(readTableDDL)

    // Running distinct-id count for one partition. The literal rows in step 2 use 2021-09-26/27,
    // and step 3 derives `date` from localtimestamp, so adjust the partition literal as needed.
    val queryDML =
      """
        |select count(distinct id) from hudi_test_table where `date` = '2021-09-30'
        |""".stripMargin
    // With streaming read enabled the query is unbounded, so print() keeps printing updates
    // and does not return while the job is running.
    tableEnv.executeSql(queryDML).print()

    // ---- Alternative to print(): write the stream into a print sink ----
    // Because print() above does not return for an unbounded query, use one or the other.
    // val printSinkDDL =
    //   """
    //     |create table print_sink (
    //     |  id int,
    //     |  data string,
    //     |  ts timestamp(3),
    //     |  `time` string,
    //     |  `date` string
    //     |) with (
    //     |  'connector' = 'print'
    //     |)
    //     |""".stripMargin
    // tableEnv.executeSql(printSinkDDL)
    //
    // val printDML =
    //   """
    //     |insert into print_sink
    //     |select * from hudi_test_table
    //     |""".stripMargin
    // tableEnv.executeSql(printDML)
  }
}
sql-client 测试

本地测试模式

将 hudi-flink-bundle_2.11-0.9.0.jar 放入 $FLINK_HOME/lib 下,启动 sql-client 即可(bundle 包名中的 Scala 版本后缀需与所用 Flink 发行包的 Scala 版本一致):

```
bin/sql-client.sh embedded
```
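YARN session 模式

选择一台服务器创建 session:

```
./bin/yarn-session.sh -d -nm dwd_test -jm 3g -tm 10g -s 2
```

(yarn-session.sh 使用 `-jm` / `-tm` / `-s` 指定 JobManager 内存、TaskManager 内存和每个 TaskManager 的 slot 数。带 `y` 前缀的 `-yjm` / `-ytm` / `-ys` 是 `flink run -m yarn-cluster` 的参数。)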
检查session是否正常
yarn application -list | grep application_1632050203454_167491
查看 applicationId

在该服务器上每次创建 session,applicationId 都会改变。查看方式为 `cat /tmp/.yarn-properties-<用户名>`。例如 hdfs 用户启动的 session 对应 `cat /tmp/.yarn-properties-hdfs`,文件内容示例:

```
#Generated YARN properties file
#Wed Sep 29 17:00:09 CST 2021
dynamicPropertiesString=
applicationID=application_1632050203454_167491
```

切换到 hdfs 用户启动 sql-client 进行测试。如果 /tmp/.yarn-properties-hdfs 中的 applicationId 发生改变(例如重新创建了 session),之前的会话将不可用。因此测试时只创建一次 session,之后可启动多个 sql-client,将多个 Job 提交到该 session 即可。
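```
bin/sql-client.sh embedded -s application_1632050203454_167491
```

进入 sql-client 后执行以下语句,检查会话是否可用:

```
select now();
```

(注意:`-s` 是 SQL Client 自身的会话标识,并不是 YARN applicationId。作业提交到哪个 YARN session 由上文的 /tmp/.yarn-properties-<用户名> 文件决定,这里用 applicationId 作为会话名只是为了便于区分。)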
部分参数设置(每条 SET 语句单独一行执行):

```
set execution.checkpointing.interval = 10sec;
set execution.result-mode = tableau;
set execution.restart-strategy.type = fixed-delay;
set execution.restart-strategy.attempts = 10;
set execution.restart-strategy.delay = 10000;
```
建表
-- Test source: sequential ids 1..100000 emitted at 10 rows per second, with 5-character data.
create table if not exists datagen_source (
  id int,
  data string,
  ts as localtimestamp,
  watermark for ts as ts
) with (
  'connector' = 'datagen',
  -- 'number-of-rows' = '10000',
  'rows-per-second' = '10',
  'fields.id.kind' = 'sequence',
  'fields.id.start' = '1',
  'fields.id.end' = '100000',
  'fields.data.length' = '5'
);

-- MERGE_ON_READ Hudi table keyed by id and partitioned by `date`, with streaming-read,
-- write, index, compaction and Hive-sync options.
create table if not exists hudi_test_table (
  id int,
  data string,
  ts timestamp(3),
  `time` string,
  `date` string
) comment 'flink hudi ods table : 用户维系记录表'
partitioned by (`date`)
with (
  'connector' = 'hudi',
  'table.type' = 'MERGE_ON_READ',
  'path' = 'hdfs:///user/hive/datalake/hudi_test_db/hudi_test_table',
  -- streaming read
  'read.tasks' = '1',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '10',
  'read.streaming.start-commit' = '000',
  'read.utc-timezone' = 'false',
  -- write
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'write.bucket_assign.tasks' = '3',
  'write.precombine.field' = 'ts',
  'write.index_bootstrap.tasks' = '3',
  -- index
  'index.global.enabled' = 'true',
  'index.bootstrap.enabled' = 'true',
  'index.state.ttl' = '15',
  'clean.retain_commits' = '60',
  -- record key / partition / query
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.partitionpath.field' = 'date',
  'hoodie.datasource.query.type' = 'snapshot',
  'hoodie.datasource.merge.type' = 'payload_combine',
  -- compaction
  'compaction.tasks' = '1',
  'compaction.trigger.strategy' = 'num_or_time',
  'compaction.delta_commits' = '5',
  'compaction.delta_seconds' = '300',
  'compaction.max_memory' = '1024',
  -- Hive sync ('ip' is a placeholder for the metastore / HiveServer2 host)
  'hive_sync.enable' = 'true',
  'hive_sync.metastore.uris' = 'thrift://ip:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://ip:10000',
  'hive_sync.use_jdbc' = 'true',
  'hive_sync.db' = 'hudi_test_db',
  'hive_sync.table' = 'hudi_test_table',
  'hive_sync.partition_fields' = 'date',
  -- alternative extractor, presumably for non-partitioned tables:
  -- 'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.NonPartitionedExtractor',
  'hive_sync.partition_extractor_class' = 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
  'hive_sync.file_format' = 'PARQUET',
  'hive_sync.support_timestamp' = 'true',
  'hive_sync.username' = 'hive',
  'hive_sync.password' = '123456'
);
写入
-- Insert literal rows directly.
insert into hudi_test_table values
  (1, 'a', timestamp '2021-09-26 00:00:01', '2021-09-26 00:00:01', '2021-09-26'),
  (2, 'b', timestamp '2021-09-26 00:00:02', '2021-09-26 00:00:02', '2021-09-26'),
  (3, 'c', timestamp '2021-09-26 00:00:03', '2021-09-26 00:00:03', '2021-09-26'),
  (4, 'd', timestamp '2021-09-27 00:00:04', '2021-09-27 00:00:04', '2021-09-27'),
  (5, 'e', timestamp '2021-09-27 00:00:05', '2021-09-27 00:00:05', '2021-09-27'),
  (6, 'f', timestamp '2021-09-27 00:00:06', '2021-09-27 00:00:06', '2021-09-27');

-- Single-row upsert: same record key (id = 1) with a later precombine value (ts).
insert into hudi_test_table values
  (1, 'update', timestamp '2021-09-26 12:12:12', '2021-09-26 12:12:12', '2021-09-26');

-- Continuous write from the datagen source; `time` and `date` are derived from ts.
insert into hudi_test_table
select
  id,
  data,
  ts,
  date_format(ts, 'yyyy-MM-dd HH:mm:ss') as `time`,
  date_format(ts, 'yyyy-MM-dd') as `date`
from datagen_source;
流读
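```
set table.dynamic-table-options.enabled = true;
select count(0) from hudi_test_table;
```

(原文此处表名缺失,按上文推测为 `hudi_test_table`。开启 `table.dynamic-table-options.enabled` 后,可在表名后追加 `/*+ OPTIONS('key'='value') */`,在查询时动态覆盖表参数。)

pom文件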
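```xml
<properties>
    <!-- Tag names were lost in extraction and are reconstructed from the values and from the
         ${...} references below; the names holding 3.8.1 and the second 1.8 are guesses. -->
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
    <scala.maven.plugin.version>3.2.2</scala.maven.plugin.version>
    <maven.compiler.plugin.version>3.8.1</maven.compiler.plugin.version>
    <maven.assembly.plugin.version>3.1.1</maven.assembly.plugin.version>
    <java.version>1.8</java.version>
    <scala.version>2.12.13</scala.version>
    <scala.binary.version>2.12</scala.binary.version>
    <hadoop.version>2.9.2</hadoop.version>
    <flink.version>1.12.2</flink.version>
    <hoodie.version>0.9.0</hoodie.version>
    <hive.version>2.3.9</hive.version>
    <scope.type>compile</scope.type>
</properties>

<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-csv</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-json</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-orc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <!-- This was declared twice with a hard-coded _2.11 suffix. Every other Flink artifact uses
         ${scala.binary.version} (2.12), and mixing suffixes puts both _2.11 and _2.12 Flink jars
         on the classpath, so it is declared once with the shared suffix. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-sql-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-bundle_${scala.binary.version}</artifactId>
        <version>${hoodie.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hudi</groupId>
        <artifactId>hudi-flink-client</artifactId>
        <version>${hoodie.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
        <scope>${scope.type}</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <scope>${scope.type}</scope>
        <exclusions>
            <exclusion>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-slf4j-impl</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-llap-tez</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>${scala.maven.plugin.version}</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>${maven.assembly.plugin.version}</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```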