datastream

1 转换算子 Transformation

function分类:普通的,rich

怎么写function:

  1. 自定义
  2. 匿名类
  3. lambda表达式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.atguigu.chapter05;

/**
* Copyright (c) 2020-2030 尚硅谷 All Rights Reserved
* <p>
* Project: FlinkTutorial
* <p>
* Created by wushengran
*/

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransReturnTypeTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataStreamSource<Event> clicks = env.fromElements(
new Event("Mary", "./home", 1000L),
new Event("Bob", "./cart", 2000L)
);

// 想要转换成二元组类型,需要进行以下处理
// 1) 使用显式的 ".returns(...)"
DataStream<Tuple2<String, Long>> stream3 = clicks
.map( event -> Tuple2.of(event.user, 1L) )
.returns(Types.TUPLE(Types.STRING, Types.LONG));
stream3.print();


// 2) 使用类来替代Lambda表达式
clicks.map(new MyTuple2Mapper())
.print();

// 3) 使用匿名类来代替Lambda表达式
clicks.map(new MapFunction<Event, Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}).print();

env.execute();
}

// 自定义MapFunction的实现类
public static class MyTuple2Mapper implements MapFunction<Event, Tuple2<String, Long>>{
@Override
public Tuple2<String, Long> map(Event value) throws Exception {
return Tuple2.of(value.user, 1L);
}
}
}

max maxby 区别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
DataStreamSource<Event> stream = env.fromElements(
new Event("Mary", "./home", 5000L),
new Event("Bob", "./cart", 2000L),
new Event("Mary", "./cart", 3000L),
new Event("ss", "./fav", 4000L),
new Event("Mary", "./fav", 10000L)
);

stream.keyBy(e -> e.user)
// .maxBy("timestamp")
.maxBy("timestamp") // 指定字段名称
.print("maxBy:");
stream.keyBy(e -> e.user)
// .("timestamp")
.max("timestamp") // 指定字段名称
.print("max:");


max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
maxBy:> Event{user='Bob', url='./cart', timestamp=1970-01-01 08:00:02.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:05.0}
max:> Event{user='Mary', url='./home', timestamp=1970-01-01 08:00:10.0}
maxBy:> Event{user='ss', url='./fav', timestamp=1970-01-01 08:00:04.0}
maxBy:> Event{user='Mary', url='./fav', timestamp=1970-01-01 08:00:10.0}


max部分替换,maxby全部替换

2 窗口

1 简介

窗口[0-10)中有11,12,但是11,12并不在窗口[0-10)处理,而是在对应的窗口[10,20)处理

2 窗口的分类

  1. 按照驱动类型分类

(1)时间窗口 Time Window

(2)计数窗口 Count Window

  1. 按照窗口分配数据的规则分类

(1)滚动窗口 Tumbling Windows

(2)滑动窗口 Sliding Windows

(3)会话窗口 Session Windows

(4)全局窗口 Global Windows

3 使用

4 迟到数据的处理

窗口中 的迟到数据默认会被丢弃,这导致计算结果不够准确

1 设置水位线延迟时间

1
2
3
4
5
6
7
assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));

2 允许窗口处理迟到数据

1
.allowedLateness(Time.minutes(1))

3 将迟到数据放入侧输出流

收集关窗之后的迟到数据,然后手动处理

1
.sideOutputLateData(outputTag)
Author

Lavine Hu

Posted on

2022-04-24

Updated on

2022-05-10

Licensed under

Comments

:D 一言句子获取中...