flink cep

0 简介

类似的多个事件的组合,我们把它叫作“复杂事件”。对于复杂时间的处理,由于涉及到事件的严格顺序,有时还有时间约束,我们很难直接用 SQL或DataStream API来完成。于是只好放大招 派底层的处理函数( process function)上阵了。处理函数确实可以搞定这些需求,不过对于非常复杂的组合事件,我们可能需要设置很多状态、定时器,并在代码中定义各种条件分支( if else)逻辑来处理,复杂度会非常高,很可能会使代码失去可读性。怎 样处理这类复杂事件呢? Flink为我们提供了专门用于处理复杂事件的库 CEP,可以让我们更加轻松地解决这类棘手的问题。这在企业的实时风险控制中有非常重要的作用。

Complex Event Processing,flink 专门用来处理复杂事件的库

1 原理

cep底层是状态机

复杂事件可以通过设计状态机来处理,用户自己写容易出错,cep帮我们封装好,用户写顶层逻辑就可以了

2 核心步骤

总结起来,复杂事件处理(CEP)的流程可以分成三个步骤
(1)定义一个匹配规则
(2)将匹配规则应用到事件流上,检测满足规则的复杂事件
(3)对检测到的 复杂事件进行处理,得到结果进行输出

多流转换

简单划分的话,多流转换可以分为“分流”和“合流”两大类。在 Flink中,分流操作可以通过处理函数的侧输出流( side output)很容易地实现
而合流则提供不同层级的各种 API

任务:

stream数量 ,每个stream可以有多个子任务(并行度)

keyby只能算分组,不算分流

资源:

task manager数量,slot数量

1 分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个
DataStream,得到完全平等的多个子 DataStream,如图 8-1所示。一般来说,我们会定义一些
筛选条件,将符合条件的数据拣选出来放到对应的流里。

在Flink 1.13版本中,已经弃用了 .split()方法,取而代之的是直接用处理函数( process function)的侧输出流 (side output)。

2 合流

既然一条流可以分开,自然多条流就可以合并。在实际应用中,我们经常会遇到来源不同
的多条流,需要将它们的数据进行联合处理。所以 Flink中合流的操作会更加普遍,对应的
API也更加丰富。

2.1 基本合流

1 联合( Union)

可以多条(大于2)合并,数据类型必须一致

2 连接( Connect)

必须两条,数据类型可以不同

2.2 双流联结 join

对于两条流的合并,很多情况我们并不是简单地将所有数据放在一起,而是希望根据某个
字段的值将它们联结起来,“配对”去做处理。

1 窗口联结( Window Join

2 间隔联结( Interval Join

3 窗口同组联结( Window CoGroup

处理函数(process funtion)

处理函数位于底层,操作麻烦,但是使用更加灵活,是flink的“核武器”,轻易不用,但是一定行。

在处理函数中,我们直面的就是数据流中最基本的元素:数据事件(event)、状态 state
以及时间( time)。

https://blog.51cto.com/u_15349018/3698518

1 分类

8种不同的处理函数

每个处理函数使得的时候注意两个关键函数

1 processElement

必须

元素基本处理

2 onTimer()

非必须

就是设置定时器,然后触发操作

2 侧输出流( Side Output)

1 主流

collect

2 分流

处理函数的processElement或者onTimer中使用.output (outputTag,数据)

获取侧输出流

1
Stream.getSideOutput(outputTag)

watermark(水位线)

https://blog.csdn.net/lmalds/article/details/52704170

https://blog.csdn.net/lightupworld/article/details/116697831

1 介绍

在实际应用中,一般会采用事件时间语义

水位线可以看作在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。

2 分类

1 有序流的水位线

方框是事件时间,虚线是水位线,箭头表示数据到达的顺序,比如事件时间为2的数据第一个到,事件时间为5的数据第二个到

2 乱序流的水位线

比水位线小的数据在后面出现,也就是说本应该在水位线之前出现的数据晚到了,就是迟到数据,迟到数据是丢弃的,比如w(9)后面的8,9

3 如何生成水位线

1 原则

我们知道,完美的水位线是“绝对正确”的,也就是一个水位线一旦出现,就表示这个时间之前的数据已经全部到齐、之后再也不会出现了。而完美的东西总是可望不可即,我们只能尽量去保证水位线的正确。

“等” 或者说“延迟”

为了均衡实时性(少等,会引入大量迟到数据)和准确性(减少迟到数据,多等)

2 怎么写

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/datastream/event-time/generating_watermarks/

1 assignTimestampsAndWatermarks

1
Datastream <event>    withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(WatermarkStrategy)

WatermarkStrategy

内置

1
2
WatermarkStrategy.forMonotonousTimestamps
WatermarkStrategy.forBoundedOutOfOrderness

自定义

先实现接口WatermarkGenerator,然后改写一些东西

2 自定义数据源中发送水位线

注意:自定义数据源也可以用assignTimestampsAndWatermarks,这里是指在自定义数据源中实现自己的水位线发送,不用assignTimestampsAndWatermarks

4 水位线传递

上游并行度为4,下游并行度为3

策略:木桶原理,取得是最小的

为什么木桶原则,以准为第一要义,举个例子图2,如果不是3,是4,那么3不就迟到了吗

时间语义(Notions of Time)

https://blog.csdn.net/lomodays207/article/details/109642581

在Flink中,由于处理时间比较简单,早期版本默认的时间语义是处理时间;而考虑到事件时间在实际应用中更为广泛,从 1.12版本 开始 Flink已经将 事件时间作为了默认的时间语义。

1.处理时间( Processing Time)

处理时间的概念非常简单,就是指执行处理操作的机器的系统时间。

2.事件时间( Event Time)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间。

3 摄入时间( Ingestion Time)

它是指数据进入 Flink数据流的时间,也就是 Source算子读入数据的时间。

处理无界和有界数据

https://flink.apache.org/zh/flink-architecture.html

任何类型的数据都可以形成一种事件流。信用卡交易、传感器测量、机器日志、网站或移动应用程序上的用户交互记录,所有这些数据都形成一种流。

数据可以被作为无界或者有界流来处理。

  1. 无界流 有定义流的开始,但没有定义流的结束。它们会无休止地产生数据。无界流的数据必须持续处理,即数据被摄取后需要立刻处理。我们不能等到所有数据都到达再处理,因为输入是无限的,在任何时候输入都不会完成。处理无界数据通常要求以特定顺序摄取事件,例如事件发生的顺序,以便能够推断结果的完整性。
  2. 有界流 有定义流的开始,也有定义流的结束。有界流可以在摄取所有数据后再进行计算。有界流所有数据可以被排序,所以并不需要有序摄取。有界流处理通常被称为批处理

1

Apache Flink 擅长处理无界和有界数据集精确的时间控制和状态化使得 Flink 的运行时(runtime)能够运行任何处理无界流的应用。有界流则由一些专为固定大小数据集特殊设计的算法和数据结构进行内部处理,产生了出色的性能。

Flink程序构成部分

⚫ 获取执行环境( execution environment)
⚫ 读取数据源( source)
⚫ 定义基于数据的转换操作( transformations)
⚫ 定义计算结果的输出位置( sink)

1 source

1 从集合中读取数据

最简单的读取数据的方式,就是在代码中直接创建一个Java集合,然后调用执行环境fromCollection方法进行读取。这相当于将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用,一般用于测试。

2 从文件读取数据

真正的实际应用中,自然不会直接将数据写在代码中。通常情况下,我们会从存储介质中获取数据,一个比较常见的方式就是读取日志文件。这也是批处理中最常见的读取方式。

1
DataStream<String> stream = env.readTextFile("clicks .csv“);

3 socket

https://www.jianshu.com/p/cb26a0f6c622

socket文本流的读取需要配置两个参数:发送端主机名和端口

文本流数据的发送,可以通过 Linux系统自带的 netcat工具进行模拟。

1
nc -lk 7777

4 kafka

5 自定义 Source

3 sink

1 输出到文件

2 输出到 Kafka

3 输出到 Redis

4 输出到 Elasticsearch

5 输出到 MySQL (JDBC)

6 自定义 Sink输出


:D 一言句子获取中...