flink cdc

CDC是 Change Data Capture(变更数据获取 )的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入 、 更新 以及 删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL 等数据库直接 读取全量数据 和 增量变更数据 的 source 组件。

说白了就是连接数据库,然后实时监控变化

并行度设置

https://blog.csdn.net/hongzhen91/article/details/90812686

一个任务的并行实例(线程)数目就被称为该任务的并行度

并行度设置层次

1 Operator Level(算子层次)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
setParallelism

reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
}).setParallelism(5)
.print();

(Mary,1)
(Bob,1)
(Mary,2)
(Bob,2)
(Mary,3)
(Bob,3)
(Mary,4)
(Bob,4)


keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
})
.print().setParallelism(5);


2> (Bob,2)
1> (Mary,2)
1> (Bob,4)
3> (Mary,3)
5> (Bob,1)
5> (Mary,4)
4> (Mary,1)
4> (Bob,3)

2Execution Environment Level(执行环境层次)

3Client Level(客户端层次)

4System Level(系统层次)

优先级1>2>3>4

物理分区

https://www.cnblogs.com/wdh01/p/16038278.html

首先和逻辑分区区别开,逻辑分区包括keyBy等算子

逻辑分区只不过将数据按照key分组,哪个key分到哪个task,系统自动控制,万一分配不均,会发生数据倾斜

物理分区就是按一定逻辑将数据分配到不同Task,可以缓解数据倾斜

source(1)-》不同物理分区方式(3)-》slot

分类

1 随机分区 random

2 轮询分区round-robin

3 重缩放分区 rescale

4 分局分区 global

5 自定义 custom

6 广播

不完全算物理分区方式

流批选择

之前版本

1
2
3
4
//流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//批
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

现在版本

通过执行模式 execution mode选择

1 流处理 streaming 默认

2 批处理 batch

3 自动 automatic

(1) 通过命令行

1
flink run -Dexecution.runtime-mode=BATCH/../..

(2)代码

1
2
3
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

datastream

1 转换算子 Transformation

function分类:普通的,rich

怎么写function:

  1. 自定义
  2. 匿名类
  3. lambda表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.atguigu.chapter05;

/**
* Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
* <p>
* Project: FlinkTutorial
* <p>
* Created by wushengran
*/

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReturnTypeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<Event> clicks = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);

// 想要转换成二元组类型,需要进行以下处理
// 1) 使用显式的 ".returns(...)"
DataStream<Tuple2<String, Long>> stream3 = clicks
.map( event -> Tuple2.of(event.user, 1L) )
.returns(Types.TUPLE(Types.STRING, Types.LONG));
stream3.print();


// 2) 使用类来替代Lambda表达式
clicks.map(new MyTuple2Mapper())
.print();

// 3) 使用匿名类来代替Lambda表达式
clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).print();

env.execute();
}

// 自定义MapFunction的实现类
public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}
}

max maxby 区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 5000L),
new Event("Bob", "./cart", 2000L),
new Event("Mary", "./cart", 3000L),
new Event("ss", "./fav", 4000L),
new Event("Mary", "./fav", 10000L)
);

stream.keyBy(e -> e.user)
// .maxBy("timestamp")
.maxBy("timestamp") // 指定字段名称
.print("maxBy:");
stream.keyBy(e -> e.user)
// .("timestamp")
.max("timestamp") // 指定字段名称
.print("max:");


max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
maxBy:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:10.0}
maxBy:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./fav', timestamp=1970-01-01 08:00:10.0}


max部分替换,maxby全部替换

2 窗口

1 简介

窗口[0-10)中有11,12,但是11,12并不在窗口[0-10)处理,而是在对应的窗口[10,20)处理

2 窗口的分类

  1. 按照驱动类型分类

(1)时间窗口 Time Window

(2)计数窗口 Count Window

  1. 按照窗口分配数据的规则分类

(1)滚动窗口 Tumbling Windows

(2)滑动窗口 Sliding Windows

(3)会话窗口 Session Windows

(4)全局窗口 Global Windows

3 使用

4 迟到数据的处理

窗口中 的迟到数据默认会被丢弃,这导致计算结果不够准确

1 设置水位线延迟时间

1
2
3
4
5
6
7
assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));

2 允许窗口处理迟到数据

1
.allowedLateness(Time.minutes(1))

3 将迟到数据放入侧输出流

收集关窗之后的迟到数据,然后手动处理

1
.sideOutputLateData(outputTag)

Table API和SQL

https://blog.csdn.net/weixin_45366499/article/details/115449175

0 原理

1 动态表

flink中的表是动态表

静态表:hive,mysql等

动态表:不断更新

2 持续查询

1 简介

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。

Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。

1
2
3
Table maryClickTable = eventTable
.where($("user").isEqual("alice"))
.select($("url"), $("user"));

SQL 是基于 Apache Calcite 来实现的标准 SQL

1
2
3
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user, COUNT(url) FROM EventTable GROUP BY user"
);

2 框架

表环境和流执行环境不同

3 流表相互转化

stream 《——》table

1
2
3
4
5
6
tableEnv表环境
// 将数据流eventstream转换成表eventTable
Table eventTable = tableEnv.fromDataStream(eventstream);

// 将表visitTable转换成数据流,打印输出
tableEnv.toDataStream(visitTable).print();

4 连接外部系统

可以在创建表的时候用 WITH子句指定连接器connector

5 客户端

./bin/sql client.sh

6 时间属性

事件事件、处理事件

  1. 在创建表的 DDL中定义
  2. 在数据流转换为表时定义

7 窗口

状态编程

0 状态管理机制

1 算子任务分类

1 无状态

2 有状态

2 状态分类

Flink 有两种状态:托管状态(Managed State)和原始状态(Raw State)。一般情况使用托管状态,只有在托管状态无法实现特殊需求,才会使用原始转态,一般情况不使用。

托管状态分类:算子状态(Operator State)和按键分区状态(Keyed State)

1 按键分区状态

2 算子状态

3 广播状态 Broadcast State

特殊的算子状态

3 状态持久化

对状态进行持久化( persistence)保存,这样就可以在发生故障后进行重启恢复。

flink状态持久化方式:写入一个“检查点”( checkpoint)或者保存点 savepoint
保存到外部存储系统中。具体的存储介质,一般是分布式文件系统( distributed file system)。

4 状态后端 State Backends

在Flink中,状态的存储、访问以及维护,都是由一个可插拔的组件决定的,这个组件就
叫作状态后端( state backend)。状态后端主要负责两件事:一是本地的状态管理,二是将检查
点( checkpoint)写入远程的 持久化存储。

flink cep

0 简介

类似的多个事件的组合,我们把它叫作“复杂事件”。对于复杂时间的处理,由于涉及到事件的严格顺序,有时还有时间约束,我们很难直接用 SQL或DataStream API来完成。于是只好放大招 派底层的处理函数( process function)上阵了。处理函数确实可以搞定这些需求,不过对于非常复杂的组合事件,我们可能需要设置很多状态、定时器,并在代码中定义各种条件分支( if else)逻辑来处理,复杂度会非常高,很可能会使代码失去可读性。怎 样处理这类复杂事件呢? Flink为我们提供了专门用于处理复杂事件的库 CEP,可以让我们更加轻松地解决这类棘手的问题。这在企业的实时风险控制中有非常重要的作用。

Complex Event Processing,flink 专门用来处理复杂事件的库

1 原理

cep底层是状态机

复杂事件可以通过设计状态机来处理,用户自己写容易出错,cep帮我们封装好,用户写顶层逻辑就可以了

2 核心步骤

总结起来,复杂事件处理(CEP)的流程可以分成三个步骤
(1)定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的 复杂事件进行处理,得到结果进行输出

多流转换

简单划分的话,多流转换可以分为“分流”和“合流”两大类。在 Flink中,分流操作可以通过处理函数的侧输出流( side output)很容易地实现
而合流则提供不同层级的各种 API

任务:

stream数量 ,每个stream可以有多个子任务(并行度)

keyby只能算分组,不算分流

资源:

task manager数量,slot数量

1 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个
DataStream,得到完全平等的多个子 DataStream,如图 8-1所示。一般来说,我们会定义一些
筛选条件,将符合条件的数据拣选出来放到对应的流里。

在Flink 1.13版本中,已经弃用了 .split()方法,取而代之的是直接用处理函数( process function)的侧输出流 (side output)。

2 合流

既然一条流可以分开,自然多条流就可以合并。在实际应用中,我们经常会遇到来源不同
的多条流,需要将它们的数据进行联合处理。所以 Flink中合流的操作会更加普遍,对应的
API也更加丰富。

2.1 基本合流

1 联合( Union)

可以多条(大于2)合并,数据类型必须一致

2 连接( Connect)

必须两条,数据类型可以不同

2.2 双流联结 join

对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个
字段的值将它们联结起来,“配对”去做处理。

1 窗口联结( Window Join

2 间隔联结( Interval Join

3 窗口同组联结( Window CoGroup

处理函数(process funtion)

处理函数位于底层,操作麻烦,但是使用更加灵活,是flink的“核武器”,轻易不用,但是一定行。

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态 state
以及时间( time)。

https://blog.51cto.com/u_15349018/3698518

1 分类

8种不同的处理函数

每个处理函数使得的时候注意两个关键函数

1 processElement

必须

元素基本处理

2 onTimer()

非必须

就是设置定时器,然后触发操作

2 侧输出流( Side Output)

1 主流

collect

2 分流

处理函数的processElement或者onTimer中使用.output (outputTag,数据)

获取侧输出流

1
Stream.getSideOutput(outputTag)

:D 一言句子获取中...