rdd

0. Categories

Operators are the API of the distributed collection object (the RDD).

RDD operators fall into two classes: 1. transformations 2. actions

https://blog.csdn.net/weixin_45271668/article/details/106441457

1 Shared variables

https://blog.csdn.net/Android_xue/article/details/79780463

https://chowdera.com/2022/02/202202091419262471.html

Two kinds of shared variables: broadcast variables and accumulators.

What problem do broadcast variables solve?
When a distributed RDD is used together with a local (driver-side) collection, broadcasting the local collection lowers memory usage and reduces network IO, improving performance.

What problem do accumulators solve?
Performing a global accumulation across distributed code execution.

1. Broadcast variables
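
A minimal PySpark sketch of a broadcast variable (the lookup dict, variable names and data below are made up for illustration): the driver-side dict is shipped to each executor once and read through `.value` inside the tasks, instead of being serialized with every task closure.

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# small lookup table that lives on the driver
id_to_name = {1: "A", 2: "B", 3: "C"}
bc = sc.broadcast(id_to_name)      # sent to each executor once

rdd = sc.parallelize([(1, 10), (2, 20), (3, 30)])
# inside the task, read the shared copy through bc.value
result = rdd.map(lambda kv: (bc.value[kv[0]], kv[1])).collect()
print(result)   # [('A', 10), ('B', 20), ('C', 30)]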

2. Accumulators

import time

sc = spark.sparkContext
rdd1 = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], 2)
count = sc.accumulator(0)

def map_func(data):
    global count
    count += data      # accumulator update, propagated back to the driver

# time the reduce-based aggregation
start_time = time.time()
result = rdd1.reduce(lambda a, b: a + b)
end_time = time.time()
print(result)
print(end_time - start_time)

# time the accumulator-based aggregation
start_time = time.time()
rdd2 = rdd1.map(map_func)
rdd2.collect()          # an action is needed to actually run the map
end_time = time.time()
print(count)
print(end_time - start_time)
output:
55
1.092909574508667
55
0.09459614753723145

Both the accumulator and reduce produce the same aggregate result (55). The timing difference above mostly reflects execution order: whichever job runs first pays the one-off startup and scheduling cost, so a single run like this is not a reliable way to compare their efficiency.

2 *ByKey operations

https://blog.csdn.net/weixin_43810802/article/details/120772452

https://blog.csdn.net/zhuzuwei/article/details/104446388

https://blog.csdn.net/weixin_40161254/article/details/103472056

Use reduceByKey instead of groupByKey.

groupByKey shuffles every record in order to group the values, which hurts performance; reduceByKey first combines values locally within each partition (a map-side combine), so far less data is shuffled.
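
A small sketch (made-up pair data) contrasting the two: both give the same per-key sums, but reduceByKey combines values on each partition before the shuffle, while groupByKey ships every record across the network.

pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1)], 2)

sums1 = pairs.reduceByKey(lambda a, b: a + b)              # preferred
sums2 = pairs.groupByKey().mapValues(lambda vs: sum(vs))   # same result, much more shuffle

print(sums1.collect())   # [('a', 3), ('b', 1)] (order may vary)
print(sums2.collect())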

3 reduce

in:
import pandas as pd
from pyspark.sql import Row

table = pd.DataFrame({"num": [1, 1, 2]})
table = spark.createDataFrame(table)
table.rdd.reduce(lambda a, b: Row(num=a[0] + b[0]))
out:
Row(num=4)

Its purpose is aggregation.

Note: the result of combining a and b must have the same type as a and b themselves. For example, a + b must have the same type as a and b; otherwise a later merge such as (a + b) + c will fail, as sketched below.
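
A hedged sketch of that pitfall, reusing the table from the example above (same spark session assumed): if the combiner returns a plain int instead of a Row, a later merge that indexes the partial result with [0] fails.

from pyspark.sql import Row

rows = spark.createDataFrame(pd.DataFrame({"num": [1, 1, 2]})).rdd
# bad: returns an int, but later merges still try a[0] + b[0]
# rows.reduce(lambda a, b: a[0] + b[0])            # may raise TypeError on a later merge
# good: keep the accumulated value wrapped in a Row, same type as the inputs
rows.reduce(lambda a, b: Row(num=a[0] + b[0]))     # Row(num=4)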

4 collect, take, top, first, foreach

1 collect

collect()

Returns a list containing all elements of this RDD.

Note: all the data is loaded onto the driver, so this is only suitable when the data volume is small.

2 first

first()

Returns the first element of the RDD.

3 take

take(num)

Returns the first num elements of the RDD.

4 top

top(num, key=None)

Sorts and returns the top num elements of the RDD (descending order, optionally by a key function).
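
A quick sketch of the difference on toy data: take returns elements in partition order, while top returns the largest elements in descending order.

nums = sc.parallelize([5, 1, 4, 2, 3])
print(nums.first())   # 5
print(nums.take(3))   # [5, 1, 4]
print(nums.top(3))    # [5, 4, 3]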

5 foreach

foreach(f)

Applies a function to all elements of this RDD. The output is produced on the executors, partition by partition, so the printed order is not deterministic.

def f(x):
    print(x)

sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

output:
4
3
2
1
5
the order changes between runs; another run may print:
3
5
2
4
1

Common SQL operations

1 Combining results

1 union, union all

UNION merges result sets and removes duplicate rows; UNION ALL keeps all rows, including duplicates.

https://www.w3school.com.cn/sql/sql_union.asp

select 'mobile' as platform union
select 'desktop' as platform union
select 'both' as platform
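
A tiny illustration of the difference on constant rows:

select 'mobile' as platform union select 'mobile' as platform;      -- 1 row (duplicates removed)
select 'mobile' as platform union all select 'mobile' as platform;  -- 2 rows (duplicates kept)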

2 join

left join, right join, inner join, full outer join; a bare JOIN defaults to INNER JOIN.

https://www.cnblogs.com/ingstyle/p/4368064.html

# multiple chained left joins
select a.*, b.*, c.*
from a
left join b on a.id = b.id
left join c on b.id = c.id

# e.g. joining the Users table twice under different aliases
select *
from Trips T
left join Users U1 on T.client_id = U1.users_id
left join Users U2 on T.driver_id = U2.users_id
-- not suitable for filtering: with a LEFT JOIN, a predicate in the ON clause only
-- restricts which rows match; unmatched Trips rows are still returned (with NULLs),
-- so the banned filter would have to go in WHERE instead
from Trips T1
left join Users U1 on (T1.client_id = U1.users_id and U1.banned = "Yes")

-- fine: with an INNER JOIN, a filter in ON is equivalent to putting it in WHERE
from Trips T1
join Users U1 on (T1.client_id = U1.users_id and U1.banned = "Yes")

3. from student A, student B, student C

Cross-joins the student table with itself three times (three aliases of the same table) into one combined result.

https://blog.csdn.net/zhangyj_315/article/details/2249209

2 Grouping

1 group by

1. group by columns1, columns2

Groups rows by the listed columns, with multiple columns treated as one composite key. After grouping, each group produces a single output row; non-aggregated columns are effectively taken from the first row of the group.

2. Adding conditions

HAVING

The HAVING clause filters whole groups that satisfy a condition; it does not select rows within a group.

HAVING was added to SQL because the WHERE keyword cannot be used with aggregate functions.

3. Note

Sometimes the row you need is not the first row of the group. How can this be solved? (LeetCode 1070. Product Sales Analysis III)

a. Sort first so that the target row becomes the first row of each group (see https://blog.csdn.net/shiyong1949/article/details/78482737) — this does not seem to work reliably.

b. Use a window function — this works; see the sketch below.
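
One possible sketch for LeetCode 1070 (table and column names as in that problem): rank each product's sales by year within the group, then keep only the earliest year instead of relying on the group's "first row".

select product_id, year as first_year, quantity, price
from (
    select s.*,
           rank() over (partition by product_id order by year) as rk
    from Sales s
) t
where rk = 1;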

4 Subqueries

Nesting

http://c.biancheng.net/sql/sub-query.html

select a.Score as Score,
(select count(DISTINCT b.Score) from Scores b where b.Score >= a.Score) as 'Rank'
from Scores a
order by a.Score DESC

5 Aliases

Put the alias after AS; the AS keyword can also be omitted.

6 Fuzzy matching

Keyword: LIKE

Wildcards

https://www.cnblogs.com/Leophen/p/11397621.html

select * from table where name like "%三%"

7 Conditionals

1. IF

Syntax: IF(condition, value1, value2)

If the condition is true the result is value1; if it is false the result is value2.
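
A small illustration (hypothetical table and columns):

select order_id, if(quantity > 10, 'bulk', 'normal') as order_type
from orders;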

2 CASE

CASE produces as many output rows as there are input rows.

It is evaluated row by row.

https://zhuanlan.zhihu.com/p/240717732

Two forms:

-- type 1: simple CASE
CASE <expression>
WHEN <value1> THEN <result>
WHEN <value2> THEN <result>
...
ELSE <result>
END
-- type 2: searched CASE
CASE
WHEN <condition1> THEN <result>
WHEN <condition2> THEN <result>
...
ELSE <result>
END

Multiple values after THEN:

THEN 1 — allowed

THEN (1,1) — not allowed (THEN must return a single scalar value)

whereas WHERE xx IN (1,1) — allowed

3 WHERE

LeetCode 1126. Active Businesses

# Write your MySQL query statement below
select t.business_id
from
(select *, avg(E1.occurences ) over(partition by E1.event_type ) as ave_occu
from Events E1 ) t
where t.occurences > t.ave_occu
group by t.business_id
having count(t.business_id)>=2

4 Predicates

1 IN

(E1.id, E1.month) in ((1,8))

2 BETWEEN AND

BETWEEN '2019-06-28' AND '2019-07-27'

3 EXISTS

Tests whether a subquery returns any rows: returns TRUE if at least one row exists, otherwise FALSE.

SELECT column_name(s)
FROM table_name
WHERE EXISTS
(SELECT column_name FROM table_name WHERE condition);

8 NULL

1. Difference between an empty string and NULL

https://blog.csdn.net/weixin_42214393/article/details/80463912

2. Testing for NULL

IS NOT NULL

!= NULL does not work (any comparison with NULL yields NULL rather than TRUE)

IFNULL(SUM(quantity), 0) replaces a NULL aggregate with 0

3. Combined with IN / NOT IN

https://blog.csdn.net/qq_22592457/article/details/108024521

  1. With IN, NULL values inside the parentheses are simply ignored.
  2. With NOT IN, if the list contains no NULL, the predicate behaves normally and returns the rows that qualify.
  3. With NOT IN, if the list contains a NULL, the predicate can never evaluate to TRUE and the query returns an empty result (illustrated below).
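
A tiny illustration of rule 3 (hypothetical table t whose column id contains 1, 2, 3):

select * from t where id not in (1, NULL);   -- returns no rows
select * from t where id not in (1);         -- returns the rows with id 2 and 3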

9 Dates

1 Comparisons

available_from < '2019-05-23'

datediff(date1, date2) — returns the number of days from date2 to date1

2 Format conversion

DATE_FORMAT()

https://www.w3school.com.cn/sql/func_date_format.asp
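
For example, truncating a date to year-month with MySQL format specifiers:

select date_format('2019-07-27', '%Y-%m');   -- '2019-07'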

10 Deduplication

1 distinct

https://blog.csdn.net/zhangzehai2234/article/details/88361586

select distinct expression[,expression...] from tables [where conditions];

Key points when using DISTINCT:

  • DISTINCT must appear before all the columns, at the very front of the select list.
  • If DISTINCT is followed by multiple columns, the combination of those columns is deduplicated: a row is removed only when the values of all listed columns are equal.

Using DISTINCT and COUNT together

## create the table
Create table If Not Exists Spending (user_id int, spend_date date, platform ENUM('desktop', 'mobile'), amount int);
Truncate table Spending;
insert into Spending (user_id, spend_date, platform, amount) values ('1', '2019-07-01', 'mobile', '100');
insert into Spending (user_id, spend_date, platform, amount) values ('1', '2019-07-01', 'desktop', '100');
insert into Spending (user_id, spend_date, platform, amount) values ('2', '2019-07-01', 'mobile', '100');
insert into Spending (user_id, spend_date, platform, amount) values ('2', '2019-07-02', 'mobile', '100');
insert into Spending (user_id, spend_date, platform, amount) values ('3', '2019-07-01', 'desktop', '100');
insert into Spending (user_id, spend_date, platform, amount) values ('3', '2019-07-02', 'desktop', '100');

## query
select distinct spend_date, count(user_id)
from Spending

## result: a single row. DISTINCT spend_date alone would give multiple rows, but
## COUNT() aggregates the whole (ungrouped) result into one row, and the count is
## still taken over the rows before DISTINCT is applied.
spend_date    count(user_id)
2019-07-01    6

11 show status

https://blog.csdn.net/qq_29168493/article/details/79149132

Shows the current database server status, e.g. how often different kinds of statements have been executed.

12 explain

Retrieves the execution plan of a SQL statement; for the meaning of each output column see

https://cloud.tencent.com/developer/article/1093229

14 Transactions

http://m.biancheng.net/sql/transaction.html

15 Recursion

https://zhuanlan.zhihu.com/p/372330656

https://medium.com/swlh/recursion-in-sql-explained-graphically-679f6a0f143b

https://www.sqlservertutorial.net/sql-server-basics/sql-server-recursive-cte/

WITH expression_name (column_list)
AS
(
-- Anchor member
initial_query
UNION ALL
-- Recursive member that references expression_name.
recursive_query
)
-- references expression name
SELECT *
FROM expression_name

Analysis: https://zhuanlan.zhihu.com/p/372330656

R0 :

SELECT UserID, ManagerID, Name, Name AS ManagerName
FROM dbo.Employee
WHERE ManagerID = -1

userid  managerid  name  managername
1       -1         boss  boss

R1:

SELECT c.UserID,c.ManagerID,c.Name,p.Name AS ManagerName
FROM CTE P
INNER JOIN dbo.Employee c ON p.UserID=c.ManagerID

At this point Employee is the full table, and the CTE contains:

userid  managerid  name  managername
1       -1         boss  boss

The join produces:

c.userid  c.managerid  c.name  c.managername  p.userid  p.managerid  p.name  p.managername
11        1            A1      A1             1         -1           boss    boss
12        1            A2      A2             1         -1           boss    boss
13        1            A3      A3             1         -1           boss    boss
After projecting SELECT c.UserID, c.ManagerID, c.Name, p.Name AS ManagerName, R1 is:

userid  managerid  name  managername
11      1          A1    boss
12      1          A2    boss
13      1          A3    boss

R2,R3…

Finally, R0, R1, R2, … are combined with UNION ALL.

16 SELECT constants

SELECT 1,-1,N'Boss'
UNION ALL
SELECT 11,1,N'A1'
UNION ALL
SELECT 12,1,N'A2'
UNION ALL
SELECT 13,1,N'A3'
UNION ALL
SELECT 111,11,N'B1'
UNION ALL
SELECT 112,11,N'B2'
UNION ALL
SELECT 121,12,N'C1'

Columns: 1, -1, Boss (the constants in the first SELECT become the column names)

1 -1 Boss
11 1 A1
12 1 A2
13 1 A3
111 11 B1
112 11 B2
121 12 C1

17 CTE

Common Table Expression

with …as…

18 Triggers

A trigger is an action that fires automatically when certain operations are performed.

Triggers execute automatically: they fire immediately after the user performs certain operations on the data in a table.

https://blog.csdn.net/weixin_48033173/article/details/111713117

Optimization

1 Overview

https://blog.csdn.net/hguisu/article/details/5731629

https://www.analyticsvidhya.com/blog/2021/10/a-detailed-guide-on-sql-query-optimization/

https://blog.devart.com/how-to-optimize-sql-query.html#sql-query-optimization-basics

2 Reduce full table scans

https://www.cnblogs.com/feiling/p/3393356.html

https://www.jianshu.com/p/03968ac9d8ad

3 Create indexes

https://blog.csdn.net/happyheng/article/details/53143345

https://www.runoob.com/mysql/mysql-index.html

https://blog.csdn.net/wangfeijiu/article/details/113409719

0 Purpose

Indexes improve query performance.

Difference from the primary key: a primary key is a special kind of index, but an index is not necessarily a primary key.

https://blog.csdn.net/krismile__qh/article/details/98477484

https://blog.csdn.net/weixin_33375360/article/details/113371197

Why have a separate index when there is already a primary key? They are two different things: the primary key exists to uniquely identify rows, while an index exists to speed up queries; their underlying implementations also differ.

https://cache.one/read/17347789

1 Index types

Clustered indexes vs. non-clustered indexes

2 Index design

http://c.biancheng.net/view/7366.html

3 Common operations

1. Creating an index

Specify the index when creating the table:

drop table if exists s1;
create table s1 (
    id int,
    age int,
    email varchar(30),
    index(id)
);

After the table has been created:

CREATE INDEX index_name ON table_name (column_list); / ALTER TABLE table_name ADD INDEX index_name (column1, column2, …);

An index can also be created in Navicat.

2. Dropping an index

DROP INDEX index_name ON table_name
ALTER TABLE table_name DROP INDEX index_name

3. Viewing indexes

SHOW INDEX FROM table_name;

4. Using an index

Once an index exists, the underlying query execution becomes faster.

Queries are written exactly as before; the SQL itself does not change.

https://blog.csdn.net/lenux2017/article/details/80086265
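
One way to check whether the index is actually used (using the hypothetical s1 table created above): the key column of the EXPLAIN output should show the index on id.

explain select * from s1 where id = 42;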

4 When an index is not used (index invalidation)

https://www.cnblogs.com/technologykai/articles/14172224.html

4 Views

https://blog.csdn.net/talentluke/article/details/6420197

http://m.biancheng.net/sql/create-view.html

A view is a virtual table: it contains not data but a SQL query.

The main differences between a view and a table:

  • A table occupies physical storage space and contains real data.
  • A view needs no physical storage space (unless you add an index to the view) and contains no real data; it only references data from tables.

Purpose

  • Simplify data access and make complex SQL simple: users only need to write simple queries against the view, while the complex logic lives inside the view.
  • Prevent sensitive columns from being selected while still providing access to the other important data.
  • Additional indexes can be added on a view to improve query performance.

A view is used in exactly the same way as a table.
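
A minimal sketch (hypothetical view name, reusing the Spending table from above): the view stores only the query, not the data, and is then queried like a table.

create view mobile_spending as
select user_id, spend_date, amount
from Spending
where platform = 'mobile';

select * from mobile_spending;   -- used exactly like a table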

Difference from a CTE / subquery

https://blog.csdn.net/happyboy53/article/details/2731420

A subquery holds data (kept in memory), whereas a view holds no data, only the SQL query.

5 Stored procedures

Code encapsulation and reuse at the SQL language level.

https://www.runoob.com/w3cnote/mysql-stored-procedure.html

Functions

1 Single-row and multi-row functions

Single-row functions: one row in, one value out.

Multi-row functions: many rows in, one value out; the most common are the aggregate functions.

There should be no one-in-many-out case, and a many-in-many-out function can be reduced to applying a single-in-single-out function multiple times.

https://blog.csdn.net/lailai186/article/details/12570899

Note:

When a multi-row result and a single-row result are combined, only one row is returned.

Example: select column1, count(column2) returns only one row.

What if the two "many" sides had different sizes, say 3 and 2? That situation should not arise.

2 User-defined functions

https://blog.csdn.net/qq_23833037/article/details/53170789

3 Aggregate functions

As the name suggests, they aggregate data and return a single value.

https://blog.csdn.net/qq_40456829/article/details/83657396

4 Window functions

https://segmentfault.com/a/1190000040088969

https://www.51cto.com/article/639541.html

https://blog.csdn.net/weixin_43412569/article/details/107992998

Purpose

The number of rows stays unchanged.

A window function takes multiple rows (one window) as input and returns one value per row.

Computation process

current row -> partition -> sort -> frame (range) -> compute -> write the result back into the current row

Syntax

window_function ([expression]) OVER (
[ PARTITION BY part_list ]
[ ORDER BY order_list ]
[ { ROWS | RANGE } BETWEEN frame_start AND frame_end ] )
  • expression

  • PARTITION BY

    Partitions the data by part_list; without PARTITION BY the whole result set is treated as one partition.

    partition by columns1, columns2

  • ORDER BY

    Sorts the rows within each partition by order_list.

  • ROWS / RANGE determine the window frame (which rows around the current row are included)

    https://blog.csdn.net/qq_42374697/article/details/115109386

Categories

https://www.cnblogs.com/52xf/p/4209211.html

They can be divided into the following 3 classes (sketches of the latter two follow the aggregate examples below):

  • Aggregate: AVG(), COUNT(), MIN(), MAX(), SUM()
  • Ranking: ROW_NUMBER(), RANK(), DENSE_RANK(), NTILE()
  • Value / analytic: LAG(), LEAD(), FIRST_VALUE(), LAST_VALUE()
sum(frequency) over(partition by num)    -- sum within each group
sum(frequency) over()                    -- total sum over all rows
sum(frequency) over(order by num desc)   -- running sum, descending order
sum(frequency) over(order by num asc)    -- running sum, ascending order
SUM(Salary) OVER (PARTITION BY Id ORDER BY Month asc range 2 PRECEDING)  -- frame: the current row plus the preceding 2

avg(frequency) over()                    -- overall average
avg(frequency) over(order by num desc)   -- running average, descending order
avg(frequency) over(order by num asc)    -- running average, ascending order
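
Sketches of the other two classes, using the Scores(Score) table from the subquery example above:

select Score,
       row_number()  over (order by Score desc) as row_num,    -- 1,2,3,4,... even for ties
       rank()        over (order by Score desc) as rnk,        -- ties share a rank, gaps follow
       dense_rank()  over (order by Score desc) as dense_rnk,  -- ties share a rank, no gaps
       lag(Score, 1) over (order by Score desc) as prev_score  -- value from the previous row
from Scores;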


https://blog.csdn.net/qq_54494937/article/details/119537881

https://leetcode-cn.com/problems/find-median-given-frequency-of-numbers/


5 Mathematical functions

https://blog.csdn.net/a_lllll/article/details/87880389

abs

select  distinct C1.seat_id as seat_id
from Cinema C1 join Cinema C2
on C1.free=1 and C2.free=1 and abs(C1.seat_id-C2.seat_id)=1
order by seat_id

power

# Write your MySQL query statement below
select round(min(Power(Power(P1.x-P2.x,2)+Power(P1.y-P2.y,2),0.5)),2) as shortest
from Point2D P1 join Point2D P2
on P1.x!=P2.x or P1.y!=P2.y

Data partitioning and RDD partitions

https://www.jianshu.com/p/3aa52ee3a802

https://blog.csdn.net/hjw199089/article/details/77938688

https://blog.csdn.net/mys_35088/article/details/80864092

https://blog.csdn.net/dmy1115143060/article/details/82620715

https://blog.csdn.net/xxd1992/article/details/85254666

https://blog.csdn.net/m0_46657040/article/details/108737350

https://blog.csdn.net/heiren_a/article/details/111954523

https://blog.csdn.net/u011564172/article/details/53611109

https://blog.csdn.net/qq_22473611/article/details/107822168


1 application,job,stage,task,record

0 Application

The Spark program that is submitted as a whole.

1 Job

Each action produces one job.

2 Stage

A job builds a DAG from the RDD dependency graph, and the DAG is split into stages (at shuffle boundaries); a job therefore contains one or more stages.

3 Task

Within a stage, the number of tasks is determined by the number of RDD partitions.

4 Record

A task processes many records, i.e. the rows of data in its partition.

Example

Here, presumably, the RDD's partition count determines the number of tasks, the number of tasks determines the number of input splits, and that in turn determines how the blocks are grouped.

https://juejin.cn/post/6844903848536965134

2 RDD partitions

Why partition?

The main purpose of partitioning is to enable parallel computation and improve efficiency.

Partitioning schemes

Spark provides two ways of partitioning data: HashPartitioner (hash partitioning) and RangePartitioner (range partitioning).

Setting the number of partitions

https://justdodt.github.io/2018/04/23/Spark-RDD-%E7%9A%84%E5%88%86%E5%8C%BA%E6%95%B0%E9%87%8F%E7%A1%AE%E5%AE%9A/
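
A small sketch of inspecting and changing the partition count (toy data):

rdd = sc.parallelize(range(100), 4)              # explicitly request 4 partitions
print(rdd.getNumPartitions())                    # 4
print(rdd.repartition(8).getNumPartitions())     # 8, triggers a full shuffle
print(rdd.coalesce(2).getNumPartitions())        # 2, avoids a full shuffle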

Spark modules

The Spark framework consists of the following modules: Spark Core, Spark SQL, Spark Streaming, Spark GraphX and Spark MLlib; the latter four are all built on top of the core engine.

Spark Core: the core of Spark. All of Spark's core functionality is provided by the Spark Core module; it is the foundation on which Spark runs. Spark Core uses the RDD as its data abstraction and provides APIs in Python, Java, Scala and R for batch processing of massive offline data.

Spark SQL: built on top of Spark Core, it provides processing of structured data. Spark SQL supports processing data with SQL and itself targets offline (batch) computation. On top of Spark SQL, Spark provides the Structured Streaming module for streaming computation.

Data abstractions: Dataset (Java, Scala) and DataFrame (Java, Scala, Python, R).

Spark Streaming: built on Spark Core, provides streaming computation.

MLlib: built on Spark Core, for machine learning; it ships with a large number of machine-learning algorithms and APIs so users can run machine learning in a distributed fashion.

GraphX: built on Spark Core, for graph computation; it provides a rich set of graph APIs for distributed graph processing.

Spark vs MapReduce

Comparison

https://www.educba.com/mapreduce-vs-spark/

  • Product's category — MapReduce is mainly a data processing engine. Spark is a framework that drives complete analytical solutions or applications, making it an obvious choice for data scientists as a data analytics engine.
  • Performance and data processing — MapReduce reads and writes from and to disk, which slows down processing. Spark minimizes read/write cycles and keeps data in memory, making it up to 10 times faster, but it can degrade badly if the data does not fit in memory.
  • Latency — Because of its lower performance, MapReduce has higher computation latency; Spark's speed enables low-latency computing.
  • Manageability — MapReduce is only a batch engine, so the other components must be handled separately yet synchronously, which makes management difficult. Spark is a complete data analytics engine that can run batch, interactive, streaming and similar workloads under the same cluster umbrella, so it is easier to manage.
  • Real-time analysis — MapReduce was built mainly for batch processing and fails for real-time analytics use cases. Data from real-time streams such as Facebook or Twitter can be managed and processed efficiently in Spark.
  • Interactive mode — MapReduce has no interactive mode; in Spark it is possible to process data interactively.
  • Security — MapReduce has access to all Hadoop security features and integrates easily with other Hadoop security projects; it also supports ACLs. In Spark, security is OFF by default, which can be a major security weakness, and only shared-secret password authentication is available.
  • Fault tolerance — If a MapReduce process crashes, it can resume from where it left off because it relies on disk rather than RAM. If a Spark process crashes, processing has to start from the beginning, so in this respect it is less fault-tolerant than MapReduce.

Why is Spark faster than MapReduce?

https://blog.csdn.net/JENREY/article/details/84873874

1. Spark is memory-based, MapReduce is disk-based

This refers to intermediate results.

MapReduce: intermediate results usually have to be written to disk and then read back, causing frequent disk IO.

Spark: intermediate results do not need to be written to disk at every step.

2. Spark uses coarse-grained resource allocation, MapReduce fine-grained

In Spark, a task does not request resources itself; resources are requested once, up front, when the application is submitted.

In MapReduce, each task requests its own resources when it runs.

3. Spark runs tasks as threads, MapReduce runs them as processes

Spark configuration

1. Ways to set configuration

https://blog.51cto.com/u_16213328/7866422

Priority: settings made in code (on SparkConf / the SparkSession builder) take precedence over spark-submit command-line options, which take precedence over spark-defaults.conf.

How this interacts with the Hadoop-side configuration is still to be checked.

2. Setting configuration in code (SparkSession, SparkContext, HiveContext, SQLContext)

https://blog.csdn.net/weixin_43648241/article/details/108917865

SparkSession > SparkContext > HiveContext > SQLContext

A SparkSession contains a SparkContext.

A SparkContext contains a HiveContext.

A HiveContext contains a SQLContext.

spark = SparkSession.builder.\
    config("hive.metastore.uris", "thrift://xxx.xx.x.xx:xxxx").\
    config("spark.pyspark.python", "/opt/dm_python3/bin/python").\
    config("spark.default.parallelism", 10).\
    config("spark.sql.shuffle.partitions", 200).\
    config("spark.driver.maxResultSize", "16g").\
    config("spark.port.maxRetries", "100").\
    config("spark.driver.memory", "16g").\
    config("spark.yarn.queue", "dcp").\
    config("spark.executor.memory", "16g").\
    config("spark.executor.cores", 20).\
    config("spark.files", addfile).\
    config("spark.executor.instances", 6).\
    config("spark.speculation", False).\
    config("spark.submit.pyFiles", zipfile).\
    appName("testing").\
    master("yarn").\
    enableHiveSupport().\
    getOrCreate()
