Spark Usage Notes

Tips

1. Debugging

Run locally first, then on the cluster.

You can debug with a local IDE or Jupyter.

2. Think of the data as a table; fundamentally you are operating on its rows.

3. Tasks are the bricks to be laid and the parallelism is the number of workers; both need to be set to reasonable values.

Logs

In yarn-client mode:

1. Driver

Logs can be viewed on the client.

2. Executor

YARN

Viewable in the YARN web UI.

From the command line: `yarn logs --applicationId XXX`

Spark

Viewable in the Spark web UI.

Saving different rows to different files

Add a column that distinguishes the different rows.

When writing to disk, use `partitionBy("field1", "field2")`, as in the sketch below.
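
A minimal sketch of this pattern; the DataFrame, the `label` column, and the output path are made up for illustration:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("partition-demo").getOrCreate()

df = spark.createDataFrame([(1, "a"), (2, "b"), (3, "c")], ["id", "value"])

# Add a column that marks which kind of row this is (hypothetical rule).
df = df.withColumn("label", F.when(F.col("id") % 2 == 0, "even").otherwise("odd"))

# Each distinct label value is written to its own sub-directory, e.g. .../label=even/
df.write.partitionBy("label").mode("overwrite").parquet("/tmp/partition_demo")
```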

Common Spark errors

Python in worker has different version 2.7 than that in driver 3.7, PySpark cannot run with different minor versions

Core idea: explicitly specify the Python version for both the driver and the executors so that they match.

Method 1: set environment variables

1. Add the desired PySpark Python interpreter to the environment file /etc/profile:

```bash
export PYSPARK_PYTHON=/path/to/python
export PYSPARK_DRIVER_PYTHON=/path/to/python
```

After saving, run `source /etc/profile` to make it take effect.

2. Set it in code:

```python
import os

os.environ["PYSPARK_DRIVER_PYTHON"] = ""  # driver
os.environ["PYSPARK_PYTHON"] = ""         # worker / executor
```

Method 2: specify it via spark-submit

Add the parameters `--conf spark.pyspark.python=...` and `--conf spark.pyspark.driver.python=...` to the spark-submit command:

```bash
spark-submit \
  --driver-memory 5g --num-executors 5 --executor-cores 1 --executor-memory 1G \
  --conf spark.pyspark.python=./.../bin/python --conf spark.pyspark.driver.python=./.../bin/python xx.py
```

spark.sql can only see the default database, not the Hive databases

This means Spark is not connected to the Hive metastore.

https://www.cnblogs.com/yjt1993/p/13963144.html
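
A minimal sketch of connecting Spark to an existing Hive metastore; the thrift URI below is a placeholder, the real value comes from your hive-site.xml:

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("hive-check")
    # Placeholder metastore address; use the value from your hive-site.xml.
    .config("hive.metastore.uris", "thrift://metastore-host:9083")
    .enableHiveSupport()  # without Hive support, only the default database is visible
    .getOrCreate()
)

spark.sql("SHOW DATABASES").show()
```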

dataframe

0. Two styles

DataFrames can be programmed in two styles:

1 DSL style

DSL stands for domain-specific language.
In practice it means the DataFrame-specific API.
The DSL style processes the data by calling these API methods,
e.g. df.where().limit()

2 SQL style

SQL syntax style.
The SQL style processes DataFrame data with SQL statements,
e.g. spark.sql("SELECT * FROM xxx")
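
A short sketch contrasting the two styles on the same toy DataFrame; the names and values are made up:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("styles-demo").getOrCreate()
df = spark.createDataFrame([("tom", 31), ("amy", 17)], ["name", "age"])

# DSL style: chain DataFrame API calls.
df.where(F.col("age") > 18).select("name").show()

# SQL style: register a temp view and query it with SQL.
df.createOrReplaceTempView("people")
spark.sql("SELECT name FROM people WHERE age > 18").show()
```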

1 User-defined functions (UDF)

Steps (a sketch follows the list):

https://blog.csdn.net/qq_43665254/article/details/112379113

https://blog.csdn.net/sunflower_sara/article/details/104044412

1. Define the function

2. Register the function

3. Use the function
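
A minimal sketch of the three steps; the function, column names, and data are made up for illustration:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("udf-demo").getOrCreate()
df = spark.createDataFrame([("tom", 31), ("amy", 17)], ["name", "age"])

# 1. Define an ordinary Python function.
def age_group(age):
    return "adult" if age >= 18 else "minor"

# 2. Register it: udf() for the DSL style, spark.udf.register() for the SQL style.
age_group_udf = udf(age_group, StringType())
spark.udf.register("age_group", age_group, StringType())

# 3. Use it.
df.withColumn("group", age_group_udf(col("age"))).show()
df.createOrReplaceTempView("people")
spark.sql("SELECT name, age_group(age) AS grp FROM people").show()
```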

2 withColumn

```python
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Simple case: derive a new column from an existing one.
df.withColumn('age2', df.age + 2).collect()

# Combined with a UDF (fill in your own logic; the column names are placeholders).
def fun(a, b):
    ...          # custom logic
    return ...   # must return a string to match StringType()

fun1 = udf(fun, StringType())
df.withColumn('age2', fun1(col("col_name1"), col("col_name2")))
```

rdd

0. Categories

Operators are the API of the distributed collection object.

RDD operators fall into two categories: 1. transformations 2. actions

https://blog.csdn.net/weixin_45271668/article/details/106441457

1 Shared variables

https://blog.csdn.net/Android_xue/article/details/79780463

https://chowdera.com/2022/02/202202091419262471.html

There are two kinds of shared variables: broadcast variables and accumulators.

What problem do broadcast variables solve?
When a distributed RDD is used together with a local collection, they reduce memory usage and network IO and improve performance.

What problem do accumulators solve?
They provide a global accumulation across distributed code execution.

1. Broadcast variables
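
A minimal sketch of a broadcast variable: a small local dict shipped once to each executor (assumes an existing SparkSession `spark`):

```python
sc = spark.sparkContext

# The small local lookup table is sent to each executor once, not once per task.
lookup = sc.broadcast({1: "a", 2: "b", 3: "c"})

rdd = sc.parallelize([1, 2, 3, 2, 1])
print(rdd.map(lambda x: lookup.value.get(x, "unknown")).collect())
# ['a', 'b', 'c', 'b', 'a']
```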

2. Accumulators

```python
import time

sc = spark.sparkContext
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count = sc.accumulator(0)

def map_func(data):
    global count
    count += data

# count = sc.accumulator(0)
# rdd3 = rdd2.map(lambda x: x)
# rdd3.collect()
# print(count)

# Aggregate with reduce.
start_time = time.time()
result = rdd1.reduce(lambda a, b: a + b)
end_time = time.time()
print(result)
print(end_time - start_time)
# print(count)

# Aggregate with the accumulator (the map only runs when collect() triggers the job).
start_time = time.time()
rdd2 = rdd1.map(map_func)
rdd2.collect()
end_time = time.time()
print(count)
print(end_time - start_time)
```

Output:

```
55
1.092909574508667
55
0.09459614753723145
```

Both the accumulator and reduce produce the same aggregate (55). The timings above do not prove the accumulator is faster, though: whichever job runs first also pays the job-startup overhead, so it looks slower. To compare fairly, run each approach in isolation (or warm the context up first) and average over several runs.

2 *ByKey operations

https://blog.csdn.net/weixin_43810802/article/details/120772452

https://blog.csdn.net/zhuzuwei/article/details/104446388

https://blog.csdn.net/weixin_40161254/article/details/103472056

Use reduceByKey instead of groupByKey

groupByKey shuffles every value across the network, which hurts performance, while reduceByKey combines values within each partition first, so far less data is shuffled.
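
A small sketch of the same aggregation written both ways (prefer the first), assuming an existing SparkContext `sc`:

```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)])

# reduceByKey: values are pre-aggregated inside each partition before the shuffle.
print(rdd.reduceByKey(lambda a, b: a + b).collect())  # [('a', 3), ('b', 1)] (order may vary)

# groupByKey: all values are shuffled first, then summed on the reducer side.
print(rdd.groupByKey().mapValues(sum).collect())      # same result, more shuffle traffic
```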

3 reduce

```python
import pandas as pd
from pyspark.sql import Row

table = pd.DataFrame({"num": [1, 1, 2]})
table = spark.createDataFrame(table)
table.rdd.reduce(lambda a, b: Row(num=a[0] + b[0]))
# Row(num=4)
```

reduce aggregates the RDD into a single value.

The key point is that the result of combining a and b must have the same type as a and b themselves; for example, a + b must have the same type as a and b, otherwise a later step such as (a + b) + c fails. See the sketch below.
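
A sketch of that pitfall using the same toy data as above: the Row-returning lambda keeps the types consistent, while a lambda returning a plain int breaks on the next combination step.

```python
from pyspark.sql import Row

rdd = spark.createDataFrame([(1,), (1,), (2,)], ["num"]).rdd

# OK: the lambda returns a Row, the same type as its inputs.
print(rdd.reduce(lambda a, b: Row(num=a[0] + b[0])))  # Row(num=4)

# Broken: the first combination returns an int, so the next call does int[0]
# and raises TypeError: 'int' object is not subscriptable.
# rdd.reduce(lambda a, b: a[0] + b[0])
```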

4 collect, take, top, first, foreach

1 collect

collect()

Returns a list containing all elements of the RDD.

Note: all of the data is loaded into the driver, so this is only suitable when the data volume is small.

2 first

first()

Returns the first element of the RDD.

3 take

take(num)

Returns the first num elements of the RDD.

4 top

top(num, key=None)

Sorts the elements (descending by default, optionally by key).

Get the top N elements from an RDD
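
A quick sketch of these four actions side by side, assuming an existing SparkContext `sc`:

```python
rdd = sc.parallelize([3, 1, 5, 2, 4])

print(rdd.collect())  # [3, 1, 5, 2, 4]  all elements, brought to the driver
print(rdd.first())    # 3
print(rdd.take(2))    # [3, 1]           first two elements, no sorting
print(rdd.top(2))     # [5, 4]           largest two elements, sorted descending
```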

5 foreach

foreach(f)

Applies a function to all elements of this RDD. The function runs on the executors, so any printed output appears per partition and its order varies between runs.

```python
def f(x):
    print(x)

sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
```

Output (printed on the executors; the order changes between runs):

```
4
3
2
1
5
```

On another run, for example:

```
3
5
2
4
1
```

Spark modules

The Spark framework consists of Spark Core, Spark SQL, Spark Streaming, Spark GraphX, and Spark MLlib; the latter four are all built on top of the core engine.

Spark Core: the core of Spark. All core functionality is provided by this module and it is the foundation Spark runs on. Spark Core uses the RDD as its data abstraction and provides APIs in Python, Java, Scala, and R for batch processing of massive offline data.
Spark SQL: built on Spark Core, it provides processing of structured data. Spark SQL supports processing data with SQL and itself targets offline (batch) computation. On top of Spark SQL, Spark also provides the Structured Streaming module for stream processing.

Data abstractions: Dataset (Java, Scala) and DataFrame (Java, Scala, Python, R).
Spark Streaming: built on Spark Core, it provides stream processing of data.
MLlib: built on Spark Core, for machine learning; it ships with a large library of machine-learning algorithms and APIs so users can run machine learning in a distributed fashion.
GraphX: built on Spark Core, for graph computation; it provides a rich set of graph APIs for distributed graph processing.

Spark configuration

1. How to set configuration

https://blog.51cto.com/u_16213328/7866422

Priority: properties set on the SparkConf/SparkSession in code take precedence over flags passed to spark-submit, which in turn override values in spark-defaults.conf (see the sketch below).

What about Hadoop-side settings? (still to be investigated)
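
A sketch of the three common places the same property can be set, from lowest to highest precedence; the property and values are just examples:

```python
# 1. conf/spark-defaults.conf (cluster-wide defaults), lowest precedence:
#    spark.executor.memory  4g
#
# 2. spark-submit flag, overrides the defaults file:
#    spark-submit --conf spark.executor.memory=8g app.py
#
# 3. Set in code, highest precedence:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("conf-demo")
    .config("spark.executor.memory", "16g")
    .getOrCreate()
)
print(spark.conf.get("spark.executor.memory"))  # 16g
```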

2. Setting configuration in code (SparkSession, SparkContext, HiveContext, SQLContext)

https://blog.csdn.net/weixin_43648241/article/details/108917865

SparkSession > SparkContext > HiveContext > SQLContext

A SparkSession contains a SparkContext.

HiveContext and SQLContext are the older entry points created from a SparkContext, with HiveContext extending SQLContext with Hive support; in Spark 2.x and later they are all subsumed by SparkSession.

```python
spark = (
    SparkSession.builder
    .config("hive.metastore.uris", "thrift://xxx.xx.x.xx:xxxx")
    .config("spark.pyspark.python", "/opt/dm_python3/bin/python")
    .config("spark.default.parallelism", 10)
    .config("spark.sql.shuffle.partitions", 200)
    .config("spark.driver.maxResultSize", "16g")
    .config("spark.port.maxRetries", "100")
    .config("spark.driver.memory", "16g")
    .config("spark.yarn.queue", "dcp")
    .config("spark.executor.memory", "16g")
    .config("spark.executor.cores", 20)
    .config("spark.files", addfile)           # addfile: extra files to ship, defined elsewhere
    .config("spark.executor.instances", 6)
    .config("spark.speculation", False)
    .config("spark.submit.pyFiles", zipfile)  # zipfile: zipped Python dependencies, defined elsewhere
    .appName("testing")
    .master("yarn")
    .enableHiveSupport()
    .getOrCreate()
)
```

Submitting Spark jobs

1.spark-submit

https://spark.apache.org/docs/latest/submitting-applications.html

The spark-submit script in Spark’s bin directory is used to launch applications on a cluster. It can use all of Spark’s supported cluster managers through a uniform interface so you don’t have to configure your application especially for each one.

```bash
./bin/spark-submit \
  --class <main-class> \
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]
```
  • --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi)
  • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077)
  • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client)
  • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap "key=value" in quotes (as shown). Multiple configurations should be passed as separate arguments. (e.g. --conf <key>=<value> --conf <key2>=<value2>)
  • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes.
  • application-arguments: Arguments passed to the main method of your main class, if any

The machine running spark-submit is the client; where the driver runs depends on the deploy mode.

2.python file.py

This should only support local and client mode.

If the code sets cluster deploy mode in this case, it throws an error:

```python
config("spark.submit.deployMode", "cluster")
```

Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode is not applicable to Spark shells.

3.jupyter notebook

This should also only support local and client mode. For local debugging in a notebook or IDE, a minimal session looks like the sketch below.
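
A minimal local-mode session sketch for notebook/IDE debugging, as mentioned in the tips at the top; the app name is arbitrary:

```python
from pyspark.sql import SparkSession

# Local mode: driver and executors run inside a single local JVM, using all cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("notebook-debug")
    .getOrCreate()
)

spark.range(5).show()
```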

