dataframe

0. Two styles

DataFrame supports two programming styles:

1 DSL style

DSL stands for "domain-specific language".
In practice this just means the DataFrame's own API.
The DSL style means processing the data by calling these APIs.
For example: df.where().limit()

2 SQL style

SQL syntax style.
The SQL style means processing the data in a DataFrame with SQL statements.
For example: spark.sql("SELECT * FROM xxx")
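
A minimal sketch contrasting the two styles on one DataFrame; the session name, the name/age columns and the filter condition are made up for illustration:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("styles-demo").getOrCreate()
df = spark.createDataFrame([("a", 20), ("b", 35)], ["name", "age"])

# DSL style: call the DataFrame APIs directly
df.where(df.age > 30).limit(10).show()

# SQL style: register a temp view, then query it with SQL
df.createOrReplaceTempView("people")
spark.sql("SELECT * FROM people WHERE age > 30 LIMIT 10").show()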

1 User-defined functions (UDF)

Steps:

https://blog.csdn.net/qq_43665254/article/details/112379113

https://blog.csdn.net/sunflower_sara/article/details/104044412

1. Define the function

2. Register the function

3. Use the function (see the sketch below)
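
A hedged sketch of the three steps, reusing the df DataFrame and people view from the sketch above; to_upper is a made-up example function:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import StringType

# 1. define an ordinary Python function
def to_upper(s):
    return s.upper() if s is not None else None

# 2. register it: udf() for DSL use, spark.udf.register() for SQL use
to_upper_udf = udf(to_upper, StringType())
spark.udf.register("to_upper_sql", to_upper, StringType())

# 3. use it
df.select(to_upper_udf(col("name"))).show()                # DSL style
spark.sql("SELECT to_upper_sql(name) FROM people").show()  # SQL style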

2 withColumn

from pyspark.sql.functions import col, lit, udf
from pyspark.sql.types import StringType

### derive a new column from an existing one
df.withColumn('age2', df.age + 2).collect()

### combined with a UDF
def fun(A, B):
    XXXX            # custom logic on the two column values
    return XX

fun1 = udf(fun, StringType())
df.withColumn('age2', fun1(col('col_name1'), col('col_name2')))

rdd

0. Categories

Operators are the APIs of the distributed collection object (the RDD).

RDD operators fall into two categories: 1. transformations 2. actions (a short sketch follows the link below)

https://blog.csdn.net/weixin_45271668/article/details/106441457
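
A minimal sketch, assuming an existing SparkContext sc: transformations are lazy and only build the lineage, while actions trigger a job and return results to the driver.

rdd = sc.parallelize([1, 2, 3, 4, 5])

# transformations: lazy, return a new RDD, nothing runs yet
doubled = rdd.map(lambda x: x * 2)
evens = doubled.filter(lambda x: x % 4 == 0)

# actions: trigger the actual computation and return results
print(evens.collect())   # [4, 8]
print(doubled.count())   # 5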

1 Shared variables

https://blog.csdn.net/Android_xue/article/details/79780463

https://chowdera.com/2022/02/202202091419262471.html

There are two kinds of shared variables: broadcast variables and accumulators.

What problem do broadcast variables solve?
When a distributed RDD is used together with a local collection, a broadcast variable lowers memory usage and reduces network IO, improving performance.

What problem do accumulators solve?
They provide a global accumulation across distributed code execution.

1. Broadcast variables
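
A minimal sketch, assuming an existing SparkContext sc; city_map and orders are made-up illustration data. The local dict is shipped to each executor once instead of being serialized with every task.

# local lookup table on the driver
city_map = {1: "Beijing", 2: "Shanghai", 3: "Shenzhen"}
bc_city = sc.broadcast(city_map)

orders = sc.parallelize([(1, 100), (2, 200), (3, 300), (1, 50)])

# executors read bc_city.value locally; city_map is not re-sent with every task
named = orders.map(lambda kv: (bc_city.value[kv[0]], kv[1]))
print(named.collect())   # [('Beijing', 100), ('Shanghai', 200), ('Shenzhen', 300), ('Beijing', 50)]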

2. Accumulators

import time

sc = spark.sparkContext
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count = sc.accumulator(0)

def map_func(data):
    global count
    count += data

# aggregate with reduce
start_time = time.time()
result = rdd1.reduce(lambda a, b: a + b)
end_time = time.time()
print(result)
print(end_time - start_time)

# aggregate with the accumulator
start_time = time.time()
rdd2 = rdd1.map(map_func)
rdd2.collect()
end_time = time.time()
print(count)
print(end_time - start_time)
output:
55
1.092909574508667
55
0.09459614753723145

Both the accumulator and reduce produce the aggregated result, but which is more efficient? In this test the measured time depends on which one runs first, so how should the comparison be measured fairly?

2 *ByKey operations

https://blog.csdn.net/weixin_43810802/article/details/120772452

https://blog.csdn.net/zhuzuwei/article/details/104446388

https://blog.csdn.net/weixin_40161254/article/details/103472056

Use reduceByKey instead of groupByKey

groupByKey shuffles every record, which hurts performance, while reduceByKey combines values within each partition before the shuffle and therefore moves far less data.
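
A small sketch of both on the same data, assuming an existing SparkContext sc:

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 1), ("a", 1)], 2)

# groupByKey: every (key, value) pair is shuffled, then summed on the reduce side
sums_group = pairs.groupByKey().mapValues(sum)

# reduceByKey: partial sums are computed per partition, and only those partial
# sums are shuffled
sums_reduce = pairs.reduceByKey(lambda a, b: a + b)

print(sorted(sums_group.collect()))    # [('a', 3), ('b', 2)]
print(sorted(sums_reduce.collect()))   # [('a', 3), ('b', 2)]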

3 reduce

in:
import pandas as pd
from pyspark.sql import Row

table = pd.DataFrame({"num": [1, 1, 2]})
table = spark.createDataFrame(table)
table.rdd.reduce(lambda a, b: Row(num=a[0] + b[0]))
out:
Row(num=4)

reduce performs aggregation.

Note that the value produced by combining a and b must keep the same type as a and b themselves. For example, a + b has the same type as a and b, so the next step works; otherwise (a + b) + c will raise an error.
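
A small sketch of this pitfall, assuming an existing SparkContext sc; the broken variant is left commented out:

nums = sc.parallelize([1, 2, 3, 4])

# fine: int + int -> int, same type as the inputs
print(nums.reduce(lambda a, b: a + b))   # 10

# broken: the first combination returns a str, so the next combination
# attempts str + int and raises TypeError
# nums.reduce(lambda a, b: str(a + b))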

4 collect, take, top, first, foreach

1 collect

collect()

Returns a list containing all the elements of this RDD.

Note: because all the data is loaded into the driver, this is only suitable when the dataset is small.

2 first

first()

Returns the first element of the RDD.

3 take

take(num)

Returns the first num elements of the RDD.

4 top

top(num, key=None)

Sorts and returns the top num elements of the RDD (largest first, optionally ordered by key).
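
A quick sketch of these four actions on one RDD, assuming an existing SparkContext sc:

rdd = sc.parallelize([3, 1, 5, 2, 4])

print(rdd.collect())   # [3, 1, 5, 2, 4]  all elements, loaded into the driver
print(rdd.first())     # 3                first element
print(rdd.take(2))     # [3, 1]           first two elements
print(rdd.top(2))      # [5, 4]           two largest, in descending order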

5 foreach

foreach(f)

Applies a function to all elements of this RDD. The function runs on the executors, partition by partition, so the output order is not deterministic.

def f(x):
    print(x)

sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

output:
4
3
2
1
5
The order changes from run to run:
3
5
2
4
1

Data partitioning and RDD partitions

https://www.jianshu.com/p/3aa52ee3a802

https://blog.csdn.net/hjw199089/article/details/77938688

https://blog.csdn.net/mys_35088/article/details/80864092

https://blog.csdn.net/dmy1115143060/article/details/82620715

https://blog.csdn.net/xxd1992/article/details/85254666

https://blog.csdn.net/m0_46657040/article/details/108737350

https://blog.csdn.net/heiren_a/article/details/111954523

https://blog.csdn.net/u011564172/article/details/53611109

https://blog.csdn.net/qq_22473611/article/details/107822168


1 application, job, stage, task, record

0 application

An application is one submitted Spark program.

1 Job

One action triggers one job.

2 Stage

A job builds a DAG from the RDD dependencies; the DAG is divided into stages, so one job contains one or more stages.

3 Task

The number of tasks in a stage is determined by the number of RDD partitions.

4 record

One task processes many records, i.e. that many rows of data.

Example

The relationship here should be: the number of RDD partitions determines the number of tasks, the number of tasks corresponds to the number of input splits, and that in turn determines how blocks are combined.

https://juejin.cn/post/6844903848536965134

2 RDD partitioning

Why partition?

The main purpose of partitioning is to enable parallel computation and improve efficiency.

Partitioning schemes

Spark has two built-in partitioners: HashPartitioner (hash partitioning) and RangePartitioner (range partitioning).

Setting the number of partitions

https://justdodt.github.io/2018/04/23/Spark-RDD-%E7%9A%84%E5%88%86%E5%8C%BA%E6%95%B0%E9%87%8F%E7%A1%AE%E5%AE%9A/
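
A sketch of inspecting and changing the partition count, assuming an existing SparkContext sc; repartition does a full shuffle, coalesce can shrink the count without one, and partitionBy applies a hash partitioner to a key-value RDD:

rdd = sc.parallelize(range(100), 4)
print(rdd.getNumPartitions())                  # 4

print(rdd.repartition(8).getNumPartitions())   # 8, full shuffle
print(rdd.coalesce(2).getNumPartitions())      # 2, no full shuffle

# hash-partition a key-value RDD into 3 partitions
pairs = rdd.map(lambda x: (x % 3, x)).partitionBy(3)
print(pairs.getNumPartitions())                # 3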

Spark modules

The Spark framework consists of these modules: Spark Core, Spark SQL, Spark Streaming, Spark GraphX, and Spark MLlib; the latter four are all built on top of the core engine.

Spark Core: the core of Spark. All of Spark's core functionality is provided by this module, and it is the foundation on which Spark runs. Spark Core uses the RDD as its data abstraction and exposes APIs in Python, Java, Scala, and R for batch processing of massive offline datasets.
Spark SQL: built on top of Spark Core, it provides structured data processing. Spark SQL supports processing data with SQL and itself targets offline (batch) computation. On top of Spark SQL, Spark also provides the Structured Streaming module, which uses Spark SQL as the basis for stream processing.

Data abstractions: Dataset (Java, Scala) and DataFrame (Java, Scala, Python, R).
Spark Streaming: built on Spark Core, provides stream processing.
MLlib: built on Spark Core, for machine learning; it ships with a large library of machine-learning algorithms and APIs so that users can train models in a distributed fashion.
GraphX: built on Spark Core, for graph computation; it provides a rich set of graph APIs for distributed graph processing.

Spark vs MapReduce

Comparison

https://www.educba.com/mapreduce-vs-spark/

Category: MapReduce is primarily a data-processing engine. Spark is a framework that drives complete analytical solutions or applications, which makes it a natural choice as a data-analytics engine for data scientists.

Performance and data processing: MapReduce reads and writes intermediate data from and to disk, which slows processing. Spark minimizes read/write cycles and keeps data in memory, making it up to 10x faster, but it can degrade badly if the data does not fit in memory.

Latency: because of its lower performance, MapReduce has higher computation latency; Spark's speed gives developers low-latency computing.

Manageability: MapReduce is only a batch engine, so the other components must be handled separately yet kept in sync, which makes it harder to manage. Spark is a complete data-analytics engine that runs batch, interactive, and streaming workloads under the same cluster umbrella, and is therefore easier to manage.

Real-time analysis: MapReduce was built mainly for batch processing and is a poor fit for real-time analytics. Data from real-time streams such as Facebook or Twitter can be processed efficiently in Spark.

Interactive mode: MapReduce has no interactive mode; in Spark, data can be processed interactively.

Security: MapReduce has access to all Hadoop security features and integrates easily with other Hadoop security projects; it also supports ACLs. In Spark, security is OFF by default, which can be a significant weakness, and for authentication only the shared-secret method is available.

Fault tolerance: if a MapReduce process crashes, it can resume from where it left off because it relies on hard drives rather than RAM. If a Spark process crashes, processing has to restart from the beginning, so in this respect it is less fault tolerant because it relies on RAM.

Why is Spark faster than MapReduce?

https://blog.csdn.net/JENREY/article/details/84873874

1 Spark is memory-based, MapReduce is disk-based

This refers to intermediate results.

MapReduce: the intermediate results of a computation usually have to be written to disk and then read back, which causes frequent disk IO.

Spark: intermediate results do not have to be written to disk at every step.
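
One concrete way Spark keeps an intermediate result in memory is explicit caching; a minimal sketch, assuming an existing SparkContext sc:

base = sc.parallelize(range(1000000))
squares = base.map(lambda x: x * x).cache()   # keep the intermediate RDD in memory

print(squares.count())   # first action: computes the map and caches the result
print(squares.sum())     # second action: reuses the in-memory data, no recomputation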

2 Spark uses coarse-grained resource allocation, MapReduce uses fine-grained resource allocation

A Spark task does not request resources itself; resources are requested once for the whole application when it is submitted.

In MapReduce, each task requests its own resources when it runs.

3 Spark runs tasks as threads, MapReduce runs tasks as processes

Spark configuration

1. Ways to set configuration

https://blog.51cto.com/u_16213328/7866422

Precedence: settings made in code (SparkConf / SparkSession.builder.config) take priority over spark-submit command-line options, which in turn take priority over spark-defaults.conf.

What about Hadoop settings???

2. Setting configuration in code (SparkSession, SparkContext, HiveContext, SQLContext)

https://blog.csdn.net/weixin_43648241/article/details/108917865

SparkSession > SparkContext > HiveContext > SQLContext

A SparkSession wraps a SparkContext.

HiveContext and SQLContext are older entry points built on top of a SparkContext.

HiveContext extends SQLContext; in Spark 2+ all of this functionality is reachable through SparkSession.
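
A minimal sketch showing that the narrower handles are reachable from a SparkSession, which is why SparkSession is normally the only entry point needed; the app name is made up:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("contexts-demo").getOrCreate()

sc = spark.sparkContext               # the underlying SparkContext
print(sc.applicationId)

# SQL access directly through the session, no separate SQLContext/HiveContext needed
print(spark.sql("SELECT 1 AS one").collect())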

from pyspark.sql import SparkSession

# addfile / zipfile are paths to extra files / a zipped package defined elsewhere
spark = SparkSession.builder.\
    config("hive.metastore.uris", "thrift://xxx.xx.x.xx:xxxx").\
    config("spark.pyspark.python", "/opt/dm_python3/bin/python").\
    config("spark.default.parallelism", 10).\
    config("spark.sql.shuffle.partitions", 200).\
    config("spark.driver.maxResultSize", "16g").\
    config("spark.port.maxRetries", "100").\
    config("spark.driver.memory", "16g").\
    config("spark.yarn.queue", "dcp").\
    config("spark.executor.memory", "16g").\
    config("spark.executor.cores", 20).\
    config("spark.files", addfile).\
    config("spark.executor.instances", 6).\
    config("spark.speculation", False).\
    config("spark.submit.pyFiles", zipfile).\
    appName("testing").\
    master("yarn").\
    enableHiveSupport().\
    getOrCreate()
