流批选择

之前版本

1
2
3
4
//流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//批
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

现在版本

通过执行模式 execution mode选择

1 流处理 streaming 默认

2 批处理 batch

3 自动 automatic

(1) 通过命令行

1
flink run -Dexecution.runtime-mode=BATCH/../..

(2)代码

1
2
3
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

hql增删改查

1增

insert

load

2删

1 表

Drop 表结构都没有了

1
DROP TABLE IF EXISTS employee;

2 记录

没有DELETE

TRUNCATE

所有记录

truncate table employees;

INSERT OVERWRITE

1
2
INSERT OVERWRITE TABLE dpc_test SELECT * FROM dpc_test WHERE age is not null;

3改

1 update

针对记录

1
update student set id='444' where name='tom';

2 Alter

表结构

4查

select

建表

1
2
3
4
5
6
7
CREATE TABLE IF NOT EXISTS `runoob_tbl`(
`runoob_id` INT UNSIGNED AUTO_INCREMENT,
`runoob_title` VARCHAR(100) NOT NULL,
`runoob_author` VARCHAR(40) NOT NULL,
`submission_date` DATE,
PRIMARY KEY ( `runoob_id` )
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

字段数据类型

https://www.w3school.com.cn/sql/sql_datatypes.asp

array

https://www.educba.com/array-in-sql/

约束(Constraints)

  • NOT NULL - 指示某列不能存储 NULL 值。
  • UNIQUE - 保证某列的每行必须有唯一的值。
  • PRIMARY KEY - NOT NULL 和 UNIQUE 的结合。确保某列(或两个列多个列的结合)有唯一标识,有助于更容易更快速地找到表中的一个特定的记录。
  • FOREIGN KEY - 保证一个表中的数据匹配另一个表中的值的参照完整性。
  • CHECK - 保证列中的值符合指定的条件。
  • DEFAULT - 规定没有给列赋值时的默认值。

自增字段

AUTO INCREMENT

1
2
3
4
5
6
7
8
9
CREATE TABLE Persons
(
ID int NOT NULL AUTO_INCREMENT,
LastName varchar(255) NOT NULL,
FirstName varchar(255),
Address varchar(255),
City varchar(255),
PRIMARY KEY (ID)
)

开始值是 1,每条新记录递增 1

datastream

1 转换算子 Transformation

function分类:普通的,rich

怎么写function:

  1. 自定义
  2. 匿名类
  3. lambda表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.atguigu.chapter05;

/**
* Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
* <p>
* Project: FlinkTutorial
* <p>
* Created by wushengran
*/

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReturnTypeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<Event> clicks = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);

// 想要转换成二元组类型,需要进行以下处理
// 1) 使用显式的 ".returns(...)"
DataStream<Tuple2<String, Long>> stream3 = clicks
.map( event -> Tuple2.of(event.user, 1L) )
.returns(Types.TUPLE(Types.STRING, Types.LONG));
stream3.print();


// 2) 使用类来替代Lambda表达式
clicks.map(new MyTuple2Mapper())
.print();

// 3) 使用匿名类来代替Lambda表达式
clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).print();

env.execute();
}

// 自定义MapFunction的实现类
public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}
}

max maxby 区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 5000L),
new Event("Bob", "./cart", 2000L),
new Event("Mary", "./cart", 3000L),
new Event("ss", "./fav", 4000L),
new Event("Mary", "./fav", 10000L)
);

stream.keyBy(e -> e.user)
// .maxBy("timestamp")
.maxBy("timestamp") // 指定字段名称
.print("maxBy:");
stream.keyBy(e -> e.user)
// .("timestamp")
.max("timestamp") // 指定字段名称
.print("max:");


max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
maxBy:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:10.0}
maxBy:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./fav', timestamp=1970-01-01 08:00:10.0}


max部分替换,maxby全部替换

2 窗口

1 简介

窗口[0-10)中有11,12,但是11,12并不在窗口[0-10)处理,而是在对应的窗口[10,20)处理

2 窗口的分类

  1. 按照驱动类型分类

(1)时间窗口 Time Window

(2)计数窗口 Count Window

  1. 按照窗口分配数据的规则分类

(1)滚动窗口 Tumbling Windows

(2)滑动窗口 Sliding Windows

(3)会话窗口 Session Windows

(4)全局窗口 Global Windows

3 使用

4 迟到数据的处理

窗口中 的迟到数据默认会被丢弃,这导致计算结果不够准确

1 设置水位线延迟时间

1
2
3
4
5
6
7
assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));

2 允许窗口处理迟到数据

1
.allowedLateness(Time.minutes(1))

3 将迟到数据放入侧输出流

收集关窗之后的迟到数据,然后手动处理

1
.sideOutputLateData(outputTag)

flink优化

https://shopify.engineering/optimizing-apache-flink-applications-tips

https://cloud.tencent.com/developer/article/1897249

1 广播

https://blog.csdn.net/weixin_44318830/article/details/107678101

广播是一种操作

如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费 ; 广播后,每个节点存一份,不同的Task 都可以在节点上获取到

1 广播变量

https://blog.csdn.net/yang_shibiao/article/details/118662134

2 广播流

BroadcastStream

3 广播状态

BroadcastState


:D 一言句子获取中...