spark oom(out of memory)问题

https://blog.csdn.net/yhb315279058/article/details/51035631

https://www.cnblogs.com/yanshw/p/11988347.html

1 driver内存不够

增加 Driver 内存

1
--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M). 

1 读入数据太大

解决思路是增加 Driver 内存

1
2
3
4
5
from pyspark import SparkContext
sc = SparkContext(master='yarn')
rdd = sc.parallelize(range(300000000))
# spark-submit --master yarn-client --driver-memory 512M driver_oom.py 内存溢出
# spark-submit --master yarn-client --driver-memory 3G driver_oom.py 可以执行

2 数据回传太大,也就是聚合到driver的数据太大

解决思路是分区输出,具体做法是 foreach

1
2
3
4
5
rdd = sc.parallelize(range(100))
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect() # 内存溢出

def func(x): print(x)
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分区输出

2 excutor内存不够

通用的解决办法就是增加 Executor 内存 但这并不一定是最好的办法

1 map 过程产生大量对象

解决思路是减少每个 task 的大小,从而减少每个 task 的输出

具体做法是在 会产生大量对象的 map 操作前 添加 repartition(重新分区) 方法,分区成更小的块传入 map

1
2
3
4
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()      # 100 * 100000000 个对象,内存溢出
rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum() # 内存溢出

rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()

2 shuffle导致

shuffle内存溢出的情况可以说都是shuffle后发生数据倾斜,单个文件过大导致

参考数据倾斜解决方案

spark资源参数调优

https://www.cnblogs.com/gtscool/p/13072051.html

https://blog.csdn.net/l1394049664/article/details/81811642

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

性能

task

设置分区数

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

worker

几个执行节点 ,一个worker 一般对应1个 executor

executor

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

内存

driver

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

executor

Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

1
2
3
4
5
6
7
8
9
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \

持久化

https://cloud.tencent.com/developer/article/1760389

https://blog.csdn.net/dudadudadd/article/details/114102341

https://yiqingqing.blog.csdn.net/article/details/121772325

https://blog.csdn.net/feizuiku0116/article/details/122839247

https://blog.csdn.net/CyAurora/article/details/119654676

https://www.cnblogs.com/Transkai/p/11347224.html

https://blog.csdn.net/CyAurora/article/details/119654676

https://blog.csdn.net/dudadudadd/article/details/114102341

1 缓存

懒执行

空间换时间

rdd3如果不消失,那么绿色链路就不用执行两次

持久化的目标就是将rdd3保存到内存或者磁盘

但是有丢失风险,比如硬盘损坏,内存被清理等,所以为了规避风险,会保留rdd的血缘(依赖)关系

如何保存:

1 persist

2 cache

https://blog.csdn.net/donger__chen/article/details/86366339

底层调用persist,persist的特殊情况,persist(MEMORY_ONLY)

2 checkpoint

特殊的持久化

仅支持硬盘

设计上认为安全没有风险,所以不需要保留血缘关系

如何保存:

3 对比

shuffle

https://www.cnblogs.com/arachis/p/Spark_Shuffle.html

https://zhuanlan.zhihu.com/p/70331869

https://www.educba.com/spark-shuffle/

https://lmrzero.blog.csdn.net/article/details/106015264?spm=1001.2014.3001.5502

https://blog.csdn.net/zp17834994071/article/details/107887292

https://zhuanlan.zhihu.com/p/431015932

0 shuffle是什么,什么时候shuffle

what:多个partition的数据流向一个partition

when:宽依赖会有shuffle

shuffle分为两个阶段:shuffle read , shuffle write

map端-》shuffle read-》 shuffle write-》reduce端

1 mapreduce shuffle

2 spark shuffle

1 简介

2 分类

https://www.51cto.com/article/703950.html#

过去hash shuffle ,现在sort shuffle

1.Hash Shuffle

2.Sort Shuffle

1 普通机制的SortShuffleManager

2 bypass

此时task会为每个reduce端的task都创建一个临时磁盘文件,并将数据按key进行hash,然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

3 总结

bypass与普通SortShuffleManager运行机制的不同在于:
第一,磁盘写机制不同;
第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

3 对比

https://www.zhihu.com/question/27643595

4 优化

因此在我们的开发过程中,能避免则尽可能避免使用会进行shuffle的算子,尽量使用非shuffle算子

1 shuffle算子:

https://blog.csdn.net/py_tamir/article/details/95457813

reduceByKey、join、distinct、repartition

2 非shuffle算子

map,flatMap

spark优化

8 Performance Optimization Techniques Using Spark

https://www.syntelli.com/eight-performance-optimization-techniques-using-spark#

Spark性能优化指南(美团)

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

0 分析web ui

分析时间的消耗

1.多个map合并(??)

1
rdd1.map().map() ->  rdd1.map() 

2.减少action算子

说白了就是多个action操作,transformation逻辑可以写一起,最后action

3 增加分区 增加并行度

分数是说rdd分区

并行度是说executor num*executor core num


:D 一言句子获取中...