持久化

https://cloud.tencent.com/developer/article/1760389

https://blog.csdn.net/dudadudadd/article/details/114102341

https://yiqingqing.blog.csdn.net/article/details/121772325

https://blog.csdn.net/feizuiku0116/article/details/122839247

https://blog.csdn.net/CyAurora/article/details/119654676

https://www.cnblogs.com/Transkai/p/11347224.html

https://blog.csdn.net/CyAurora/article/details/119654676

https://blog.csdn.net/dudadudadd/article/details/114102341

1 缓存

懒执行

空间换时间

rdd3如果不消失,那么绿色链路就不用执行两次

持久化的目标就是将rdd3保存到内存或者磁盘

但是有丢失风险,比如硬盘损坏,内存被清理等,所以为了规避风险,会保留rdd的血缘(依赖)关系

如何保存:

1 persist

2 cache

https://blog.csdn.net/donger__chen/article/details/86366339

底层调用persist,persist的特殊情况,persist(MEMORY_ONLY)

2 checkpoint

特殊的持久化

仅支持硬盘

设计上认为安全没有风险,所以不需要保留血缘关系

如何保存:

3 对比

Hive与传统数据库对比

Hive 传统数据库
查询语言 HQL SQL
数据存储 HDFS Raw Device或者 Local FS
数据格式 用户自定义 系统决定
数据更新 不支持 支持
执行 MapReduce Excutor
执行延迟
处理数据规模
索引 0.8版本后加入位图索引 有复杂的索引
可扩展性

https://cloud.tencent.com/developer/article/1785857

各组件web ui的地址

https://blog.csdn.net/qq_41851454/article/details/79938811

node ip+port

yarn

resource maneger +8088

hdfs

namenode +50070/9870/9871

spark

4040: 是一个运行的Application在运行的过程中临时绑定的端口,用以查看当前任务的状态.4040被占用会顺延到4041.4042等.4040是一个临时端口,当前程序运行完成后, 4040就会被注销哦。当使用spark交互工具,如spark-sql,spark-shell

8080: 默认是StandAlone下, Master角色(进程)的WEB端口,用以查看当前Master(集群)的状态

18080: 默认是历史服务器的端口, 由于每个程序运行完成后,4040端口就被注销了. 在以后想回看某个程序的运行状态就可以通过历史服务器查看,历史服务器长期稳定运行,可供随时查看被记录的程序的运行过程.

配置历史服务器

https://blog.csdn.net/Heitao5200/article/details/79674684

https://blog.csdn.net/yu0_zhang0/article/details/80396080

注意端口号和hadoop一致,9000->8020

Apache Flink runs the dashboard on port 8081. Since this is a common port there might be conflict with some other services running on the same machines

port和端口可以在flink/conf/flink-conf.yaml 中查看

hive metastore

端口9083

hbase

16010

dataframe

0.两种风格

DataFrame支持两种风格进行编程,分别是:

1 DSL风格

DSL称之为:领域特定语言。
其实就是指DataFrame的特有API
DSL风格意思就是以调用API的方式来处理Data
比如:df.where().limit()

2 SQL风格

SQL语法风格
SQL风格就是使用SQL语句处理DataFrame的数据
比如:spark.sql(“SELECT * FROM xxx)

1 用户自定义函数

步骤:

https://blog.csdn.net/qq_43665254/article/details/112379113

https://blog.csdn.net/sunflower_sara/article/details/104044412

1、定义函数

2、注册函数

3、使用函数

2 withColumn

1
2
3
4
5
6
7
8
9
10
from pyspark.sql.functions import col, lit

###
df.withColumn('age2', df.age + 2).collect()
### 结合udf
def fun(A,B):
XXXX
return XX
fun1 = udf(fun, StringType())
df.withColumn('age2', fun1(col_name1,col_name2))###

rdd

0.分类

算子就是分布式集合对象的api

rdd算子分为两类:1.transformation 2.action

https://blog.csdn.net/weixin_45271668/article/details/106441457

1 共享变量

https://blog.csdn.net/Android_xue/article/details/79780463

https://chowdera.com/2022/02/202202091419262471.html

两种共享变量:广播变量(broadcast variable)与累加器(accumulator)

广播变量解决了什么问题?
分布式集合RDD和本地集合进行关联使用的时候, 降低内存占用以及减少网络IO传输, 提高性能.

累加器解决了什么问题?
分布式代码执行中, 进行全局累加

1.广播变量

2.累加器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
sc = spark.sparkContext
rdd1=sc.parallelize([1,2,3,4,5,6,7,8,9,10],2)
count=sc.accumulator(0)

def map_func(data):
global count
count+=data

# count = sc.accumulator(0)
# rdd3=rdd2.map(lambda x:x)
# rdd3.collect()
# print(count)
start_time = time.time()
result=rdd1.reduce(lambda a, b: a + b)
end_time = time.time()
print(result)
print(end_time - start_time)
# print(count)
start_time = time.time()
rdd2 = rdd1.map(map_func)
rdd2.collect()
end_time = time.time()
print(count)
print(end_time - start_time)
1
2
3
4
55
1.092909574508667
55
0.09459614753723145

累加器和reduce都可以得到聚合结果,效率???谁先执行 谁短,怎么衡量

2 *ByKey 操作

https://blog.csdn.net/weixin_43810802/article/details/120772452

https://blog.csdn.net/zhuzuwei/article/details/104446388

https://blog.csdn.net/weixin_40161254/article/details/103472056

Use reduceByKey instead of groupByKey

groupByKey creates a lot of shuffling which hampers the performance, while reduceByKey does not shuffle the data as much

3 reduce

1
2
3
4
5
6
7
in:
table=pd.DataFrame({"num":[1,1,2]})
table = spark.createDataFrame(table)
from pyspark.sql import Row
table.rdd.reduce(lambda a,b:Row(num=a[0]+b[0]))
out:
Row(num=4)

聚合的作用

注意点就是 a,b 操作后的数据类型和a,b保持一致,举个例子 a+b 和a ,b类型一致,否则(a+b)+c会报错

4 collect、 take、top、first,foreach

1 collect

collect()

返回包含此RDD中的所有元素的列表

注意:因为所有数据都会加载到driver,所有只适用于数据量不大的情况

2 first

first()

返回RDD中的第一个元素

3 take

take(num)

取RDD的前num个元素

4 top

top(num, key=None)

排序

Get the top N elements from an RDD

5 foreach

foreach(f)

Applies a function to all elements of this RDD 分区输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def f(x):
print(x)

sc.parallelize([1,2,3,4,5]).foreach(f)

output:
4
3
2
1
5
会变
3
5
2
4
1

:D 一言句子获取中...