spark使用总结

小技巧

1.调试

先local 然后集群

可以用本地ide 或者jupyter 调试

2.把数据想象成表,本质就是对行操作

3.多少task就是多少砖,多少并行度就是有多少工人,需要设置合理

日志

如果是yarn client模式

1.driver

客户端可以查看

2.executor

yarn

web ui可以查看

命令行查看 yarn logs —applicationId XXX

spark

web ui可以查看

不同行保存到不同文件

增加字段 区分不同行

落盘的时候 partitionBy(“field1”,”field2”)

spark oom(out of memory)问题

https://blog.csdn.net/yhb315279058/article/details/51035631

https://www.cnblogs.com/yanshw/p/11988347.html

1 driver内存不够

增加 Driver 内存

1
--driver-memory MEM         Memory for driver (e.g. 1000M, 2G) (Default: 1024M). 

1 读入数据太大

解决思路是增加 Driver 内存

1
2
3
4
5
from pyspark import SparkContext
sc = SparkContext(master='yarn')
rdd = sc.parallelize(range(300000000))
# spark-submit --master yarn-client --driver-memory 512M driver_oom.py 内存溢出
# spark-submit --master yarn-client --driver-memory 3G driver_oom.py 可以执行

2 数据回传太大,也就是聚合到driver的数据太大

解决思路是分区输出,具体做法是 foreach

1
2
3
4
5
rdd = sc.parallelize(range(100))
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).collect() # 内存溢出

def func(x): print(x)
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000)]).foreach(func) # 分区输出

2 excutor内存不够

通用的解决办法就是增加 Executor 内存 但这并不一定是最好的办法

1 map 过程产生大量对象

解决思路是减少每个 task 的大小,从而减少每个 task 的输出

具体做法是在 会产生大量对象的 map 操作前 添加 repartition(重新分区) 方法,分区成更小的块传入 map

1
2
3
4
rdd.flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()      # 100 * 100000000 个对象,内存溢出
rdd.flatMap(lambda x: len(['%d'%x*50 for _ in range(100000000)])).sum() # 内存溢出

rdd.repartition(1000000).flatMap(lambda x: ['%d'%x*50 for _ in range(100000000)]).count()

2 shuffle导致

shuffle内存溢出的情况可以说都是shuffle后发生数据倾斜,单个文件过大导致

参考数据倾斜解决方案

spark资源参数调优

https://www.cnblogs.com/gtscool/p/13072051.html

https://blog.csdn.net/l1394049664/article/details/81811642

https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

性能

task

设置分区数

  • 参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
  • 参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。

worker

几个执行节点 ,一个worker 一般对应1个 executor

executor

num-executors

  • 参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
  • 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。

executor-cores

  • 参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
  • 参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。

内存

driver

driver-memory

  • 参数说明:该参数用于设置Driver进程的内存
  • 参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。

executor

Executor的内存主要分为三块:第一块是让task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;第二块是让task通过shuffle过程拉取了上一个stage的task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;第三块是让RDD持久化时使用,默认占Executor总内存的60%。

executor-memory

  • 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
  • 参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。

spark.shuffle.memoryFraction

  • 参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
  • 参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

spark.storage.memoryFraction

  • 参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
  • 参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。

资源参数参考示例

以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:

1
2
3
4
5
6
7
8
9
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \

spark常见错误

Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions

核心思路:分别指定driver和excutor的python版本,使其统一

方法一:修改环境变量

1./在环境变量文件 /etc/profile 中添加指定的pyspark,python的版本

1
2
export PYSPARK_PYTHON=指定的python路径
export PYSPARK_DRIVER_PYTHON=指定的python路径

保存后source一下 /etc/profile ,使之生效

2.代码内指定

1
2
os.environ["PYSPARK_DRIVER_PYTHON"]="" ##driver 
os.environ["PYSPARK_PYTHON"]="" ### worker ,excutor

方法二:spark-submit工具指定

在spark-submit时增加参数 --conf spark.pyspark.python--conf spark.pyspark.driver.python

1
2
3
spark-submit \
--driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G
--conf spark.pyspark.python=./.../bin/python --conf spark.pyspark.driver.python=./.../bin/python xx.py

spark.sql 不能查询到hive的数据库,只查询到default数据库

说明spark没有连接到hive

https://www.cnblogs.com/yjt1993/p/13963144.html

持久化

https://cloud.tencent.com/developer/article/1760389

https://blog.csdn.net/dudadudadd/article/details/114102341

https://yiqingqing.blog.csdn.net/article/details/121772325

https://blog.csdn.net/feizuiku0116/article/details/122839247

https://blog.csdn.net/CyAurora/article/details/119654676

https://www.cnblogs.com/Transkai/p/11347224.html

https://blog.csdn.net/CyAurora/article/details/119654676

https://blog.csdn.net/dudadudadd/article/details/114102341

1 缓存

懒执行

空间换时间

rdd3如果不消失,那么绿色链路就不用执行两次

持久化的目标就是将rdd3保存到内存或者磁盘

但是有丢失风险,比如硬盘损坏,内存被清理等,所以为了规避风险,会保留rdd的血缘(依赖)关系

如何保存:

1 persist

2 cache

https://blog.csdn.net/donger__chen/article/details/86366339

底层调用persist,persist的特殊情况,persist(MEMORY_ONLY)

2 checkpoint

特殊的持久化

仅支持硬盘

设计上认为安全没有风险,所以不需要保留血缘关系

如何保存:

3 对比


:D 一言句子获取中...