
Hudi on Flink: Getting Started Notes

What problem it solves

Hudi's incremental computation model, based on mini-batches, can reduce latency and save compute cost in some scenarios.

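As stream processing and real-time data warehouses have become widespread, the Hudi community has been actively embracing the change and keeps optimizing its original mini-batch incremental computation model with stream processing: version 0.7 introduced streaming ingestion into the data lake, and version 0.9 added native support for the CDC format.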

Building Hudi

Environment: Flink 1.13.2, YARN 3.1.1

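git clone https://github.com/apache/hudi.git && cd hudi
# Change the Flink version in the root pom.xml to 1.13.2: <flink.version>1.13.2</flink.version>
mvn clean package -DskipTests
# Note: Hudi is built against Scala 2.11 by default. If you use Scala 2.12, activate the Scala 2.12 profile when building:
mvn clean package -DskipTests -Dscala-2.12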
Setting up the Flink cluster (see the Hudi flink-quick-start-guide)

Edit conf/flink-conf.yaml as follows, then start the cluster:

taskmanager.numberOfTaskSlots: 10
parallelism.default: 4
# Fault tolerance and checkpointing
state.backend: rocksdb
state.checkpoints.dir: hdfs://bigdata01:8020/flink/checkpoint/sqlclient
state.savepoints.dir: hdfs://bigdata01:8020/flink/savepoint/sqlclient
state.backend.incremental: true
jobmanager.execution.failover-strategy: region
execution.checkpointing.interval: 300000
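The Hudi Flink writer commits data when a checkpoint completes, so execution.checkpointing.interval (here 300000 ms, i.e. 5 minutes) roughly sets how often new data becomes visible in the table. The Python sketch below parses a flat "key: value" file like the one above and derives an upper bound on commits per hour. The helper names are hypothetical, and the sketch only handles an interval written as plain milliseconds (Flink also accepts durations with units such as "5 min").

```python
from typing import Dict

# A subset of the flink-conf.yaml settings shown above
SAMPLE_CONF = """
taskmanager.numberOfTaskSlots: 10
parallelism.default: 4
# Fault tolerance and checkpointing
state.backend: rocksdb
state.checkpoints.dir: hdfs://bigdata01:8020/flink/checkpoint/sqlclient
execution.checkpointing.interval: 300000
"""


def parse_flink_conf(text: str) -> Dict[str, str]:
    """Parse flat 'key: value' lines, skipping blank lines and '#' comments."""
    conf: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first ':' only, so values such as hdfs:// URIs stay intact.
        key, sep, value = line.partition(":")
        if sep:
            conf[key.strip()] = value.strip()
    return conf


def max_commits_per_hour(conf: Dict[str, str]) -> float:
    """Upper bound on Hudi commits per hour, assuming at most one commit per
    completed checkpoint and an interval given in plain milliseconds."""
    interval_ms = int(conf["execution.checkpointing.interval"])
    return 3_600_000 / interval_ms


conf = parse_flink_conf(SAMPLE_CONF)
print(conf["state.checkpoints.dir"])  # hdfs://bigdata01:8020/flink/checkpoint/sqlclient
print(max_commits_per_hour(conf))     # 12.0
```

A shorter interval makes data visible sooner, at the cost of more frequent commits.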
Start the SQL client:
./bin/sql-client.sh embedded -j ../hudi-jars/hudi-flink-bundle_2.11-0.10.0-SNAPSHOT.jar shell
Testing in batch mode
CREATE TABLE t1(
  uuid VARCHAR(20) NOT NULL PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata01:8020/hudi/t4',
  'table.type' = 'MERGE_ON_READ' 
);

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '2021-09-30 11:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '2021-09-30 11:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '2021-09-30 11:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '2021-09-30 11:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '2021-09-30 11:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '2021-09-30 11:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '2021-09-30 11:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '2021-09-30 11:00:08','par4');

insert into t1 values('id1','张三',27,TIMESTAMP '2021-09-30 11:10:01','par1');

insert into t1 values('id9','Danny',27,TIMESTAMP '2021-09-30 11:00:01','par1');

insert into t1 values('id2','李四',27,TIMESTAMP '2021-09-30 11:10:01','par1');
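Because uuid is declared as the primary key, the three single-row inserts act as upserts: id1 and id2 are overwritten and id9 is added, so a batch select * from t1; would be expected to return 9 rows. The sketch below models this with a plain dict keyed by uuid. It is a simplification: it ignores partitions (all three writes go to par1) and reduces Hudi's merge to "a later write replaces the stored row". The helper names are hypothetical.

```python
from datetime import datetime
from typing import Dict, List, Tuple

# (uuid, name, age, ts, partition), mirroring the columns of t1
Row = Tuple[str, str, int, datetime, str]


def ts(text: str) -> datetime:
    """Parse the 'YYYY-MM-DD HH:MM:SS' literals used in the INSERT statements."""
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")


def upsert(table: Dict[str, Row], rows: List[Row]) -> None:
    """Simplified key-based upsert: a later write for the same uuid replaces the stored row."""
    for row in rows:
        table[row[0]] = row


t1: Dict[str, Row] = {}
# First INSERT: eight new keys
upsert(t1, [
    ("id1", "Danny", 23, ts("2021-09-30 11:00:01"), "par1"),
    ("id2", "Stephen", 33, ts("2021-09-30 11:00:02"), "par1"),
    ("id3", "Julian", 53, ts("2021-09-30 11:00:03"), "par2"),
    ("id4", "Fabian", 31, ts("2021-09-30 11:00:04"), "par2"),
    ("id5", "Sophia", 18, ts("2021-09-30 11:00:05"), "par3"),
    ("id6", "Emma", 20, ts("2021-09-30 11:00:06"), "par3"),
    ("id7", "Bob", 44, ts("2021-09-30 11:00:07"), "par4"),
    ("id8", "Han", 56, ts("2021-09-30 11:00:08"), "par4"),
])
# The three single-row INSERTs: update id1, add id9, update id2
upsert(t1, [("id1", "张三", 27, ts("2021-09-30 11:10:01"), "par1")])
upsert(t1, [("id9", "Danny", 27, ts("2021-09-30 11:00:01"), "par1")])
upsert(t1, [("id2", "李四", 27, ts("2021-09-30 11:10:01"), "par1")])

print(len(t1))       # 9
print(t1["id1"][1])  # 张三
```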

Testing in streaming mode

read.streaming.enabled turns on streaming read, and read.streaming.check-interval sets how often the source checks for new commits (here every 4 seconds):

CREATE TABLE t1(
  uuid VARCHAR(20) NOT NULL PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata01:8020/hudi/t4',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',       -- read the table as a stream
  'read.streaming.check-interval' = '4'    -- check for new commits every 4 seconds
);

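While a streaming query (select * from t1;) is running, for example in a second SQL client session, write rows into the table: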
insert into t1 values('id1','张三',27,TIMESTAMP '2021-09-30 11:10:01','par1');
insert into t1 values('id9','Danny',27,TIMESTAMP '2021-09-30 11:00:01','par1');
insert into t1 values('id2','李四',27,TIMESTAMP '2021-09-30 11:10:01','par1');

After a few seconds, the streaming query shows the changed data.
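Conceptually, the streaming source remembers the last commit it has consumed and, every read.streaming.check-interval seconds, reads only the commits that completed after it. The sketch below models that loop over a simulated timeline. The functions new_commits and poll and the instant strings are hypothetical, and the sketch assumes instant times are fixed-width timestamp strings so that string order equals time order; it is not Hudi's source implementation.

```python
import time
from typing import Callable, List


def new_commits(timeline: List[str], last_consumed: str) -> List[str]:
    """Return completed instants newer than the last consumed one, oldest first.

    Assumes fixed-width timestamp strings, so string order equals time order.
    """
    return sorted(i for i in timeline if i > last_consumed)


def poll(read_timeline: Callable[[], List[str]], check_interval_s: float,
         rounds: int) -> List[List[str]]:
    """Hypothetical monitor loop: each round, read only commits not seen before."""
    last_consumed = ""
    batches: List[List[str]] = []
    for _ in range(rounds):
        fresh = new_commits(read_timeline(), last_consumed)
        if fresh:
            last_consumed = fresh[-1]
        batches.append(fresh)
        time.sleep(check_interval_s)  # 4 seconds in the table above; 0 here for the demo
    return batches


# Simulated timeline snapshots: one commit, then a second commit, then nothing new
snapshots = iter([
    ["20210930110001"],
    ["20210930110001", "20210930111001"],
    ["20210930110001", "20210930111001"],
])
result = poll(lambda: next(snapshots), check_interval_s=0, rounds=3)
print(result)  # [['20210930110001'], ['20210930111001'], []]
```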

Writing CDC data from Kafka into Hudi

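To write the real-time changes of a MySQL business table into Kafka, see the article "Using the Flink CDC 2.x changelog format (example: capturing MySQL table changes and writing them to Kafka)".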
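The changelog-json format from Flink CDC encodes each change as a JSON object. The sketch below assumes the record shape {"data": {...}, "op": "+I"}, where op is one of +I, -U, +U, -D, and shows how such a stream collapses into one row per order_id once it is upserted by primary key, which is what the Hudi sink below is configured to do. The function name and the sample records are hypothetical.

```python
import json
from typing import Any, Dict

Table = Dict[int, Dict[str, Any]]


def apply_changelog(table: Table, line: str) -> None:
    """Apply one changelog-json record to a table keyed by order_id.

    Assumes the record shape {"data": {...}, "op": "+I" | "-U" | "+U" | "-D"}.
    """
    record = json.loads(line)
    row, op = record["data"], record["op"]
    key = row["order_id"]
    if op in ("+I", "+U"):
        table[key] = row        # insert / update-after: upsert by primary key
    elif op == "-D":
        table.pop(key, None)    # delete by primary key
    elif op == "-U":
        pass                    # update-before: superseded by the following +U
    else:
        raise ValueError(f"unknown op: {op}")


# Made-up records using a subset of the orders columns
lines = [
    '{"data": {"order_id": 10001, "customer_name": "Jark", "order_status": false}, "op": "+I"}',
    '{"data": {"order_id": 10001, "customer_name": "Jark", "order_status": false}, "op": "-U"}',
    '{"data": {"order_id": 10001, "customer_name": "Jark", "order_status": true}, "op": "+U"}',
    '{"data": {"order_id": 10002, "customer_name": "Sally", "order_status": false}, "op": "+I"}',
    '{"data": {"order_id": 10002, "customer_name": "Sally", "order_status": false}, "op": "-D"}',
]
orders: Table = {}
for line in lines:
    apply_changelog(orders, line)
print(orders)  # {10001: {'order_id': 10001, 'customer_name': 'Jark', 'order_status': True}}
```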

Flink SQL configuration

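Note: run this on YARN with checkpointing configured correctly. The Hudi Flink writer commits data only when a checkpoint completes, so without checkpoints no data becomes visible in the Hudi table.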

create table orders_cdc2kafka ( 
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
)
WITH (
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '172.25.6.7:9092',
  'properties.group.id' = 'flink_gp_test99',
  'scan.startup.mode' = 'group-offsets',
  'format' = 'changelog-json',
  'changelog-json.ignore-parse-errors' = 'true',
  'changelog-json.timestamp-format.standard' = 'SQL',
  'properties.zookeeper.connect' = '172.25.6.7:2181/kafka'
);

CREATE TABLE orders_hudi(
    order_id INT,
    order_date TIMESTAMP(0),
    customer_name STRING,
    price DECIMAL(10, 5),
    product_id INT,
    order_status BOOLEAN,
    PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata01:8020/hudi/orders',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',
  'compaction.async.enabled' = 'false',
  'hoodie.datasource.write.recordkey.field' = 'order_id',
  'write.operation' = 'upsert',
  'write.precombine.field' = 'order_date'
);

insert into orders_hudi select * from orders_cdc2kafka;
Query the data in Hudi:
select * from orders_hudi;

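Whenever the orders data changes, re-running this query in the SQL client shows the change in near real time, as soon as the next checkpoint has committed it.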
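The option 'write.precombine.field' = 'order_date' tells Hudi which column to compare when it pre-combines several incoming records that share the same record key: roughly, the record with the larger value is kept. The sketch below illustrates that rule on an in-memory batch. It is a simplification rather than Hudi's actual code, the function name is hypothetical, and the tie-breaking rule (the later record wins) is an assumption of the sketch.

```python
from datetime import datetime
from typing import Any, Dict, Iterable, List

Record = Dict[str, Any]


def precombine(batch: Iterable[Record], key: str = "order_id",
               ordering: str = "order_date") -> List[Record]:
    """Keep one record per key: the one with the largest ordering value.

    On a tie, the record that arrives later wins (an assumption of this sketch).
    """
    best: Dict[Any, Record] = {}
    for rec in batch:
        k = rec[key]
        if k not in best or rec[ordering] >= best[k][ordering]:
            best[k] = rec
    return list(best.values())


batch = [
    {"order_id": 1, "order_date": datetime(2021, 10, 1, 10, 0, 0), "price": 50},
    {"order_id": 1, "order_date": datetime(2021, 10, 1, 9, 0, 0), "price": 40},  # older, dropped
    {"order_id": 2, "order_date": datetime(2021, 10, 1, 8, 0, 0), "price": 20},
]
result = precombine(batch)
print([r["price"] for r in result])  # [50, 20]
```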

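Writing Kafka debezium-json data into Hudi (in this example the debezium-json records are read from a file on HDFS instead of Kafka)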

See: "Building a Streaming Data Lake with Flink Hudi" (使用 Flink Hudi 构建流式数据湖)

CREATE TABLE debezium_source(
  id INT NOT NULL,
  ts BIGINT,
  name STRING,
  description STRING,
  weight DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://bigdata01:8020/hudi/tmp/source.data',
  'format' = 'debezium-json'
);

select * from debezium_source;
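The debezium-json format interprets the op field of each envelope: c (and r, snapshot reads) become inserts, u becomes an update-before/update-after pair built from before and after, and d becomes a delete of the before image. The Python sketch below mirrors that mapping for illustration only; the function name is hypothetical and the source metadata block is ignored.

```python
import json
from typing import Any, Dict, List, Tuple

Change = Tuple[str, Dict[str, Any]]


def debezium_to_changelog(line: str) -> List[Change]:
    """Map one Debezium JSON envelope to Flink-style changelog rows (row kind, row)."""
    event = json.loads(line)
    op = event["op"]
    if op in ("c", "r"):   # create or snapshot read -> insert
        return [("+I", event["after"])]
    if op == "u":          # update -> retract the before image, emit the after image
        return [("-U", event["before"]), ("+U", event["after"])]
    if op == "d":          # delete -> emit the before image as a delete
        return [("-D", event["before"])]
    raise ValueError(f"unsupported op: {op}")


# The update of id 106 from source.data, with the "source" block and some columns dropped
line = '{"before":{"id":106,"ts":6000},"after":{"id":106,"ts":10000},"op":"u"}'
print(debezium_to_changelog(line))
# [('-U', {'id': 106, 'ts': 6000}), ('+U', {'id': 106, 'ts': 10000})]
```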


CREATE TABLE hoodie_table(
  id INT NOT NULL PRIMARY KEY NOT ENFORCED,
  ts BIGINT,
  name STRING,
  description STRING,
  weight DOUBLE
) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata01:8020/hudi/hoodie_table',
  'table.type' = 'MERGE_ON_READ',
  'changelog.enabled' = 'true',
  'compaction.async.enabled' = 'false'
);

insert into hoodie_table select * from debezium_source;

select * from hoodie_table;
select count(*) from hoodie_table;

The contents of source.data are as follows:

{"before":null,"after":{"id":101,"ts":1000,"name":"scooter","description":"Small 2-wheel scooter","weight":3.140000104904175},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606100,"transaction":null}
{"before":null,"after":{"id":102,"ts":2000,"name":"car battery","description":"12V car battery","weight":8.100000381469727},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":103,"ts":3000,"name":"12-pack drill bits","description":"12-pack of drill bits with sizes ranging from #40 to #3","weight":0.800000011920929},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":104,"ts":4000,"name":"hammer","description":"12oz carpenter's hammer","weight":0.75},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":105,"ts":5000,"name":"hammer","description":"14oz carpenter's hammer","weight":0.875},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":108,"ts":8000,"name":"jacket","description":"water resistent black wind breaker","weight":0.10000000149011612},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":null,"after":{"id":109,"ts":9000,"name":"spare tire","description":"24 inch spare tire","weight":22.200000762939453},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":0,"snapshot":"true","db":"inventory","table":"products","server_id":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1589355606101,"transaction":null}
{"before":{"id":106,"ts":6000,"name":"hammer","description":"16oz carpenter's hammer","weight":1},"after":{"id":106,"ts":10000,"name":"hammer","description":"18oz carpenter hammer","weight":1},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589361987000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":362,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589361987936,"transaction":null}
{"before":{"id":107,"ts":7000,"name":"rocks","description":"box of assorted rocks","weight":5.300000190734863},"after":{"id":107,"ts":11000,"name":"rocks","description":"box of assorted rocks","weight":5.099999904632568},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362099000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":717,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362099505,"transaction":null}
{"before":null,"after":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362210000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1068,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362210230,"transaction":null}
{"before":null,"after":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362243000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1394,"row":0,"thread":2,"query":null},"op":"c","ts_ms":1589362243428,"transaction":null}
{"before":{"id":110,"ts":12000,"name":"jacket","description":"water resistent white wind breaker","weight":0.20000000298023224},"after":{"id":110,"ts":14000,"name":"jacket","description":"new water resistent white wind breaker","weight":0.5},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362293000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":1707,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362293539,"transaction":null}
{"before":{"id":111,"ts":13000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.179999828338623},"after":{"id":111,"ts":15000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362330000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2090,"row":0,"thread":2,"query":null},"op":"u","ts_ms":1589362330904,"transaction":null}
{"before":{"id":111,"ts":16000,"name":"scooter","description":"Big 2-wheel scooter ","weight":5.170000076293945},"after":null,"source":{"version":"1.1.1.Final","connector":"mysql","name":"dbserver1","ts_ms":1589362344000,"snapshot":"false","db":"inventory","table":"products","server_id":223344,"gtid":null,"file":"mysql-bin.000003","pos":2443,"row":0,"thread":2,"query":null},"op":"d","ts_ms":1589362344455,"transaction":null}
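Replaying these 16 events in order by id leaves 10 rows (ids 101 to 110; id 111 is created, updated, and then deleted), which is the count one would expect from select count(*) from hoodie_table; once all changes are committed. The sketch below checks this with a hand transcription of the op, id, and ts of each line above; it is a plain in-order replay, not Hudi's merge logic.

```python
from typing import Dict, List, Tuple

# (op, id, ts) transcribed from source.data above; ts is taken from the "after"
# image, or from the "before" image for the final delete.
EVENTS: List[Tuple[str, int, int]] = [
    ("c", 101, 1000), ("c", 102, 2000), ("c", 103, 3000), ("c", 104, 4000),
    ("c", 105, 5000), ("c", 106, 6000), ("c", 107, 7000), ("c", 108, 8000),
    ("c", 109, 9000), ("u", 106, 10000), ("u", 107, 11000), ("c", 110, 12000),
    ("c", 111, 13000), ("u", 110, 14000), ("u", 111, 15000), ("d", 111, 16000),
]


def replay(events: List[Tuple[str, int, int]]) -> Dict[int, int]:
    """Apply create/update/delete events in order to a table keyed by id (value = ts)."""
    table: Dict[int, int] = {}
    for op, key, ts in events:
        if op == "d":
            table.pop(key, None)   # delete removes the key
        else:
            table[key] = ts        # "c" and "u" both leave the after image
    return table


final = replay(EVENTS)
print(len(final))                          # 10
print(final[106], final[107], final[110])  # 10000 11000 14000
```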