Apache Flink 学习:DataStream Api 中 Operators(算子)——窗口
Keyed Windows
1 | stream |
Non-Keyed Windows
1 | stream |
Window 生命周期
简而言之,当属于该窗口的第一个元素到达时,将会创建一个窗口,并且当时间(Event Time 或者 Processing Time)超过其结束时间戳加上用户指定的允许延迟时间时,将完全删除该窗口,注意窗口都是左开右闭,比如:[0, 5)。
Flink 保证只删除基于时间的窗口,而不删除其他类型的窗口,例如 Global Window。例如,使用基于事件时间的窗口,并且创建一个窗口大小为5分钟的滚动(Tumbing)窗口,并且允许延迟1分钟。当时间戳属于12:00到12:05之间的第一个元素到达时,Flink 将创建一个新窗口,当 Watermark 通过12:06时间戳时,就会把这个窗口删除。
此外,每个窗口都包含一个 Trigger 和一个函数(ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction)。函数包含了要应用于窗口内容的计算,而 Trigger 制定了什么情况下才触发执行这些函数。比如,触发策略可能类似于“当窗口中的元素数超过4时”或“当 WaterMark 通过窗口结束时”进行触发。Trigger 还可以决定什么时候删除窗口中的元素。
除上述内容外,您还可以指定一个 Evictor,该 Evictor 将能够在 Trigger 触发后、应用函数之前和/或之后从窗口中移除元素。
下面例子中的窗口都是按照 Event Time 或者 Processing Time进行指定。
Keyed vs Non-Keyed Windows
首先要指定的是是否应该为流设置 key。使用keyBy(…)可以将无限流拆分为逻辑 keyed 流。
在 Keyed Stream 的情况下,传入 event 的任何属性都可以用作键。拥有一个 Keyed Stream 将允许您的窗口计算由多个任务并行执行,因为每个逻辑 Keyed Stream 都可以独立于其他任务进行处理。所有引用同一个键的 event 都将被发送到同一个并行任务(通过 partitioner 完成)。
如果不是 Keyed Stream,则不会将原始流拆分为多个逻辑流,所有窗口逻辑将由单个任务执行,即并行度为1。
Tumbling Windows
滚动窗口,如果你指定的滚动窗口大小为一天计算一次,并且你需要更具你本地的时间的 00:00:00 开始,则必须按照时区来指定窗口。
可以看到上图中,无论是多少个 key,每个 key 的窗口的起始和截止时间都相同。
1 | /** |
Sliding Windows
滑动窗口,如果你指定窗口大小和滑动步长一样,那么和滚动窗口的作用一样,滑动窗口的一个明显的特征就是:窗口可能会重叠,即同一个元素可能会属于不同的窗口。
和滚动窗口相同,如果你指定的滚动窗口大小为一天计算一次,你需要更具你本地的时间的 00:00:00 开始,则必须按照时区来指定窗口。
1 | public static void main(String[] args) { |
Session Windows
会话窗口,根据会话来指定窗口,与滚动和滑动窗口相比,会话窗口不重叠,并且没有固定的开始和结束时间。会话窗口可以指定静态指定会话间隔,或者可以让用户动态指定会话间隔。
1 | DataStream<T> input = ...; |
Global Windows
全局窗口(即无窗口),代表所有元素斗数以一个全局窗口,如果你不指定 Trigger,那么永远也不会生产出数据,因为全局窗口没有窗口开始和窗口结束的概念。
1 | DataStream<T> input = ...; |
窗口函数
在定义 window assigner 之后,我们需要指定要在每个窗口上执行的计算。当系统确定窗口准备好处理时(Trigger决定),这些窗口函数就可以用于处理每个(Keyed / Non-Keyed)窗口的元素。
窗口函数可以是ReduceFunction、AggregateFunction、FoldFunction或ProcessWindowFunction之一。前两个函数执行起来会更高效,因为 Flink 可以在每个窗口中的元素到达时递增地聚合元素。ProcessWindowFunction获取包含在窗口中的所有元素的Iterable,以及有关元素所属窗口的其他元信息。
使用ProcessWindowFunction的窗口函数不能像其他函数那样高效地执行,因为Flink在调用函数之前必须在内部缓冲窗口的所有元素。这可以通过将ProcessWindowFunction与ReduceFunction、AggregateFunction或FoldFunction组合使用来提高效率,从而使得其他窗口元数据或者窗口元素的进行增量聚合。
ReduceFunction
变量:两个输入生成一个输出,三个变量的类型必须相同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。
1 | DataStream<Tuple2<String, Long>> input = ...; |
AggregateFunction
AggregateFunction 是 ReduceFunction 的一个扩展版本。
变量:三个变量,一个是输入,一个是 accumulator 累加器,还有一个是输出,三个变量的类型可以不同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。
1 | /** |
FoldFunction
FoldFunction 是 AggregateFunction 的一个简易版本。
变量:三个个变量,一个是输出值的初始化值,一个是输入,还有一个是输出,三个变量中输入和输出的类型可以不同,但是输出和输出初始化值必须相同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。
FoldFunction 指定如何将窗口的输入元素与输出类型的元素组合。对添加到窗口的每个元素和当前输出值增量调用 FoldFunction。
1 | DataStream<Tuple2<String, Long>> input = ...; |
ProcessWindowFunction
ProcessWindowFunction 可以得到一个包含窗口的所有元素的迭代器,以及一个访问时间和状态信息的上下文对象,使得它能够提供比其他窗口函数更好的灵活性。
但是这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口可以处理为止。
变量:四个变量,一个是窗口的key,一个是包含了窗口信息的上下文,一个是窗口内所有元素的迭代器,一个是输出数据的收集器。
触发时间:窗口内有数据并且 Watermark 到达了窗口结束时间时触发。
1 | public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { |
1 | DataStream<Tuple2<String, Long>> input = ...; |
使用 ProcessWindowFunction 进行元素 count 是非常低效的,下面会讲到怎样将 ReduceFunction 或者 AggregateFunction 与 ProcessWindowFunction 结合使用将增量的数据的与 ProcessWindowFunction 配合进行使用。
为什么需要用到 ProcessWindowFunction:如果必要的话,一般的业务逻辑是没必要使用到 ProcessWindowFunction 的,但是有的需求需要获取到当前元素时间戳,窗口开始结束等等的信息,这时就需要使用 ProcessWindowFunction 来获取这些信息了。
ProcessWindowFunction 与 ReduceFunction
1 | DataStream<SensorReading> input = ...; |
ProcessWindowFunction 与 AggregateFunction
1 | DataStream<Tuple2<String, Long>> input = ...; |
ProcessWindowFunction 与 FoldFunction
1 | DataStream<SensorReading> input = ...; |
ProcessWindowFunction 中使用 state
WindowFunction(遗留)
在一些可以使用 ProcessWindowFunction 的地方,你也可以使用 WindowFunction,这是较旧版本的 ProcessWindowFunction,它提供的上下文信息较少,并且没有一些高级功能,例如 per-window keyed state。
1 | public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable { |
1 | DataStream<Tuple2<String, Long>> input = ...; |
Triggers
触发器决定了窗口函数什么时候处理窗口中的数据。每一个 WindowAssigner 都会带有一个默认的 Trigger,如果默认的 Trigger 不符合需求,你可以使用 trigger(…) 指定你需要的触发器。
一个 Trigger 接口有五个方法,可以通过编写函数指定如何对不同的 event 做出相应。
Function | 作用 |
---|---|
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) | 每向窗口添加一个元素时触发一次。 |
TriggerResult onEventTime(long time, W window, TriggerContext ctx) | Event Time timer 触发时调用。 |
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) | Processing Time timer 调用时触发。 |
void onMerge(W window, OnMergeContext ctx) | 方法与有状态 Trigger 相关,并在两个触发器的相应窗口合并时合并它们的状态,例如在使用会话窗口时(会话窗口每添加一个元素就会产生一个窗口)。 |
void clear(W window, TriggerContext ctx) | 将当前窗口的 state 清除。 |
前三个函数通过 TriggerResult 决定如何处理它们的调用事件。
TriggerResult | 作用 |
---|---|
TriggerResult.CONTINUE | 什么都不做。 |
TriggerResult.FIRE | 触发计算。 |
TriggerResult.PURGE | 清除窗口内的元素。 |
TriggerResult.FIRE_AND_PURGE | 触发计算,并且在此之后清除窗口内的元素。 |
触发运算并且清除元素
WindowAssigners 的默认 Triggers
很多 WindowAssigners 的默认 Triggers是适用于很多场景的。例如,所有的 event-time window assigners 都将 EventTimeTrigger 作为默认的 Trigger。这个 Trigger 的作用就是当 Watermark 到达了窗口结束时间时就触发。
提示1:GlobalWindow 的默认 Trigger 是永远不会触发的 NeverTrigger。
提示2:通过使用 trigger() 指定触发器,您将覆盖 WindowAssigner 的默认触发器。例如,如果为 TumblingEventTimeWindows 指定 CountTrigger,则不会再根据时间进度而仅按 count 触发窗口。
通用的 Triggers
Trigger | 作用 |
---|---|
EventTimeTrigger | 根据由 Watermark 计算的 Event Time 进度触发。 |
ProcessingTimeTrigger | 根据 Processing Time 触发。 |
CountTrigger | 在窗口中的元素数超过给定限额时触发。 |
PurgingTrigger | 将另一个 Trigger 作为参数,并将其转换为清除触发器。 |
Evictors
Flink 的窗口模型中允许指定除 WindowAssigner 和 Trigger 之外的可选逐出器(Evictor)。可以使用exictor(…)方法指定。Exictor 能够在触发器触发之后,并在使用窗口函数之前或者之后移除元素。为此,逐出器接口有两个方法:
1 | /** |
Evictor | 作用 |
---|---|
CountEvictor | 保持窗口内元素数量符合用户指定数量,如果多于用户指定的数量,从窗口缓冲区的开头丢弃剩余的元素。 |
DeltaEvictor | 使用 DeltaFunction 和一个阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的 delta 值,并删除 delta 值大于或等于阈值的元素。 |
TimeEvictor | 以毫秒为单位的时间间隔作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。 |
默认情况下,都只会在执行 WindowFunction 之前执行 Evictor。
提示:Flink 不能保证窗口中元素的顺序。这意味着尽管逐出器可能会从窗口的开头移除元素,但这些元素不一定是最先到达或最后到达的元素。
Allowed Lateness
概念
使用 Event Time 窗口时,可能会发生元素到达晚的情况,即 Flink 用于跟踪 Event Time 进度的 Watermark 已超过元素所属窗口的结束时间戳。
默认情况下,当发现 Watermark 已经超过到达的元素所属的窗口结束时间时,将删除这个延迟元素。但是 Flink 可以给窗口算子指定一个最大允许延迟时间。Allowed lateness 指定元素在被删除之前可以延迟多少时间,其默认值为0。
在迟到元素到达时,如果 Watermark 大于其所属窗口的结束时间时,并且 Watermark 小于窗口结束时间加上 allowed lateness,这个迟到的元素仍然可以被加到这个窗口内进行运算。有的触发器会出现在延迟但是没有丢弃的元素到达时,使得窗口再次计算,比如 EventTimeTrigger。
提示1:在 assignTimestampsAndWatermarks 时有一个 maxOutOfOrderness 的概念,maxOutOfOrderness 是生成 Watermark 所需要的,是指元素最大无序时间。而 Allowed Lateness 是指在 Watermark 到达窗口结束时间之后允许延迟多长时间,两个概念不一样。
提示2:
a.通过 watermark 机制来处理 out-of-order 的问题,属于第一层防护,属于全局性的防护,通常说的乱序问题的解决办法,就是指这类;
b.通过窗口上的 allowedLateness 机制来处理 out-of-order 的问题,属于第二层防护,属于特定 window operator 的防护,late element 的问题就是指这类
1 | DataStream<T> input = ...; |
提示:当使用 GlobalWindows 时,没有元素会被认为是迟到的,因为这个窗口哦的结束时间时 Long.MAX_VALUE。
迟到的数据做旁路输出(side output)
1 | final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; |
拓展思考
当指定 Allowed Lateness > 0 时,在 Watermark 通过窗口结束时间后,将保留窗口及其内容。在这种情况下,当一个延迟但未被丢弃的元素到达时,它可能会再次触发窗口运算。这些被触发运算的被称为 late firing。在使用会话窗口时,它们可能会将两个预先存在的未合并窗口进行合并,下面是一个例子。
比如:有一个会话窗口且 Gap 为3分钟,现在有两个窗口,第一个窗口起始和结束时间为(01:00:00,01:00:05),第二个窗口起始和结束时间为(01:00:09,01:00:15),如果我们在此时不设置 Allowed Lateness 时,那么如果不保存第一个窗口的数据,运算第二个窗口的数据时,不会有什么问题,但是如果我们设置了 Allowed Lateness = 5 min,那么这时就会有问题了,比如有迟到元素01:00:15才到达,元素自己的时间戳为01:00:07,这样这个元素就可以将两个窗口的数据结合为一个窗口。
提示:延迟数据触发的运算应该将之前的计算结果更新,所以如果下游 sink 使用了 kafka,则这种情况不是很适用(除非消费 kafka 的是一些 updateable dfs),否则,你将会得到很多的对相同组数据计算的结果。
窗口结果的使用
窗口计算的结果也会转化为一个数据流,这份结果中不会包含窗口操作的任何信息,所以如果后续计算中需要这些信息,你必须使用 ProcessWindowFunction 将这些信息通过编码传输进去。
窗口和 Watermark 的联系
当 Watermark 到达窗口算子处时,会触发两个事件:
1.Watermark 会触发所有的窗口中的最大时间戳(窗口结束时间戳 - 1)< 到达的最新 Watermark的窗口运算。
2.将 Watermark 发送到下游算子。
Intuitively, a watermark “flushes” out any windows that would be considered late in downstream operations once they receive that watermark.
连续的窗口算子
设置窗口时产生的状态大小的注意事项
窗口大小可以定义的很大(如天、周或月),因此可能会累积非常大的状态(state)。所以在估计窗口计算的存储需求时,需要记住以下几个规则:
1.Flink 会为每个元素所属的窗口创建一个副本。因此,滚动的窗口保留每个元素的一个副本(一个元素只属于一个窗口)。相反,滑动窗口可能会创建每个元素中的几个副本。因此,1天大小的窗口,1秒滑动步长的滑动窗口可能不是一个好主意。
2.ReduceFunction、AggregateFunction 和 FoldFunction 可以显著减少存储需求,因为它们在元素到达时就聚合元素,并且每个窗口只存储一个值。相反,仅仅使用 ProcessWindowFunction 就需要累积所有元素。
3.使用 Evictor 可防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过 Evictor 传递。