Apache Flink 学习:DataStream Api 中 Operators(算子)——Joining

Apache Flink 学习:DataStream Api 中 Operators(算子)——Joining

Window Join

Window Join 可以将两个流中相同key并且在同一个窗口中的元素进行链接。窗口可以使用 Window Assigner 进行定义,并且对来自不同的流的元素进行计算。

1
2
3
4
5
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

关于一些语义的解释:
1.两个流的成对组合的过程类似于 Inner Join,意味着如果一个流中的元素没有另一个流的元素要与之连接,则不会发出这些元素。

2.那些被连接的元素的时间戳是位于相应窗口中的最大时间戳。例如,以[5,10)为边界的窗口,则进行连接的元素的时间戳为9。

Tumbling Window Join

执行 Tumbling Window Join,在相同 key,相同时间窗口内的元素会进行笛卡尔积组合,这种组合类似于 inner join,如果一个流的对应流的同一窗口中没有元素,则这个流的当前窗口数据不会发出去。

tumbling-window-join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});

Sliding Window Join

执行 Sliding Window Join,具有公共 key 和公共滑动窗口的所有元素都作为成对组合进行连接,并传递给 JoinFunction 或 FlatJoinFunction。也是 inner join,当前流窗口匹配不到对应流窗口的元素则不会发送数据到下游!请注意,某些元素可能在一个滑动窗口中连接,在另一个滑动窗口中不会进行连接!

sliding-window-join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});

Session Window Join

执行 Session Window Join,具有相同 key 的所有元素(当“组合”满足会话条件时)将以成对组合联接,并传递给JoinFunction或FlatJoinFunction。同样,也是 inner join,当前流窗口匹配不到对应流窗口的元素则不会发送数据到下游!

session-window-join

Interval Join

interval join 用一个公共 key 连接两个流的元素(流A和流B),其中流B的元素具有与流A中元素的时间戳相对时间间隔内的时间戳,那么这个时间间隔内两个流的元素就会 join。

即:b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

其中 lowerBound 和 upperBound 可正可负,只要 lowerBound <= upperBound。

interval-join

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

...

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

orangeStream
.keyBy(<KeySelector>)
.intervalJoin(greenStream.keyBy(<KeySelector>))
.between(Time.milliseconds(-2), Time.milliseconds(1))
.process (new ProcessJoinFunction<Integer, Integer, String(){

@Override
public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
out.collect(first + "," + second);
}
});