Apache Flink 学习:DataStream Api 中 Operators(算子)——窗口

Apache Flink 学习:DataStream Api 中 Operators(算子)——窗口

Keyed Windows

1
2
3
4
5
6
7
8
9
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

Non-Keyed Windows

1
2
3
4
5
6
7
8
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

Window 生命周期

简而言之,当属于该窗口的第一个元素到达时,将会创建一个窗口,并且当时间(Event Time 或者 Processing Time)超过其结束时间戳加上用户指定的允许延迟时间时,将完全删除该窗口,注意窗口都是左开右闭,比如:[0, 5)
Flink 保证只删除基于时间的窗口,而不删除其他类型的窗口,例如 Global Window。例如,使用基于事件时间的窗口,并且创建一个窗口大小为5分钟的滚动(Tumbing)窗口,并且允许延迟1分钟。当时间戳属于12:00到12:05之间的第一个元素到达时,Flink 将创建一个新窗口,当 Watermark 通过12:06时间戳时,就会把这个窗口删除。

此外,每个窗口都包含一个 Trigger 和一个函数(ProcessWindowFunction, ReduceFunction, AggregateFunction or FoldFunction)。函数包含了要应用于窗口内容的计算,而 Trigger 制定了什么情况下才触发执行这些函数。比如,触发策略可能类似于“当窗口中的元素数超过4时”或“当 WaterMark 通过窗口结束时”进行触发。Trigger 还可以决定什么时候删除窗口中的元素。

除上述内容外,您还可以指定一个 Evictor,该 Evictor 将能够在 Trigger 触发后、应用函数之前和/或之后从窗口中移除元素。

下面例子中的窗口都是按照 Event Time 或者 Processing Time进行指定。

Keyed vs Non-Keyed Windows

首先要指定的是是否应该为流设置 key。使用keyBy(…)可以将无限流拆分为逻辑 keyed 流。

在 Keyed Stream 的情况下,传入 event 的任何属性都可以用作键。拥有一个 Keyed Stream 将允许您的窗口计算由多个任务并行执行,因为每个逻辑 Keyed Stream 都可以独立于其他任务进行处理。所有引用同一个键的 event 都将被发送到同一个并行任务(通过 partitioner 完成)。

如果不是 Keyed Stream,则不会将原始流拆分为多个逻辑流,所有窗口逻辑将由单个任务执行,即并行度为1。

Tumbling Windows

滚动窗口

滚动窗口,如果你指定的滚动窗口大小为一天计算一次,并且你需要更具你本地的时间的 00:00:00 开始,则必须按照时区来指定窗口。
可以看到上图中,无论是多少个 key,每个 key 的窗口的起始和截止时间都相同。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/**
* Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
* elements to time windows based on the element timestamp and offset.
*
* 可以根据 时间戳 以及 偏移量 来指定 窗口的范围
*
* <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
* of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
* time windows start at 0:15:00,1:15:00,2:15:00,etc.
*
* 比如,如果需要一个窗口大小为一个小时,从每个小时的第15分钟开始计数的窗口,则可以使用下面的代码实现
* of(Time.hours(1),Time.minutes(15))
* 这样获得的窗口就是 0:15:00,1:15:00,2:15:00 ...
*
* <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
* such as China which is using UTC+08:00,and you want a time window with size of one day,
* and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
* The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
*
* 除此之外,如果您的时区不是 UTC±00:00 时间,比如在 中国(时区是 UTC+08:00),并且你需要一个一天大小的窗口,
* 并且窗口时间是本地 00:00:00开始,则可以使用下面的代码实现
* of(Time.days(1), Time.hours(-8))
*
* @param size The size of the generated windows.
* @param offset The offset which window start would be shifted by.
* @return The time policy.
*/
public static TumblingEventTimeWindows of(Time size, Time offset) {
return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds());
}

public static void main(String[] args) {
// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
}

Sliding Windows

滑动窗口

滑动窗口,如果你指定窗口大小和滑动步长一样,那么和滚动窗口的作用一样,滑动窗口的一个明显的特征就是:窗口可能会重叠,即同一个元素可能会属于不同的窗口

和滚动窗口相同,如果你指定的滚动窗口大小为一天计算一次,你需要更具你本地的时间的 00:00:00 开始,则必须按照时区来指定窗口。

1
2
3
4
5
6
7
public static void main(String[] args) {
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
}

Session Windows

会话窗口

会话窗口,根据会话来指定窗口,与滚动和滑动窗口相比,会话窗口不重叠,并且没有固定的开始和结束时间。会话窗口可以指定静态指定会话间隔,或者可以让用户动态指定会话间隔。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
DataStream<T> input = ...;

// event-time session windows with static gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);

// event-time session windows with dynamic gap
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withDynamicGap((element) -> {
// determine and return session gap
}))
.<windowed transformation>(<window function>);

Global Windows

全局窗口

全局窗口(即无窗口),代表所有元素斗数以一个全局窗口,如果你不指定 Trigger,那么永远也不会生产出数据,因为全局窗口没有窗口开始和窗口结束的概念。

1
2
3
4
5
6
DataStream<T> input = ...;

input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);

窗口函数

在定义 window assigner 之后,我们需要指定要在每个窗口上执行的计算。当系统确定窗口准备好处理时(Trigger决定),这些窗口函数就可以用于处理每个(Keyed / Non-Keyed)窗口的元素。

窗口函数可以是ReduceFunction、AggregateFunction、FoldFunction或ProcessWindowFunction之一。前两个函数执行起来会更高效,因为 Flink 可以在每个窗口中的元素到达时递增地聚合元素。ProcessWindowFunction获取包含在窗口中的所有元素的Iterable,以及有关元素所属窗口的其他元信息。

使用ProcessWindowFunction的窗口函数不能像其他函数那样高效地执行,因为Flink在调用函数之前必须在内部缓冲窗口的所有元素。这可以通过将ProcessWindowFunction与ReduceFunction、AggregateFunction或FoldFunction组合使用来提高效率,从而使得其他窗口元数据或者窗口元素的进行增量聚合。

ReduceFunction

变量:两个输入生成一个输出,三个变量的类型必须相同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。

1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});

AggregateFunction

AggregateFunction 是 ReduceFunction 的一个扩展版本。

变量:三个变量,一个是输入,一个是 accumulator 累加器,还有一个是输出,三个变量的类型可以不同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}

@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}

@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}

@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}

DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.aggregate(new AverageAggregate());

FoldFunction

FoldFunction 是 AggregateFunction 的一个简易版本。

变量:三个个变量,一个是输出值的初始化值,一个是输入,还有一个是输出,三个变量中输入和输出的类型可以不同,但是输出和输出初始化值必须相同。
触发时间:在每个窗口的元素到来的时候进行增量聚合。

FoldFunction 指定如何将窗口的输入元素与输出类型的元素组合。对添加到窗口的每个元素和当前输出值增量调用 FoldFunction。

1
2
3
4
5
6
7
8
9
10
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});

ProcessWindowFunction

ProcessWindowFunction 可以得到一个包含窗口的所有元素的迭代器,以及一个访问时间和状态信息的上下文对象,使得它能够提供比其他窗口函数更好的灵活性。
但是这是以性能和资源消耗为代价的,因为元素不能增量聚合,而是需要在内部缓冲,直到窗口可以处理为止。

变量:四个变量,一个是窗口的key,一个是包含了窗口信息的上下文,一个是窗口内所有元素的迭代器,一个是输出数据的收集器。
触发时间:窗口内有数据并且 Watermark 到达了窗口结束时间时触发。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param context The context in which the window is being evaluated.
* @param elements The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;

/**
* The context holding window metadata.
*/
public abstract class Context implements java.io.Serializable {
/**
* Returns the window that is being evaluated.
*/
public abstract W window();

/** Returns the current processing time. */
public abstract long currentProcessingTime();

/** Returns the current event-time watermark. */
public abstract long currentWatermark();

/**
* State accessor for per-key and per-window state.
*
* <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
* by implementing {@link ProcessWindowFunction#clear(Context)}.
*/
public abstract KeyedStateStore windowState();

/**
* State accessor for per-key global state.
*/
public abstract KeyedStateStore globalState();
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(t -> t.f0)
.timeWindow(Time.minutes(5))
.process(new MyProcessWindowFunction());

/* ... */

public class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple2<String, Long>, String, String, TimeWindow> {

@Override
public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple2<String, Long> in: input) {
count++;
}
out.collect("Window: " + context.window() + "count: " + count);
}
}

使用 ProcessWindowFunction 进行元素 count 是非常低效的,下面会讲到怎样将 ReduceFunction 或者 AggregateFunction 与 ProcessWindowFunction 结合使用将增量的数据的与 ProcessWindowFunction 配合进行使用。

为什么需要用到 ProcessWindowFunction:如果必要的话,一般的业务逻辑是没必要使用到 ProcessWindowFunction 的,但是有的需求需要获取到当前元素时间戳,窗口开始结束等等的信息,这时就需要使用 ProcessWindowFunction 来获取这些信息了。

ProcessWindowFunction 与 ReduceFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
DataStream<SensorReading> input = ...;

input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());

// Function definitions

private static class MyReduceFunction implements ReduceFunction<SensorReading> {

public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}

private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {

public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}

ProcessWindowFunction 与 AggregateFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.timeWindow(<duration>)
.aggregate(new AverageAggregate(), new MyProcessWindowFunction());

// Function definitions

/**
* The accumulator is used to keep a running sum and a count. The {@code getResult} method
* computes the average.
*/
private static class AverageAggregate
implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {
@Override
public Tuple2<Long, Long> createAccumulator() {
return new Tuple2<>(0L, 0L);
}

@Override
public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
}

@Override
public Double getResult(Tuple2<Long, Long> accumulator) {
return ((double) accumulator.f0) / accumulator.f1;
}

@Override
public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
}
}

private static class MyProcessWindowFunction
extends ProcessWindowFunction<Double, Tuple2<String, Double>, String, TimeWindow> {

public void process(String key,
Context context,
Iterable<Double> averages,
Collector<Tuple2<String, Double>> out) {
Double average = averages.iterator().next();
out.collect(new Tuple2<>(key, average));
}
}

ProcessWindowFunction 与 FoldFunction

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
DataStream<SensorReading> input = ...;

input
.keyBy(<key selector>)
.timeWindow(<duration>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction())

// Function definitions

private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {

public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(cur + 1, 2);
return acc;
}
}

private static class MyProcessWindowFunction
extends ProcessWindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {

public void process(String key,
Context context,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, context.window().getEnd(),count));
}
}

ProcessWindowFunction 中使用 state

WindowFunction(遗留)

在一些可以使用 ProcessWindowFunction 的地方,你也可以使用 WindowFunction,这是较旧版本的 ProcessWindowFunction,它提供的上下文信息较少,并且没有一些高级功能,例如 per-window keyed state。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {

/**
* Evaluates the window and outputs none or several elements.
*
* @param key The key for which this window is evaluated.
* @param window The window that is being evaluated.
* @param input The elements in the window being evaluated.
* @param out A collector for emitting elements.
*
* @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
1
2
3
4
5
6
DataStream<Tuple2<String, Long>> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());

Triggers

触发器决定了窗口函数什么时候处理窗口中的数据。每一个 WindowAssigner 都会带有一个默认的 Trigger,如果默认的 Trigger 不符合需求,你可以使用 trigger(…) 指定你需要的触发器。

一个 Trigger 接口有五个方法,可以通过编写函数指定如何对不同的 event 做出相应。

Function 作用
TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) 每向窗口添加一个元素时触发一次。
TriggerResult onEventTime(long time, W window, TriggerContext ctx) Event Time timer 触发时调用。
TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) Processing Time timer 调用时触发。
void onMerge(W window, OnMergeContext ctx) 方法与有状态 Trigger 相关,并在两个触发器的相应窗口合并时合并它们的状态,例如在使用会话窗口时(会话窗口每添加一个元素就会产生一个窗口)。
void clear(W window, TriggerContext ctx) 将当前窗口的 state 清除。

前三个函数通过 TriggerResult 决定如何处理它们的调用事件。

TriggerResult 作用
TriggerResult.CONTINUE 什么都不做。
TriggerResult.FIRE 触发计算。
TriggerResult.PURGE 清除窗口内的元素。
TriggerResult.FIRE_AND_PURGE 触发计算,并且在此之后清除窗口内的元素。

触发运算并且清除元素

WindowAssigners 的默认 Triggers

很多 WindowAssigners 的默认 Triggers是适用于很多场景的。例如,所有的 event-time window assigners 都将 EventTimeTrigger 作为默认的 Trigger。这个 Trigger 的作用就是当 Watermark 到达了窗口结束时间时就触发。
提示1:GlobalWindow 的默认 Trigger 是永远不会触发的 NeverTrigger。
提示2:通过使用 trigger() 指定触发器,您将覆盖 WindowAssigner 的默认触发器。例如,如果为 TumblingEventTimeWindows 指定 CountTrigger,则不会再根据时间进度而仅按 count 触发窗口。

通用的 Triggers

Trigger 作用
EventTimeTrigger 根据由 Watermark 计算的 Event Time 进度触发。
ProcessingTimeTrigger 根据 Processing Time 触发。
CountTrigger 在窗口中的元素数超过给定限额时触发。
PurgingTrigger 将另一个 Trigger 作为参数,并将其转换为清除触发器。

Evictors

Flink 的窗口模型中允许指定除 WindowAssigner 和 Trigger 之外的可选逐出器(Evictor)。可以使用exictor(…)方法指定。Exictor 能够在触发器触发之后,并在使用窗口函数之前或者之后移除元素。为此,逐出器接口有两个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* Optionally evicts elements. Called before windowing function.
* 在执行窗口函数之前执行。
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

/**
* Optionally evicts elements. Called after windowing function.
* 在执行窗口函数之后执行。
*
* @param elements The elements currently in the pane.
* @param size The current number of elements in the pane.
* @param window The {@link Window}
* @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
Evictor 作用
CountEvictor 保持窗口内元素数量符合用户指定数量,如果多于用户指定的数量,从窗口缓冲区的开头丢弃剩余的元素。
DeltaEvictor 使用 DeltaFunction 和一个阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的 delta 值,并删除 delta 值大于或等于阈值的元素。
TimeEvictor 以毫秒为单位的时间间隔作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。

默认情况下,都只会在执行 WindowFunction 之前执行 Evictor。

提示:Flink 不能保证窗口中元素的顺序。这意味着尽管逐出器可能会从窗口的开头移除元素,但这些元素不一定是最先到达或最后到达的元素。

Allowed Lateness

概念

使用 Event Time 窗口时,可能会发生元素到达晚的情况,即 Flink 用于跟踪 Event Time 进度的 Watermark 已超过元素所属窗口的结束时间戳。

默认情况下,当发现 Watermark 已经超过到达的元素所属的窗口结束时间时,将删除这个延迟元素。但是 Flink 可以给窗口算子指定一个最大允许延迟时间。Allowed lateness 指定元素在被删除之前可以延迟多少时间,其默认值为0。

在迟到元素到达时,如果 Watermark 大于其所属窗口的结束时间时,并且 Watermark 小于窗口结束时间加上 allowed lateness,这个迟到的元素仍然可以被加到这个窗口内进行运算。有的触发器会出现在延迟但是没有丢弃的元素到达时,使得窗口再次计算,比如 EventTimeTrigger。

提示1:在 assignTimestampsAndWatermarks 时有一个 maxOutOfOrderness 的概念,maxOutOfOrderness 是生成 Watermark 所需要的,是指元素最大无序时间。而 Allowed Lateness 是指在 Watermark 到达窗口结束时间之后允许延迟多长时间,两个概念不一样。
提示2:
a.通过 watermark 机制来处理 out-of-order 的问题,属于第一层防护,属于全局性的防护,通常说的乱序问题的解决办法,就是指这类;
b.通过窗口上的 allowedLateness 机制来处理 out-of-order 的问题,属于第二层防护,属于特定 window operator 的防护,late element 的问题就是指这类

1
2
3
4
5
6
7
DataStream<T> input = ...;

input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);

提示:当使用 GlobalWindows 时,没有元素会被认为是迟到的,因为这个窗口哦的结束时间时 Long.MAX_VALUE。

迟到的数据做旁路输出(side output)

1
2
3
4
5
6
7
8
9
10
11
12
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};

DataStream<T> input = ...;

SingleOutputStreamOperator<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);

DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

拓展思考

当指定 Allowed Lateness > 0 时,在 Watermark 通过窗口结束时间后,将保留窗口及其内容。在这种情况下,当一个延迟但未被丢弃的元素到达时,它可能会再次触发窗口运算。这些被触发运算的被称为 late firing。在使用会话窗口时,它们可能会将两个预先存在的未合并窗口进行合并,下面是一个例子。

比如:有一个会话窗口且 Gap 为3分钟,现在有两个窗口,第一个窗口起始和结束时间为(01:00:00,01:00:05),第二个窗口起始和结束时间为(01:00:09,01:00:15),如果我们在此时不设置 Allowed Lateness 时,那么如果不保存第一个窗口的数据,运算第二个窗口的数据时,不会有什么问题,但是如果我们设置了 Allowed Lateness = 5 min,那么这时就会有问题了,比如有迟到元素01:00:15才到达,元素自己的时间戳为01:00:07,这样这个元素就可以将两个窗口的数据结合为一个窗口。

提示:延迟数据触发的运算应该将之前的计算结果更新,所以如果下游 sink 使用了 kafka,则这种情况不是很适用(除非消费 kafka 的是一些 updateable dfs),否则,你将会得到很多的对相同组数据计算的结果。

窗口结果的使用

窗口计算的结果也会转化为一个数据流,这份结果中不会包含窗口操作的任何信息,所以如果后续计算中需要这些信息,你必须使用 ProcessWindowFunction 将这些信息通过编码传输进去。

窗口和 Watermark 的联系

当 Watermark 到达窗口算子处时,会触发两个事件:
1.Watermark 会触发所有的窗口中的最大时间戳(窗口结束时间戳 - 1)< 到达的最新 Watermark的窗口运算。
2.将 Watermark 发送到下游算子。
Intuitively, a watermark “flushes” out any windows that would be considered late in downstream operations once they receive that watermark.

连续的窗口算子

设置窗口时产生的状态大小的注意事项

窗口大小可以定义的很大(如天、周或月),因此可能会累积非常大的状态(state)。所以在估计窗口计算的存储需求时,需要记住以下几个规则:

1.Flink 会为每个元素所属的窗口创建一个副本。因此,滚动的窗口保留每个元素的一个副本(一个元素只属于一个窗口)。相反,滑动窗口可能会创建每个元素中的几个副本。因此,1天大小的窗口,1秒滑动步长的滑动窗口可能不是一个好主意。

2.ReduceFunction、AggregateFunction 和 FoldFunction 可以显著减少存储需求,因为它们在元素到达时就聚合元素,并且每个窗口只存储一个值。相反,仅仅使用 ProcessWindowFunction 就需要累积所有元素。

3.使用 Evictor 可防止任何预聚合,因为在应用计算之前,窗口的所有元素都必须通过 Evictor 传递。