大家好,欢迎来到IT知识分享网。
第一章 Spark性能调优
1.1 常规性能调优
1.1.1 常规性能调优一:最优资源配置
Spark 性能调优第一步就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最有资源分配后,在次基础上在考虑更进一步的调优策略
资源的分配在使用脚本提交Spark任务时进行指定,标准的Spark任务提交脚本如下所示:
bin/spark-submit \ --class com.atguigu.spark.Analysis \ --master yarn --deploy-mode cluster --num-executors 80 \ 配置Executor的数量 --driver-memory 6g \ 配置Driver内存 --executor-memory 6g \ 配置每个Executor的内存 --executor-cores 3 \ 配置每个Executor的CPU core数量 /usr/opt/modules/spark/jar/spark.jar \
调节原则:尽量将任务分配的资源调节到可以使用的资源最大限度。对于具体资源的分配,分别讨论Spark的两种Cluster运行模式:
- Spark Standalone模式,你在提交任务前,一定知道或者可以从运维部门获取到你可以使用的资源情况,在编写submit脚本的时候,就根据可用的资源情况进行资源的分配,比如说集群有15台机器,每台机器为8G内存,2个CPU core,那么就指定15个Executor,每个Executor分配8G内存,2个CPU core。
- Spark Yarn模式,由于Yarn使用资源队列进行资源的分配和调度,在编写submit脚本的时候,就根据Spark作业要提交到的资源队列,进行资源的分配,比如资源队列有400G内存,100个CPU core,那么指定50个Executor,每个Executor分配8G内存,2个CPU core。
补充:生产环境Spark submit脚本配置
bin/spark-submit \ --class com.atguigu.spark.WordCount \ --master yarn\ --deploy-mode cluster\ --num-executors 80 \ --driver-memory 6g \ --executor-memory 6g \ --executor-cores 3 \ --queue root.default \ --conf spark.yarn.executor.memoryOverhead=2048 \ --conf spark.core.connection.ack.wait.timeout=300 \ /usr/local/spark/spark.jar
1.1.2 常规性能调优二:RDD优化
- RDD复用
在对RDD进行算子时,要避免相同的算子和计算逻辑之下对RDD进行重复的计算 - RDD持久化
在Spark中,当多次对同一个RDD执行算子操作时,每一次都会对这个RDD以之前的父RDD重新计算一次,这种情况是必须要避免的,对同一个RDD的重复计算是对资源的极大浪费,因此,必须对多次使用的RDD进行持久化,通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据。
对于RDD的持久化,有两点需要说明:
- RDD的持久化是可以进行序列化的,当内存无法将RDD的数据完整的进行存放的时候,可以考虑使用序列化的方式减小数据体积,将数据完整存储在内存中。
- 如果对于数据的可靠性要求很高,并且内存充足,可以使用副本机制,对RDD数据进行持久化。当持久化启用了复本机制时,对于持久化的每个数据单元都存储一个副本,放在其他节点上面,由此实现数据的容错,一旦一个副本数据丢失,不需要重新计算,还可以使用另外一个副本。
- RDD尽可能早的filter操作
1.1.3 常规性能调优三:并行度调节
理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。
val conf = new SparkConf() .set("spark.default.parallelism", "500")
1.1.4 常规性能调优四:广播大变量
1.1.5 常规性能调优五:Kryo序列化
public class MyKryoRegistrator implements KryoRegistrator {
@Override public void registerClasses(Kryo kryo) {
kryo.register(StartupReportLogs.class); } }
配置Kryo序列化方式的实例代码:
//创建SparkConf对象 val conf = new SparkConf().setMaster(…).setAppName(…) //使用Kryo序列化库,如果要使用Java序列化库,需要把该行屏蔽掉 conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //在Kryo序列化库中注册自定义的类集合,如果要使用Java序列化库,需要把该行屏蔽掉 conf.set("spark.kryo.registrator", "atguigu.com.MyKryoRegistrator");
1.1.6 常规性能调优六:调节本地化等待时长
1.2算子调优
1.2.1算子调优一 mapPartitions
普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。如果是普通的map算子,假设一个partition有1万条数据,那么map算子中的function要执行1万次,也就是对每个元素进行操作。如果是mapPartition算子,由于一个task处理一个RDD的partition,那么一个task只会执行一次function,function一次接收所有的partition数据,效率比较高。mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM)
1.2.2 算子调优二:foreachPartition优化数据库操作
1.2.3 算子调优三:filter与coalesce的配合使用
- A > B(多数分区合并为少数分区)
- A与B相差值不大
此时使用coalesce即可,无需shuffle过程。 - A与B相差值很大
此时可以使用coalesce并且不启用shuffle过程,但是会导致合并过程性能低下,所以推荐设置coalesce的第二个参数为true,即启动shuffle过程。
- A < B(少数分区分解为多数分区)
此时使用repartition即可,如果使用coalesce需要将shuffle设置为true,否则coalesce无效。
我们可以在filter操作之后,使用coalesce算子针对每个partition的数据量各不相同的情况,压缩partition的数量,而且让每个partition的数据量尽量均匀紧凑,以便于后面的task进行计算操作,在某种程度上能够在一定程度上提升性能。
注意:local模式是进程内模拟集群运行,已经对并行度和分区数量有了一定的内部优化,因此不用去设置并行度和分区数量。
1.2.4 算子调优四:repartition解决SparkSQL低并行度问题
为了解决Spark SQL无法设置并行度和task数量的问题,我们可以使用repartition算子
1.2.5 算子调优五:reduceByKey预聚合
reduceByKey相较于普通的shuffle操作一个显著的特点就是会进行map端的本地聚合,map端会先对本地的数据进行combine操作,然后将数据写入给下个stage的每个task创建的文件中,也就是在map端,对每一个key对应的value,执行reduceByKey算子函数。
使用reduceByKey对性能的提升如下:
- 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;
- 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;
- 本地聚合后,在reduce端进行数据缓存的内存占用减少;
- 本地聚合后,在reduce端进行聚合的数据量减少。
基于reduceByKey的本地聚合特征,我们应该考虑使用reduceByKey代替其他的shuffle算子,例如groupByKey,,groupByKey不会进行map端的聚合,而是将所有map端的数据shuffle到reduce端,然后在reduce端进行数据的聚合操作。由于reduceByKey有map端聚合的特性,使得网络传输的数据量减小,因此效率要明显高于groupByKey。
1.3 Shuffle调优
1.3.1 Shuffle调优一:调节map端缓冲区大小
val conf = new SparkConf() .set("spark.shuffle.file.buffer", "64")
1.3.2 Shuffle调优二:调节reduce端拉取数据缓冲区大小
val conf = new SparkConf() .set("spark.reducer.maxSizeInFlight", "96")
1.3.3 Shuffle调优三:调节reduce端拉取数据重试次数
reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3,该参数的设置方法如代码清单所示:
val conf = new SparkConf() .set("spark.shuffle.io.maxRetries", "6")
1.3.4 Shuffle调优四:调节reduce端拉取数据等待间隔
val conf = new SparkConf() .set("spark.shuffle.io.retryWait","60s")
1.3.5 Shuffle调优五:调节SortShuffle排序操作阈值
val conf = new SparkConf() .set("spark.shuffle.sort.bypassMergeThreshold", "400")
1.4 JVM 调优
对于JVM调优,首先应该明确,full gc/minor gc,都会导致JVM的工作线程停止工作,即stop the world。
1.4.1 JVM调优一:降低cache操作的内存占比
- 静态内存管理机制
根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立。
在一般情况下,Storage的内存都提供给了cache操作,但是如果在某些情况下cache操作内存不是很紧张,而task的算子中创建的对象很多,Execution内存又相对较小,这回导致频繁的minor gc,甚至于频繁的full gc,进而导致Spark频繁的停止工作,性能影响会很大。
在Spark UI中可以查看每个stage的运行情况,包括每个task的运行时间、gc时间等等,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。
Storage内存区域可以通过spark.storage.memoryFraction参数进行指定,默认为0.6,即60%,可以逐级向下递减,如代码清单所示:
val conf = new SparkConf() .set("spark.storage.memoryFraction", "0.4")
- 统一内存管理机制
根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据,Execution主要用于缓存在shuffle过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,由于动态占用机制的实现,shuffle过程需要的内存过大时,会自动占用Storage的内存区域,因此无需手动进行调节。
1.4.2 JVM调优二:调节Executor堆外内存
1.4.3 JVM调优三:调节连接等待时长
第2章 Spark数据倾斜
- Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;
- Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行。
定位数据倾斜问题: - 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜;
- 查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个;
2.1 解决方案一:聚合原数据
- 避免shuffle
绝大多数情况下,Spark作业的数据来源都是Hive表,这些Hive表基本都是经过ETL之后的昨天的数据。为了避免数据倾斜,我们可以考虑避免shuffle过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能。 - 缩小key的粒度(增大数据倾斜可能性,降低每个task的数据量)
key的数量增加,可能使数据倾斜更严重。 - 增大key的粒度(减小数据倾斜可能性,增大每个task的数据量)
如果没有办法对每个key聚合出来一条数据,在特定场景下,可以考虑扩大key的聚合粒度。
2.2 解决方案二:过滤导致倾斜的key
如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key进行过滤,滤除可能导致数据倾斜的key对应的数据,这样,在Spark作业中就不会发生数据倾斜了。
2.3 解决方案三:提高shuffle操作中的reduce并行度
当方案一和方案二对于数据倾斜的处理没有很好的效果时,可以考虑提高shuffle过程中的reduce端并行度,reduce端并行度的提高就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题。
2.4 解决方案四:使用随机key实现双重聚合
2.5 解决方案五:将reduce join转换为map join
不使用join算子进行连接操作,而使用Broadcast变量与map类算子实现join操作,进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,然后对其创建一个Broadcast变量;接着对另外一个RDD执行map类算子,在算子函数内,从Broadcast变量中获取较小RDD的全量数据,与当前RDD的每一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个RDD的数据用你需要的方式连接起来。
2.6 解决方案六:sample采样对倾斜key单独进行join
2.7 解决方案七:使用随机数扩容进行join
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/113572.html