
高可用HA(High Availability)是分布式系统架构设计中必须考虑的因素之一,它通常是指,通过设计减少系统不能提供服务的时间。
2.高可用性计算指标假设系统一直能够提供服务,我们说系统的可用性是100%。
如果系统每运行100个时间单位,会有1个时间单位无法提供服务,我们说系统的可用性是99%。
很多公司的高可用目标是4个9,也就是99.99%,这就意味着,系统的年停机时间为8.76个小时。
(1)Flink on Yarn定义
https://blog.csdn.net/yulei_qq/article/details/87803854
在运行高可用性YARN群集时,我们不会运行多个JobManager(ApplicationMaster)实例,而只会运行一个,由YARN在失败时重新启动。具体操作会根据yarn的版本不同而不同
(2)Flink on Yarn的优势
相对于 Standalone 模式,在Yarn 模式下有以下几点好处:
1.资源按需使用,提高集群的资源利用率;
2.任务有优先级,根据优先级运行作业;
3.基于 Yarn 调度系统,能够自动化地处理各个角色的 Failover:
JobManager 进程和 TaskManager 进程都由 Yarn NodeManager 监控;
如果 JobManager 进程异常退出,则 Yarn ResourceManager 会重新调度 JobManager 到其他机器;
如果 TaskManager 进程异常退出,JobManager 会收到消息并重新向 Yarn ResourceManager 申请资源,重新启动 TaskManager。
(3) Per-Job模式
https://blog.csdn.net/h952520296/article/details/114318407
https://juejin.cn/post/6954227258655506440
一致性就是数据保持一致,在分布式系统中,可以理解为多个节点中数据的值是一致的。 同时,一致性也是指事务的基本特征或特性相同,其他特性或特征相类似 。
详细解释:https://cloud.tencent.com/developer/article/1520172
(1)什么是状态一致性
(2)分类
(3)flink实现状态一致性(exactly-once)
- 内部保证——分布式快照(checkpoint)
https://zhuanlan.zhihu.com/p/266620519
1. 定义
分布式中的可靠性可理解为容错性,checkpoint机制就是flink可靠性的基石。 Flink采用基于 checkpoint 的分布式快照机制,能够保证作业出现 fail-over 后可以从最新的快照进行恢复,即分布式快照机制可以保证 Flink 系统内部的“精确一次”处理。
Flink 分布式快照的核心元素包括:
Barrier(数据栅栏):可以把 Barrier 简单地理解成一个标记,该标记是严格有序的,并且随着数据流往下流动。每个 Barrier 都带有自己的 ID,Barrier 极其轻量,并不会干扰正常的数据处理。
异步:每次在把快照存储到我们的状态后端时,如果是同步进行就会阻塞正常任务,从而引入延迟。因此 Flink 在做快照存储时,采用异步方式
增量:由于 checkpoint 是一个全局状态,用户保存的状态可能非常大,多数达 G 或者 T 级别,checkpoint 的创建会非常慢,而且执行时占用的资源也比较多,因此 Flink 提出了增量快照的概念。也就是说,每次进行的全量 checkpoint,是基于上次进行更新的。
2.流程
https://www.jianshu.com/p/4d31d6cddc99
**- source端一致性 **
以下内容适用于 Flink 1.4 及之后版本
kafka consumer 作为 source,可以将偏移量保存下来,当发生故障时可以从发生故障前的偏移量重新消费数据,从而保证一致性。
flink kafka consumer :https://segmentfault.com/a/1190000023547691
- sink端一致性——两阶段提交
Sink 端是最复杂的,因为数据是落地到其他系统上的,数据一旦离开 Flink 之后,Flink 就监控不到这些数据了,所以精准一次处理语义必须也要应用于 Flink 写入数据的外部系统,故这些外部系统必须提供一种手段允许提交或回滚这些写入操作,同时还要保证与 Flink Checkpoint 能够协调使用(Kafka 0.11 版本已经实现精确一次处理语义)。
我们以 Flink 与 Kafka 组合为例,Flink 从 Kafka 中读数据,处理完的数据在写入 Kafka 中。
为什么以Kafka为例,第一个原因是目前大多数的 Flink 系统读写数据都是与 Kafka 系统进行的。第二个原因,也是最重要的原因 Kafka 0.11 版本正式发布了对于事务的支持,这是与Kafka交互的Flink应用要实现端到端精准一次语义的必要条件。
kafka producer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCOmmitSinkFunction
https://www.hnbian.cn/posts/6adf75db.html
1.sink收到第一条数据之后,开启一个 kafka 的事务( transaction),这时把数据写入kafka 标记为未提交, 这就是“预提交”;
2. jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到barrier 的算子将状态存入状态后端,并通知 jobmanager
3. sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
4. jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
5. sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
6. 外部 kafka 关闭事务,提交的数据可以正常消费了。
kafka事务性:https://zhuanlan.zhihu.com/p/120796378
https://guosmilesmile.github.io/2020/09/07/FlinkKafkaConsumer%E9%87%8D%E5%A4%8D%E6%B6%88%E8%B4%B9%E6%95%B0%E6%8D%AE%E9%97%AE%E9%A2%98/
四、数据不丢失https://support.huaweicloud.com/intl/zh-cn/dli_faq/dli_03_0096.html
五、flink实现有序性——watermarkhttps://cloud.tencent.com/developer/news/841119
https://blog.csdn.net/qq_39657909/article/details/106081543
1. 定义
Watermark 是在 Source function 处或之后立即生成的。Source function 的每个并行子任务通常独立地生成 Watermark。这些 Watermark 定义了该特定并行源的事件时间。
事件时间小于 t 但是晚于 Watermark(t) 到达。实际运行过程中,事件可能被延迟任意的时间,所以不可能指定一个时间,保证该时间之前的所有事件都被处理了。而且,即使延时时间是有界限的,过多的延迟的时间也是不理想的,会造成时间窗口处理的太多延时。
系统允许设置一个可容忍的延迟时间,在距离 t 的时间在可容忍的延迟时间内,可以继续处理数据,否则丢弃。
2. Kafka Connector 应用 WatermarkStrategy
每个 Kafka 分区一个时间戳
当使用 Kafka 作为数据源的时候,每个分区可能有一个简单的事件时间模式(按时间戳升序或其他)。当消费来自 Kafka 的流数据时,多个分区一般会并行消费。分区中的事件交替消费,会破坏分区中的模式。
在这种情况下,可以使用 Flink 的 Kafka-partition-aware(分区感知)watermark 生成器。使用这个特性的时候,watermark 会在 Kafka 消费者内部为每个分区生成,并且每个分区 watermark 的合并方式与在流进行 shuffle 时合并的方式相同。
3.窗口
http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/
parallelism 是指 taskmanager 实际使用的并发能力。在 Flink 里面代表每个任务的并行度,适当的提高并行度可以大大提高 job 的执行效率,比如你的 job 消费 kafka 数据过慢,适当调大可能就消费正常了。
parallelism.default:1参数可以配置算子的并行度,没有指定的话,默认为1。不同算子可以单独设置并行度。
https://www.jianshu.com/p/b58988bcfb48
slot 是指 taskmanager 可提供的并发执行能力。 taskmanager.numberOfTaskSlots:3参数可以配置每个taskmanager的slot数
Task Manager 是从 Job Manager 处接收需要部署的 Task,任务的并行性由每个 Task Manager 上可用的 slot 决定。每个 Flink TaskManager 在集群中提供 slot。 slot 的数量通常与每个 TaskManager 的可用 CPU 内核数成比例。一般情况下slot 数是你每个 TaskManager 的 cpu 的核数。
如果 Task Manager 有四个 slot,那么它将为每个 slot 分配 25% 的内存。 可以在一个 slot 中运行一个或多个线程。 同一 slot 中的线程共享相同的 JVM。 同一 JVM 中的任务共享 TCP 连接和心跳消息。Task Manager 的一个 Slot 代表一个可用线程,该线程具有固定的内存,注意 Slot 只对内存隔离,没有对 CPU 隔离。默认情况下,Flink 允许子任务共享 Slot,即使它们是不同 task 的 subtask,只要它们来自相同的 job。这种共享可以有更好的资源利用率。
上面图片中有两个 Task Manager,每个 Task Manager 有三个 slot,这样我们的算子最大并行度那么就可以达到 6 个,在同一个 slot 里面可以执行 1 至多个子任务。因此,上图中source/map/keyby/window/apply 最大可以有 6 个并行度,sink 只用了 1 个并行。
· 在flink的配置文件中flink-conf.yaml,默认的并行度为1;
· 在以shell的方式提交flink job的时候,可以使用-p指定程序的并行度;
./bin/flink run -p 10 ../word-count.jar
· 在flink job程序内设置并行度(这样设置的是所有算子的并行度,不能算子粒度设置)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(10);
· 每个算子指定并行度 并行度设置优先级是:算子设置并行度 > env 设置并行度 > 配置文件默认并行度
data.keyBy(new xxxKey())
.flatMap(new XxxFlatMapFunction()).setParallelism(5)
.map(new XxxMapFunction).setParallelism(5)
.addSink(new XxxSink()).setParallelism(1)
3.parallelism设置的注意事项
https://cloud.tencent.com/developer/article/1613761
Apache Flink的并行度设置并不是说越大越好、数据处理的效率就越高。而是需要设置合理的并行度。那么何谓合理呢?
Apache Flink的 并行度取决于每个TaskManager上的slot数量而决定的。Flink的JobManager把任务分成子任务提交给slot进行执行。相同的slot共享相同的JVM资源,同时对Flink提供维护的心跳等信息。
slot是指TaskManagere的并发执行能力,通常来说TaskManager有多少核CPU也就会有多少个slot。这样来看,我们设置的并行度其实是与TaskManager所有Slot数量有关的。
https://www.pianshen.com/article/67611720660/
1.flink的有状态计算程序计算过程中,在程序内部产生的中间结果,并提供给后续的算子。每个模块把自己的结果传递给下面的Task,也就是状态计算。
2.flink的state划分https://zhuanlan.zhihu.com/p/104171679
实现方法有:ValueState,ListState,ReducingState,AggregatingState,MapState。
· Operator State:与并行的算子实例绑定。并且在并行度发生变化的时候(划分一个State),能够自动重新分配状态数据。Operator State可以用在所有算子上,每个算子子任务或者说每个算子实例共享一个状态,流入这个算子子任务的数据可以访问和更新这个状态。下图展示了Operator State,算子子任务1上的所有数据可以共享第一个Operator State,以此类推,每个算子子任务上的数据共享自己的状态。
实现方法有:ListState,BroadcastState。
https://www.cnblogs.com/029zz010buct/p/9403283.html
https://bbs.huaweicloud.com/blogs/310613