Apache Flink 学习:BoundedOutOfOrdernessTimestampExtractor

Apache Flink 学习:BoundedOutOfOrdernessTimestampExtractor

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * An {@link AssignerWithPeriodicWatermarks} whose watermarks trail the largest event-time timestamp
 * observed so far by a fixed bound, <code>t_late</code>.
 *
 * <p>Holding the watermark back by <code>t_late</code> reduces the number of elements that are dropped
 * as late when a window result is computed, provided elements arrive no more than <code>t_late</code>
 * units of time after the watermark that signals event time has passed their own timestamp.
 *
 * <p>Subclasses supply the timestamp of each element through {@code extractTimestamp(T)}.
 */
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;

    /** Largest timestamp extracted from the stream so far. */
    private long currentMaxTimestamp;

    /** Timestamp carried by the most recently returned watermark. */
    private long lastEmittedWatermark = Long.MIN_VALUE;

    /**
     * Fixed gap, in milliseconds, between the largest timestamp seen in the records and the
     * watermark to be emitted (the maximum tolerated out-of-orderness).
     *
     * <p>The watermark only moves forward once
     * {@code currentMaxTimestamp - maxOutOfOrderness} reaches or exceeds the last emitted watermark.
     */
    private final long maxOutOfOrderness;

    /**
     * Creates an extractor with the given out-of-orderness bound.
     *
     * @param maxOutOfOrderness The maximum out-of-orderness; must not be negative.
     * @throws RuntimeException If the bound is negative.
     */
    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        final long boundMillis = maxOutOfOrderness.toMilliseconds();
        if (boundMillis < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = boundMillis;
        // Seed the maximum so that (currentMaxTimestamp - maxOutOfOrderness) equals Long.MIN_VALUE
        // before any element is seen, instead of underflowing.
        this.currentMaxTimestamp = Long.MIN_VALUE + boundMillis;
    }

    /**
     * Returns the configured out-of-orderness bound.
     *
     * @return The bound in milliseconds.
     */
    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    /**
     * Extracts the timestamp from the given element. Implemented by the user according to the
     * layout of their records.
     *
     * @param element The element that the timestamp is extracted from.
     * @return The new timestamp.
     */
    public abstract long extractTimestamp(T element);

    /**
     * Returns the current watermark: the largest timestamp seen so far minus the out-of-orderness
     * bound, but never less than the previously returned watermark.
     *
     * @return The watermark carrying the (possibly advanced) last emitted timestamp.
     */
    @Override
    public final Watermark getCurrentWatermark() {
        final long candidate = currentMaxTimestamp - maxOutOfOrderness;
        // Taking the maximum guarantees that the watermark never goes backwards.
        lastEmittedWatermark = Math.max(lastEmittedWatermark, candidate);
        return new Watermark(lastEmittedWatermark);
    }

    /**
     * Extracts the element's timestamp and records it as the new maximum when it exceeds every
     * timestamp seen before.
     *
     * @param element The element that the timestamp is extracted from.
     * @param previousElementTimestamp The timestamp previously attached to the element; not used here.
     * @return The timestamp produced by {@code extractTimestamp(T)}.
     */
    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        final long extracted = extractTimestamp(element);
        currentMaxTimestamp = Math.max(currentMaxTimestamp, extracted);
        return extracted;
    }
}

****

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
 * Sets the time characteristic for all streams created from this environment, e.g., processing
 * time, event time, or ingestion time.
 *
 * <p>Choosing any characteristic other than ProcessingTime sets the automatic watermark interval
 * to 200 ms; choosing ProcessingTime sets it to 0. If that default does not suit your application,
 * change it afterwards using {@link ExecutionConfig#setAutoWatermarkInterval(long)}.
 *
 * @param characteristic The time characteristic; must not be null.
 */
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    // Processing time gets an interval of 0; every other characteristic gets 200 ms.
    final long watermarkIntervalMillis =
        (characteristic == TimeCharacteristic.ProcessingTime) ? 0L : 200L;
    getConfig().setAutoWatermarkInterval(watermarkIntervalMillis);
}

window触发机制

window 的触发机制:首先按照自然时间对 window 进行划分。如果 window 大小是 3 秒,那么 1 分钟会被划分为如下 20 个 window:

1
2
3
4
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

如果 window 大小是 10 秒,那么 1 分钟会被划分为如下 6 个 window:

1
2
3
4
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)