Apache Flink Learning: HDFS as a Sink

A demo of Apache Flink using Kafka as the source and HDFS as the sink.

Dependencies

<properties>
    <flink.version>1.9.0</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_2.12</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
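(Since the demo reads from Kafka, the pom also needs a Kafka connector dependency, e.g. flink-connector-kafka_2.12 at the same ${flink.version}; the original listing shows only the filesystem connector.)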

Demo code

The demo uses Kafka as the source and HDFS as the sink.
Before running it, start the Kafka cluster and the Hadoop cluster (plus the ZooKeeper cluster).
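The `source` used in the job below is not shown in the original post; one plausible construction looks like this (a sketch: the topic name, broker address, and group id are assumptions):

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Hypothetical Kafka consumer; topic/broker/group id are placeholders
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "localhost:9092");
kafkaProps.setProperty("group.id", "flink-hdfs-demo");
FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("sentences", new SimpleStringSchema(), kafkaProps);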

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.fs.StringWriter;
import org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink;

/******************************* define dag *******************************/
// create the environment to create streams and configure execution
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
// (`params` is e.g. ParameterTool.fromArgs(args); `source` is the Kafka consumer sketched above)
env.getConfig().setGlobalJobParameters(params);

DataStream<String> sentenceStream = env.addSource(source);
DataStream<Tuple2<String, Integer>> wordCountStream = sentenceStream
        .flatMap(new LineSplitter())  // split each line into (word, 1) tuples
        .keyBy(0)                     // key by the word (tuple field 0)
        .sum(1);                      // running count in tuple field 1
wordCountStream.print();

DataStream<String> kafkaSinkStream = wordCountStream
        .map(new WordBuilder());      // render each (word, count) tuple as a string

/******************************* hdfs sink *******************************/

BucketingSink<String> bucketingSink = new BucketingSink<>("/user/xxx/flink/from-kafka"); // base path on HDFS
bucketingSink.setWriter(new StringWriter<>())
        .setBatchSize(1024 * 1024L)         // roll the part file once it reaches 1 MB...
        .setBatchRolloverInterval(2000)     // ...or after 2000 ms, whichever comes first
        .setInactiveBucketThreshold(1000);  // flush/close buckets idle for more than 1000 ms

kafkaSinkStream.addSink(bucketingSink);

// the job graph is only submitted when execute() is called
env.execute("kafka-to-hdfs");
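The job references two user functions, LineSplitter and WordBuilder, that the original post does not include. Below are minimal sketches consistent with how they are used (hypothetical implementations, not the author's code):

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// Hypothetical: split each input line into (word, 1) tuples
public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

// Hypothetical: render a (word, count) tuple as a single CSV-style line
public static final class WordBuilder implements MapFunction<Tuple2<String, Integer>, String> {
    @Override
    public String map(Tuple2<String, Integer> wordCount) {
        return wordCount.f0 + "," + wordCount.f1;
    }
}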

The example above creates a sink that writes bucketed files whose paths follow the format below (the leading _ marks part files that are still in progress or pending):

/base/path/{date-time}/_part-{parallel-task}-{count}

date-time is a date/time-formatted string produced by the Bucketer configured via setBucketer(). If none is set, the default Bucketer is DateTimeBucketer, whose format defaults to yyyy-MM-dd--HH (DateTimeBucketer.DEFAULT_FORMAT_STRING).
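For example, to bucket by minute instead of by hour, a custom format string can be passed to DateTimeBucketer (a sketch; the format string is an assumption):

import org.apache.flink.streaming.connectors.fs.bucketing.DateTimeBucketer;

// hypothetical: one bucket per minute instead of the default one per hour
bucketingSink.setBucketer(new DateTimeBucketer<>("yyyy-MM-dd--HHmm"));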

_part-{parallel-task}-{count}: the rest of the file name is assembled inside BucketingSink itself, as this excerpt from its source shows:

// Excerpted from BucketingSink (Flink 1.9); fields and methods trimmed for brevity.
private static final String DEFAULT_VALID_PREFIX = "_";
private static final String DEFAULT_PART_PREFIX = "part";
private static final String DEFAULT_PENDING_SUFFIX = ".pending";

private void openNewPartFile(Path bucketPath, BucketState<T> bucketState) throws Exception {
    Path partPath = assemblePartPath(bucketPath, subtaskIndex, bucketState.partCounter);
    Path inProgressPath = getInProgressPathFor(partPath);
    // ... (rest of the method omitted)
}

// Builds "{partPrefix}-{subtaskIndex}-{partIndex}{partSuffix}", e.g. "part-2-1"
private Path assemblePartPath(Path bucket, int subtaskIndex, int partIndex) {
    String localPartSuffix = partSuffix != null ? partSuffix : "";
    return new Path(bucket, String.format("%s-%s-%s%s", partPrefix, subtaskIndex, partIndex, localPartSuffix));
}

// Prepends the in-progress prefix (default "_") and appends the in-progress suffix (default ".in-progress")
private Path getInProgressPathFor(Path path) {
    return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix);
}
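Putting these together: subtask 2 writing its second part file gets part-2-1 from assemblePartPath; while it is being written it appears as _part-2-1.in-progress, and once rolled it is renamed with the pending prefix/suffix to _part-2-1.pending, which matches the listing below.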

Inspecting the files on HDFS

$ ./hdfs dfs -ls /user/xxx/flink/from-kafka/2019-10-19--19
2019-10-19 20:08:31,244 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 15 items
-rw-r--r-- 1 xxx supergroup 4 2019-10-19 19:45 /user/xxx/flink/from-kafka/2019-10-19--19/_part-0-0.pending
-rw-r--r-- 1 xxx supergroup 2 2019-10-19 19:45 /user/xxx/flink/from-kafka/2019-10-19--19/_part-1-0.pending
-rw-r--r-- 1 xxx supergroup 5 2019-10-19 19:44 /user/xxx/flink/from-kafka/2019-10-19--19/_part-2-0.pending
-rw-r--r-- 1 xxx supergroup 11 2019-10-19 19:45 /user/xxx/flink/from-kafka/2019-10-19--19/_part-2-1.pending
-rw-r--r-- 1 xxx supergroup 10 2019-10-19 19:44 /user/xxx/flink/from-kafka/2019-10-19--19/_part-3-0.pending
-rw-r--r-- 1 xxx supergroup 13 2019-10-19 19:45 /user/xxx/flink/from-kafka/2019-10-19--19/_part-3-1.pending
-rw-r--r-- 1 xxx supergroup 9 2019-10-19 19:45 /user/xxx/flink/from-kafka/2019-10-19--19/_part-4-0.pending
-rw-r--r-- 1 xxx supergroup 10 2019-10-19 19:44 /user/xxx/flink/from-kafka/2019-10-19--19/_part-5-0.pending
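Note that every file in the listing is still .pending: BucketingSink only moves pending files to the finished state when a checkpoint completes, so with checkpointing disabled they stay pending. A minimal fix in the job above (the interval is an arbitrary choice):

// enable checkpointing so rolled .pending files get finalized
env.enableCheckpointing(10_000L); // hypothetical interval: every 10 seconds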