总结:Flink

总结:FlinkFlink 梳理 flink

大家好,欢迎来到IT知识分享网。

一、为什么要使用分布式计算框架?

1、计算能力

对于不涉及到IO的计算,分布式计算相当于多个人计算,如10台计机器计算速度是1台机器计算速度的10倍。而分布式计算框架能充分发挥分布式计算优势。

2、丰富的API

3、高可用,故障恢复,易扩展

二、一些常见问题记录

1、滑动窗口

  • 主要解决数据不连续的问题,比如数据访问少,导致某个时间段内其实没有数据,需要滑动大一点的时间来找数据,一般数据正常的话用翻转窗口

2、关于kafka消息数据的问题

  • 如果是用的新的消费者组,消费的是最新的数据,老的不消费
  • 如果是老的组,挂了之后一段时间重新启动,会消费之前没有的数据,这也涉及到到底用处理时间还是事件时间,因为如果用处理时间,假设计算次数的时候就会将次数算到当前时间,但是用事件时间就不会

3、时间时间与处理时间

  • 一般用处理时间:比较简单,性能高
  • 事件时间的话需要考虑的点比较多,比如数据乱序,或者有未来数据,当flink先接收到未来的数据,之前的数据就会丢掉了,因此一般事件时间的话前面还得加个Filter把未来数据过滤掉
    • 事件时间得要水印
    • 水印前得加个过滤器过滤未来时间的

4、union的作用

  • 和mysql的概念几乎一致,就是将相同字段的可以放到一起,目的是用一个sink就可以处理了。
  • 如果不用union,就得写多个sink
  • 关于不同的计算窗口:
    • union不会等窗口都结束,而是来一条数据就union一条,然后就到下个sink去处理了,主要是为了少写sink
  • join和union的区别
    • union:把多个流管道放到一个管道中
    • join:类似于mysql的join,多个流之间进行关联计算用的,如相同的key进行除法运算。

5、聚合计算

  • 一般一个聚合计算就是一个算子,用一个sink处理(当然也可以union后用同一个sink)
    • 我当时的问题是相同token和uri的count和haoshi放不到一行,于是和同事沟通知道了自定义聚合
  • 但是如果我在一条数据中即想计算次数,又想计算耗时,可以自己写个自定义聚合计算类
    • / * 自定义聚合函数 * 目的是将访问次数与平均耗时放到mysql记录的同一行 * @author ww * */ public class CommonAverageAggregate implements AggregateFunction<TokenApiAccessEvent, TokenApiAccessEvent, TokenApiAccessEvent> { private static final long serialVersionUID = L; public TokenApiAccessEvent createAccumulator() { return new TokenApiAccessEvent(); } / * 因为flink任务可能是多个task,也就会涉及多个分区,得用merge */ public TokenApiAccessEvent merge(TokenApiAccessEvent a, TokenApiAccessEvent b) { a.setCount(b.getCount() + a.getCount()); a.setValue(b.getValue() + a.getValue()); return a; } / * 和merge计算方式一样,只是add是单个分区的 */ public TokenApiAccessEvent add(TokenApiAccessEvent value, TokenApiAccessEvent acc) { value.setValue(value.getValue() + acc.getValue()); value.setCount(value.getCount() + acc.getCount()); return value; } public TokenApiAccessEvent getResult(TokenApiAccessEvent acc) { float valueF = acc.getValue() / acc.getCount(); int value = (int) valueF; acc.setValue(value); return acc; } }

三、Flink优秀设计理念之强一致性(灾备)

Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?

这个设计理念非常好,很适合做灾备。每个拓扑节点会生成一次快照,故障时候选择最新快照即可。比如拓扑节点A,B,C,在B拓扑节点出故障(B还未来得及生成快照),则在机器重启后取A节点处理即可。

这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。

三、API

1、Filter与Map

Filter:主要作用是数据筛选,其返回类型为boolean,当为true的时候,进入后续处理环节,反之丢弃数据;

Map:主要是对数据进行映射处理

2、keyBy

分流,如keyBy(“id”)会把id相同的分到一起

3、timeWindow

窗口操作,类似于小型批处理

总结:Flink

4、并行度设置

4.1、全局配置:

总结:Flink

4.2、以上的队列,其实就是资源池,相当于是我们申请的配额,即CPU多少核,内存多少GB等

4.3、代码设置并行度:

总结:Flink

4.4、并行度

如上设置的并行度不能大于全局设置的并行度

默认的并行度 =  TaskManager Slot个数  *  TaskManager数量

如上,全局设置的并行度 = 1 * 100 = 100,此时代码中设置的并行度最大不能超过100,超过则会报错!!!,如果某一个操作是希望多一些线程,则不要设置即可!!!不设置默认占满所有slot

所以,代码中setParallelism一般目的某个操作可能不需要那么多并行度,从而降低并行度

如本配置总并行度是100,而我的操作可能2就够了,不希望占满slot,则可以设置并行度为2。

4.5、测试本地执行,并行度是多少?

  • 和晓宇沟通,Flink应该是根据本机的核数判断的

4.6、如果代码中指定了很高的并行度,比如150,本地启动会报错吗?

  • 和晓宇沟通,不会报错,只是达不到那么多的并行度而已,可以本地启动测试

四、架构

1、经典的主从架构

Flink分布式程序包含2个主要的进程:JobManagerTaskManager

当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClient。

JobManager为Master,TaskManager为Slave。

总结:Flink

当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。

  • Client 提交任务给 JobManager
  • JobManager 再调度任务到各个 TaskManager 去执行
  • 然后 TaskManager 将心跳和统计信息汇报给 JobManager

TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程

2、角色关联

JobManager

Master进程,负责Job的管理和资源的协调。包括任务调度,检查点管理,失败恢复等。

当然,对于集群HA模式,可以同时多个master进程,其中一个作为leader,其他作为standby。当leader失败时,会选出一个standby的master作为新的leader(通过zookeeper实现leader选举)。

检查点

Flink的检查点机制是保证其一致性容错功能的骨架。它持续的为分布式的数据流和有状态的operator生成一致性的快照。Flink的容错机制持续的构建轻量级的分布式快照,因此负载非常低。通常这些有状态的快照都被放在HDFS中存储(state backend)。程序一旦失败,Flink将停止executor并从最近的完成了的检查点开始恢复(依赖可重发的数据源+快照)。

TaskManager

一个TaskManager表示一个进程,一台机器有可能跑多个进程(不同端口);

TaskManager就是真正干活的JVM进程。

Task Slot

进程中的线程,一个TaskManager可以有多个Task Slot,TaskManager会将其管理的内存平均分给各个 slot。

Slot是TaskManager中的资源分配单元,每个Slot可以执行一个子任务。一个TaskManager可以有多个Slot,这意味着一个TaskManager可以同时执行多个子任务。

总结:Flink  总结:Flink

3、运行架构

在这里插入图片描述
参考:
Flink 原理与实现:数据流上的类型和操作:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams
Flink Stream 算子:https://flink.sojb.cn/dev/stream/operators


五、思考Flink的价值

看下一下通过yarn申请的资源,TaskManager是100个(和同事确认表示100核,即100核CPU),每个taskManager的内存是2G,总的算起来,这个任务占用的总资源是:100核CPU,200G内存。可见占用的资源量是很大的,所以,如果不用flink,我们自己用多台机器跑这个任务不也能达到效果吗?

话虽这么说,但是有几个问题要考虑:

1、多台机器的话,每台机器的资源利用率高吗?(一般不高,导致资源浪费)

2、flink提供了很多遍历的数据处理方法,这个也需要自己写。

3、扩容困难

所以,总结下来,我觉得Flink的价值主要是

1、计算资源利用率高;

2、任务处理并行化,如map操作,filter操作,sink操作,各个操作之间都并行处理;

3、丰富的API,各种基于数据流的处理操作,基于有界数据的批处理操作,各种封装好的处理类,如kafka消费类;

4、高可用,故障恢复,易扩展

六、Flink的流处理与批处理

批处理案例:DataSet,基于有界数据,如读取文件

package com.aii.bi.examples.batch; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; public class WordCount { public static void main(String[] args) throws Exception { // 初始化运行环境 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); // 加载数据源 DataSet<String> text = env.readTextFile("E:/opt/word.txt"); // 数据转换 DataSet<Tuple2<String, Integer>> counts = text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { private static final long serialVersionUID = 1L; @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.split(" "); for (String token : tokens) { if (token.length() > 0) { out.collect(new Tuple2<>(token, 1)); } } } }).groupBy(0).sum(1); //sink data counts.print(); } }

流处理案例:DataStream,基于

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/110249.html

(0)
上一篇 2026-02-01 20:26
下一篇 2026-02-01 20:45

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信