当前位置 博文首页 > asd1358355022的博客:每日积累【Day2】SPARK调优 *建议收藏*

    asd1358355022的博客:每日积累【Day2】SPARK调优 *建议收藏*

    作者:[db:作者] 时间:2021-08-04 21:45

    Spark常规优化

    ? executor核心数量设置为Task的 1/3 或者 1/2,官方推荐Task数量为Spark设定的CPU cores的2 到 3倍

    ? RDD优化:当多次对一个RDD进性多次计算时,都需要对这个RDD的父RDD重写进行计算时,可以为这个父RDD进性持久化,意思是对多次使用的RDD进性持久化,可以持久化到内存或者磁盘。如果一个RDD b前面的RDD a经过了非常复杂的shuffle过程,此时也将这个RDD进行持久化。当内存不够持久化时可使用序列化,如果内存充足,可以使用副本并将副本放到其他节点上存储完成容错。

    ? RDD尽早地filter。(比如全连接之前进行过滤)

    ? 广播大变量:将广播变量发送到每一个Executor当中,此Executor中所有Task共享这个变量,只读变量,能修改,但是不影响其他Task。节省了IO和内存占用。也可以使用Redis进行存储,至少能保证任务的正常运行。

    ? 使用Kryo序列化:此序列化为java序列化性能提升10倍,从java和kryo序列化后的之后文件来看,大小差了10倍左右。(实现KryoRegistrator,在此类当中重写方法并将所需要序列化的类注册进去)。在spark2.0之后,简单的类型、字符串类型的Shufflings RDDs默认使用这种模式。

    在这里插入图片描述

    ?

    ? 调节本地化等待的时长:Spark作业提交之后,Driver会对每一个Stage的Task进性分配, Spark的task分配算法优先希望每个task正好分配到它要计算数据所在的节点,这样的话,这样就可以避免数据的网络传输。通常来说,task可能不会被分配到它处理的数据所在的节点,因为这些节点可用的资源可能已经用尽,此时,Spark会等待一段时间,默认3s,如果等待指定时间后仍然无法在指定节点运行,那么会自动降级,比如将task分配到离它要计算的数据比较近的一个节点,然后进行计算,如果当前级别仍然不行,那么继续降级。当task要处理的数据不在task所在节点上时,会发生数据的传输。task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager处获取数据。网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响性能,因此,我们希望通过调节本地化等待时长,如果在等待时长这段时间内,目标节点处理完成了一部分task,那么当前的task将有机会得到执行,这样就能够改善Spark作业的整体性能。

    val conf = new SparkConf().set("spark.locality.wait", "6")  //将时长设置为6~10s这样子   
    

    Spark算子优化

    ? 使用Mappartition 替换Map : 函数执行频率

    ? 使用ForeachParition 替换 Foreach :函数执行频率

    ? 使用ReduceByKey/aggregateByKey替换GroupByKey :map-side

    ? 使用filter之后进行coalesce操作(减少分区数,一般使用于规模较大的RDD过滤后减少分区数,提高资源的利用率;) : filter后对分区进行压缩

    ? 使用repartitionAndSortWithinPartitions(重新分区,并对分区内的数据进行一个局部排序)替代repartition与sort类操作 :是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子

    Spark程序优化

    ? 尽可能避免创建重复的RDD

    ? 尽可能复用一个RDD,对常用的RDD进行持久化。

    Spark数据倾斜优化

    ? Reduce side join 变为 map side join :适用于大RDD和小RDDjoin,可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join

    ? 使用局部聚合算和全局聚合两个阶段:前一个阶段将数据分部分加入不同的随机数(比如分区数量的随机数+_)进行reduce,之后进行分区去掉前缀然后分区内合并进行全局汇总

    ? 过滤少数倾斜Key:使用随机取样调查数量剧增的Key,过滤掉少量的倾斜Key

    ? 通过在Hive中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配:治标不治本,Hive或者Kafka中还是会发生数据倾斜。

    ? 调整并行度:只是缓解了数据倾斜而已,没有彻底根除问题,效果非常有限的。

    ? 自定义Partitioner:大量不同的Key被分配到了相同的Task造成该Task数据量过大。使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。对于同一Key对应数据集非常大的场景不适用。

    r,尽量将所有不同的Key均匀分配到不同的Task中。对于同一Key对应数据集非常大的场景不适用。

    cs