Apache Flink 零基础入门(四):DataStream API 编程 学习心得

学习心得

DataStream

RichParallelSourceFunction

用户通过实现SourceFunction自定义DataSource

如果设置了并行度,则会产生指定并行度个数的DataSource消费客户端去消费DataSource

1
StreamExecutionEnvironment.setParallelism(int)

举例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
public class GroupedProcessingTimeWindow {

private static final Logger LOGGER = LoggerFactory.getLogger(GroupedProcessingTimeWindow.class);

private static class DataSource extends RichParallelSourceFunction<Tuple2<String, Integer>> {

private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
Random random = new Random();
while (isRunning) {
TimeUnit.MILLISECONDS.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000 * 5);
String key = "类别" + (char) ('A' + random.nextInt(3));
int value = random.nextInt(10) + 1;
LOGGER.info("Thread: {}, key: {}, value: {}, dataSource object: {})"
, Thread.currentThread().getName()
, key
, value
, this);
ctx.collect(new Tuple2<>(key, value));
}
}

@Override
public void cancel() {
isRunning = false;
}
}

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(2);

DataSource dataSource = new DataSource();
LOGGER.info("dataSource object: {}", dataSource);

DataStream<Tuple2<String, Integer>> ds = env.addSource(dataSource);
KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy(0);

// KeyedStream<Tuple2<String, Integer>, Tuple> keyedStream = ds.keyBy("f0"); 通过指定字段名 f0

keyedStream
.sum(1)
// .sum("f1") 通过制定字段名 f1
.keyBy((KeySelector<Tuple2<String, Integer>, Object>) stringIntegerTuple2 -> StringUtils.EMPTY)
.fold(new HashMap<String, Integer>(),
new FoldFunction<Tuple2<String, Integer>, HashMap<String, Integer>>() {
@Override
public HashMap<String, Integer> fold(HashMap<String, Integer> accumulator,
Tuple2<String, Integer> value) throws Exception {
accumulator.put(value.f0, value.f1);
return accumulator;
}
})
.addSink(new SinkFunction<HashMap<String, Integer>>() {
@Override
public void invoke(HashMap<String, Integer> value, Context context) throws Exception {
// 每个类型的商品成交量
LOGGER.info("{}"
, value);
// 商品成交总量
LOGGER.info("{}"
, value.values().stream().mapToInt(v -> v).sum());
}
});

env.execute();
}

}

通过查看dataSource object:的log就会发现上面这个例子中国产生了3个DataSource实例。

Evictor

CountEvictor:保持窗口内元素数量符合用户指定数量,如果多于用户指定的数量,从窗口缓冲区的开头丢弃剩余的元素。
DeltaEvictor:使用 DeltaFunction和 一个阈值,计算窗口缓冲区中的最后一个元素与其余每个元素之间的 delta 值,并删除 delta 值大于或等于阈值的元素。
TimeEvictor:以毫秒为单位的时间间隔作为参数,对于给定的窗口,找到元素中的最大的时间戳max_ts,并删除时间戳小于max_ts - interval的所有元素。

keyedStream

KeyedStream.fold(R initialValue, FoldFunction<T, R> folder)

添加一个合并key分组的算子,FoldFunction会接收到同一key的value,只有key相同的值才会被分发到同一个folder。

可能出现的问题

错误场景:
在用户定义DAG图算子的时候,可能会出现不支持lambda表达式的情况

原因:
为了执行程序,Flink需要知道要处理的值的类型,因为它需要序列化和反序列化数据。
Flink的类型系统基于描述数据类型的TypeInformation进行序列化和反序列化,会将Java中的基本类型以及Object类型与TypeInformation进行映射。
当您指定一个函数时,Flink会尝试推断该函数的返回类型。
但是某些Lambda函数由于类型擦除而丢失了此信息(可以自己编译后再对编译成的.class文件进行反编译,然后查看函数签名,发现函数签名具体类型被擦除),
因此Flink无法通过此自动推断类型。
Flink Java Lambda表达式

因此,必须显式声明返回类型。

解决方案1:用户自己定义返回类型

1
2
3
4
5
6
7
DataStream<String> wordDataStream = dataStream.flatMap(
(String sentence, Collector<String> out) -> {
for(String word: sentence.split("\\W+")) {
out.collect(word); // collect objects of type String
}
}
).returns(Types.STRING);

解决方案2:显示声明返回类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
DataStream<String> wordDataStream = dataStream.flatMap(
new FlatMapFunction<String, String>() {
@Override
public void flatMap(String sentence, Collector<String> out) {
// normalize and split the line
String[] words = sentence.toLowerCase().split("\\W+");

// emit the pairs
for (String word : words) {
if (word.length() > 0) {
out.collect(word);
}
}
}
}
)