flink cdc

CDC是 Change Data Capture(变更数据获取 )的简称。 核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入 、 更新 以及 删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、 PostgreSQL 等数据库直接 读取全量数据 和 增量变更数据 的 source 组件。

说白了就是连接数据库,然后实时监控变化

并行度设置

https://blog.csdn.net/hongzhen91/article/details/90812686

一个任务的并行实例(线程)数目就被称为该任务的并行度

并行度设置层次

1 Operator Level(算子层次)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
setParallelism

reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
}).setParallelism(5)
.print();

(Mary,1)
(Bob,1)
(Mary,2)
(Bob,2)
(Mary,3)
(Bob,3)
(Mary,4)
(Bob,4)


keyBy(r -> true) // 为每一条数据分配同一个key,将聚合结果发送到一条流中去
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
// 将累加器更新为当前最大的pv统计值,然后向下游发送累加器的值
return value1.f1 > value2.f1 ? value1 : value2;
}
})
.print().setParallelism(5);


2> (Bob,2)
1> (Mary,2)
1> (Bob,4)
3> (Mary,3)
5> (Bob,1)
5> (Mary,4)
4> (Mary,1)
4> (Bob,3)

2Execution Environment Level(执行环境层次)

3Client Level(客户端层次)

4System Level(系统层次)

优先级1>2>3>4

物理分区

https://www.cnblogs.com/wdh01/p/16038278.html

首先和逻辑分区区别开,逻辑分区包括keyBy等算子

逻辑分区只不过将数据按照key分组,哪个key分到哪个task,系统自动控制,万一分配不均,会发生数据倾斜

物理分区就是按一定逻辑将数据分配到不同Task,可以缓解数据倾斜

source(1)-》不同物理分区方式(3)-》slot

分类

1 随机分区 random

2 轮询分区round-robin

3 重缩放分区 rescale

4 分局分区 global

5 自定义 custom

6 广播

不完全算物理分区方式

流批选择

之前版本

1
2
3
4
//流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//批
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

现在版本

通过执行模式 execution mode选择

1 流处理 streaming 默认

2 批处理 batch

3 自动 automatic

(1) 通过命令行

1
flink run -Dexecution.runtime-mode=BATCH/../..

(2)代码

1
2
3
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

datastream

1 转换算子 Transformation

function分类:普通的,rich

怎么写function:

  1. 自定义
  2. 匿名类
  3. lambda表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.atguigu.chapter05;

/**
* Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
* <p>
* Project: FlinkTutorial
* <p>
* Created by wushengran
*/

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReturnTypeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<Event> clicks = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);

// 想要转换成二元组类型,需要进行以下处理
// 1) 使用显式的 ".returns(...)"
DataStream<Tuple2<String, Long>> stream3 = clicks
.map( event -> Tuple2.of(event.user, 1L) )
.returns(Types.TUPLE(Types.STRING, Types.LONG));
stream3.print();


// 2) 使用类来替代Lambda表达式
clicks.map(new MyTuple2Mapper())
.print();

// 3) 使用匿名类来代替Lambda表达式
clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).print();

env.execute();
}

// 自定义MapFunction的实现类
public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}
}

max maxby 区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 5000L),
new Event("Bob", "./cart", 2000L),
new Event("Mary", "./cart", 3000L),
new Event("ss", "./fav", 4000L),
new Event("Mary", "./fav", 10000L)
);

stream.keyBy(e -> e.user)
// .maxBy("timestamp")
.maxBy("timestamp") // 指定字段名称
.print("maxBy:");
stream.keyBy(e -> e.user)
// .("timestamp")
.max("timestamp") // 指定字段名称
.print("max:");


max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
maxBy:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:10.0}
maxBy:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./fav', timestamp=1970-01-01 08:00:10.0}


max部分替换,maxby全部替换

2 窗口

1 简介

窗口[0-10)中有11,12,但是11,12并不在窗口[0-10)处理,而是在对应的窗口[10,20)处理

2 窗口的分类

  1. 按照驱动类型分类

(1)时间窗口 Time Window

(2)计数窗口 Count Window

  1. 按照窗口分配数据的规则分类

(1)滚动窗口 Tumbling Windows

(2)滑动窗口 Sliding Windows

(3)会话窗口 Session Windows

(4)全局窗口 Global Windows

3 使用

4 迟到数据的处理

窗口中 的迟到数据默认会被丢弃,这导致计算结果不够准确

1 设置水位线延迟时间

1
2
3
4
5
6
7
assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));

2 允许窗口处理迟到数据

1
.allowedLateness(Time.minutes(1))

3 将迟到数据放入侧输出流

收集关窗之后的迟到数据,然后手动处理

1
.sideOutputLateData(outputTag)

flink优化

https://shopify.engineering/optimizing-apache-flink-applications-tips

https://cloud.tencent.com/developer/article/1897249

1 广播

https://blog.csdn.net/weixin_44318830/article/details/107678101

广播是一种操作

如果不使用广播,每一个 Task 都会拷贝一份数据集,造成内存资源浪费 ; 广播后,每个节点存一份,不同的Task 都可以在节点上获取到

1 广播变量

https://blog.csdn.net/yang_shibiao/article/details/118662134

2 广播流

BroadcastStream

3 广播状态

BroadcastState

Table API和SQL

https://blog.csdn.net/weixin_45366499/article/details/115449175

0 原理

1 动态表

flink中的表是动态表

静态表:hive,mysql等

动态表:不断更新

2 持续查询

1 简介

Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。

Table API 是用于 Scala 和 Java 语言的查询 API,它可以用一种非常直观的方式来 组合使用选取、过滤、join 等关系型算子。

1
2
3
Table maryClickTable = eventTable
.where($("user").isEqual("alice"))
.select($("url"), $("user"));

SQL 是基于 Apache Calcite 来实现的标准 SQL

1
2
3
Table urlCountTable = tableEnv.sqlQuery(
"SELECT user, COUNT(url) FROM EventTable GROUP BY user"
);

2 框架

表环境和流执行环境不同

3 流表相互转化

stream 《——》table

1
2
3
4
5
6
tableEnv表环境
// 将数据流eventstream转换成表eventTable
Table eventTable = tableEnv.fromDataStream(eventstream);

// 将表visitTable转换成数据流,打印输出
tableEnv.toDataStream(visitTable).print();

4 连接外部系统

可以在创建表的时候用 WITH子句指定连接器connector

5 客户端

./bin/sql client.sh

6 时间属性

事件事件、处理事件

  1. 在创建表的 DDL中定义
  2. 在数据流转换为表时定义

7 窗口


:D 一言句子获取中...