hdfs

1.原理

0 读写

https://blog.csdn.net/whdxjbw/article/details/81072207

1 文件格式

https://www.cnblogs.com/wqbin/p/14635480.html

Hadoop中的文件格式大致上分为面向行和面向列两类

面向行:同一行的数据存储在一起,即连续存储。SequenceFile,MapFile,Avro Datafile。采用这种方式,如果只需要访问行的一小部分数据,亦需要将整行读入内存,推迟序列化一定程度上可以缓解这个问题,但是从磁盘读取整行数据的开销却无法避免。面向行的存储适合于整行数据需要同时处理的情况。

面向列:整个文件被切割为若干列数据,每一列数据一起存储。Parquet , RCFile,ORCFile。面向列的格式使得读取数据时,可以跳过不需要的列,适合于只处于行的一小部分字段的情况。但是这种格式的读写需要更多的内存空间,因为需要缓存行在内存中(为了获取多行中的某一列)。同时不适合流式写入,因为一旦写入失败,当前文件无法恢复,而面向行的数据在写入失败时可以重新同步到最后一个同步点,所以Flume采用的是面向行的存储格式。

2.使用

上传文件

1.页面

2.命令行

https://blog.csdn.net/tandelin/article/details/89514784

3.问题

spark使用总结

小技巧

1.调试

先local 然后集群

可以用本地ide 或者jupyter 调试

2.把数据想象成表,本质就是对行操作

3.多少task就是多少砖,多少并行度就是有多少工人,需要设置合理

日志

如果是yarn client模式

1.driver

客户端可以查看

2.executor

yarn

web ui可以查看

命令行查看 yarn logs —applicationId XXX

spark

web ui可以查看

不同行保存到不同文件

增加字段 区分不同行

落盘的时候 partitionBy(“field1”,”field2”)

mysql

开机自启

配置Binlog

flinkcdc需要用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1 sudo vim /etc/my.cnf
2 写入
server id = 1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall2021
binlog-do-db=gmall2021_realtime
注意:binlog-do-db 根据自己的情况进行修 改,指定具体要同步的数据库,多个就写多行
3 重启mysql
sudo systemctl restart mysqld
4 查看有没有生效
cd /var/lib
sudo ls -l mysql | grep bin
修改mysql库里数据,观察"mysql-bin.最新"文件大小的变化

问题

1 Job for mysqld.service failed because the control process exited with error code

https://blog.csdn.net/qq_41179691/article/details/104598293

2 navicat连不上服务器的mysql

https://blog.csdn.net/u014264373/article/details/85564524

https://blog.csdn.net/MTbaby/article/details/56836986

flink cdc

CDC是 Change Data Capture(变更数据获取 )的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入 、 更新 以及 删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL 等数据库直接 读取全量数据 和 增量变更数据 的 source 组件。

说白了就是连接数据库,然后实时监控变化

nginx

0 原理

概念

正向代理:代理的是客户端

反向代理:代理的是服务器端

作用

https://zhuanlan.zhihu.com/p/54793789

1、静态HTTP服务器

2、反向代理服务器

3、负载均衡

ranker-> nginx ->qr

4、虚拟主机

5、FastCGI

1 使用

配置

conf/vhosts

监听老的端口 生成新的端口

启动

./nginx

问题

1 nginx: [emerg] bind() to 0.0.0.0:80 failed (98: Address already in use)

1
2
lsof -i :80 |grep nginx |grep -v grep|awk '{print $2}'
kill -9 XXX XXX

https://blog.csdn.net/fmwind/article/details/120786375

并行度设置

https://blog.csdn.net/hongzhen91/article/details/90812686

一个任务的并行实例(线程)数目就被称为该任务的并行度

并行度设置层次

1 Operator Level(算子层次)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
setParallelism

reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
}).setParallelism(5)
.print();

(Mary,1)
(Bob,1)
(Mary,2)
(Bob,2)
(Mary,3)
(Bob,3)
(Mary,4)
(Bob,4)


keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
})
.print().setParallelism(5);


2> (Bob,2)
1> (Mary,2)
1> (Bob,4)
3> (Mary,3)
5> (Bob,1)
5> (Mary,4)
4> (Mary,1)
4> (Bob,3)

2Execution Environment Level(执行环境层次)

3Client Level(客户端层次)

4System Level(系统层次)

优先级1>2>3>4

物理分区

https://www.cnblogs.com/wdh01/p/16038278.html

首先和逻辑分区区别开,逻辑分区包括keyBy等算子

逻辑分区只不过将数据按照key分组,哪个key分到哪个task,系统自动控制,万一分配不均,会发生数据倾斜

物理分区就是按一定逻辑将数据分配到不同Task,可以缓解数据倾斜

source(1)-》不同物理分区方式(3)-》slot

分类

1 随机分区 random

2 轮询分区round-robin

3 重缩放分区 rescale

4 分局分区 global

5 自定义 custom

6 广播

不完全算物理分区方式


:D 一言句子获取中...