Learning Apache Flink: HBase as a Sink

A demo of Apache Flink using HBase as the sink.

Dependencies

<properties>
    <hbase.version>2.0.5</hbase.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
</dependencies>
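
The POM above lists only the HBase client; the demo additionally needs the Flink streaming API on the classpath. A minimal sketch of the extra coordinates, assuming a Flink 1.x build with Scala 2.11 binaries (the flink.version property and the Scala suffix are assumptions to adapt to your setup):

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>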

Demo code

HBase is used as both the source and the sink.

Before running the demo, the Hadoop cluster (with its ZooKeeper quorum) and the HBase cluster must be up.
Flink connects to HBase using the deployed cluster information (for example, a ZooKeeper quorum at 127.0.0.1:2181).
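
The demo also assumes a student table with a description column family already exists and contains the row1 record. If not, it can be created in the HBase shell; the sample values below are chosen to match the scan output at the end of this post:

create 'student', 'description'
put 'student', 'row1', 'description:name', 'liu'
put 'student', 'row1', 'description:age', '18'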

hbase-site.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
    <!-- ZooKeeper configuration -->
    <property>
        <name>zookeeper.session.timeout</name>
        <value>60000</value>
    </property>
    <property>
        <name>zookeeper.znode.parent</name>
        <value>/hbase</value>
    </property>
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>127.0.0.1</value>
    </property>
    <property>
        <name>hbase.zookeeper.property.clientPort</name>
        <value>2181</value>
    </property>
</configuration>
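
Both HBaseReader and HBaseWriter below delegate to a custom HBaseClient wrapper that the original post does not show. The sketch below is a reconstruction inferred from the call sites, assuming the standard hbase-client 2.x API; it is illustrative, not the author's actual class:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseClient {

    private Connection connection;

    public void initialize() throws IOException {
        // HBaseConfiguration.create() picks up hbase-site.xml from the classpath
        connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
    }

    public List<byte[]> scan(String tableName, String startRow, String stopRow) throws IOException {
        List<byte[]> values = new ArrayList<>();
        try (Table table = connection.getTable(TableName.valueOf(tableName));
             ResultScanner scanner = table.getScanner(new Scan()
                     .withStartRow(Bytes.toBytes(startRow))
                     // inclusive stop row, so startRow == stopRow scans exactly one row
                     .withStopRow(Bytes.toBytes(stopRow), true))) {
            for (Result result : scanner) {
                values.add(result.value()); // value of the row's first cell
            }
        }
        return values;
    }

    public Put createPut(String rowKey) {
        return new Put(Bytes.toBytes(rowKey));
    }

    public void addValueOnPut(Put put, String family, String qualifier, String value) {
        put.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes.toBytes(value));
    }

    public void put(String tableName, Put put) throws IOException {
        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
            table.put(put);
        }
    }

    public void destroy() throws IOException {
        if (connection != null) {
            connection.close();
        }
    }
}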

HBaseReader

HBase as the source

public static class HBaseReader extends RichSourceFunction<String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseReader.class);

    private transient HBaseClient hBaseClient;

    private static final String DEFAULT_HBASE_SOURCE_TABLE_NAME = "student";

    private static final String DEFAULT_HBASE_SOURCE_START_ROW = "row1";

    private static final String DEFAULT_HBASE_SOURCE_STOP_ROW = "row1";

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Lazily create the HBase connection once per task instance
        if (Objects.isNull(hBaseClient)) {
            synchronized (this) {
                if (Objects.isNull(hBaseClient)) {
                    hBaseClient = new HBaseClient();
                    hBaseClient.initialize();
                }
            }
        }
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // Bounded scan: emit each cell value between the start and stop row, then finish
        List<byte[]> results = hBaseClient.scan(
                DEFAULT_HBASE_SOURCE_TABLE_NAME
                , DEFAULT_HBASE_SOURCE_START_ROW
                , DEFAULT_HBASE_SOURCE_STOP_ROW);
        results.forEach(result -> ctx.collect(new String(result)));
    }

    @Override
    public void cancel() {
        // Release the HBase connection when the source is cancelled
        if (Objects.nonNull(hBaseClient)) {
            synchronized (this) {
                if (Objects.nonNull(hBaseClient)) {
                    try {
                        hBaseClient.destroy();
                    } catch (IOException e) {
                        LOGGER.error("Failed to close HBase client", e);
                    }
                }
            }
        }
    }
}
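
Note that run() performs a single bounded scan and then returns, so the source emits the scan results once and the job finishes; a continuously running source would instead loop on a volatile running flag that cancel() clears.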

HBaseWriter

HBase as the sink

public static class HBaseWriter extends RichSinkFunction<Tuple2<String, Integer>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(HBaseWriter.class);

    private transient HBaseClient hBaseClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Lazily create the HBase connection once per task instance
        if (Objects.isNull(hBaseClient)) {
            synchronized (this) {
                if (Objects.isNull(hBaseClient)) {
                    hBaseClient = new HBaseClient();
                    hBaseClient.initialize();
                }
            }
        }
    }

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws IOException {
        // For demo purposes the incoming tuple is ignored and a fixed cell
        // (row2, description:age = 19) is written on every invocation
        Put put = hBaseClient.createPut("row2");
        hBaseClient.addValueOnPut(put, "description", "age", "19");
        hBaseClient.put("student", put);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // Release the HBase connection when the sink is closed
        if (Objects.nonNull(hBaseClient)) {
            synchronized (this) {
                if (Objects.nonNull(hBaseClient)) {
                    try {
                        hBaseClient.destroy();
                    } catch (IOException e) {
                        LOGGER.error("Failed to close HBase client", e);
                    }
                }
            }
        }
    }
}
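
In a real word-count job the sink would derive the row key and cell value from the incoming tuple instead of writing a fixed cell. A possible variant of invoke() (the description:count qualifier is an illustrative choice, not from the original):

@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws IOException {
    // Use the word as the row key and store its running count (illustrative only)
    Put put = hBaseClient.createPut(value.f0);
    hBaseClient.addValueOnPut(put, "description", "count", String.valueOf(value.f1));
    hBaseClient.put("student", put);
}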

Defining the DAG

public static void main(String[] args) throws Exception {

    final SourceFunction<String> source;
    final ParameterTool params = ParameterTool.fromArgs(args);

    /******************************* hbase source *******************************/
    source = new HBaseReader();

    /******************************* define dag *******************************/
    // create the environment to create streams and configure execution
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);
    DataStream<String> sentenceStream = env.addSource(source);
    DataStream<Tuple2<String, Integer>> wordCountStream = sentenceStream
            .flatMap(new LineSplitter())
            .keyBy(0)
            .sum(1);

    /******************************* hbase sink *******************************/
    wordCountStream.addSink(new HBaseWriter());
    wordCountStream.print();

    env.execute("Java hbase Word Count");
}
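
The LineSplitter referenced above is not shown in the original post; a minimal sketch in the style of Flink's classic WordCount example (the tokenization rule is an assumption):

public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    @Override
    public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
        // Split on non-word characters and emit (word, 1) for each token
        for (String word : line.toLowerCase().split("\\W+")) {
            if (!word.isEmpty()) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}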

Checking the HBase table

hbase(main):001:0> scan 'student'
ROW                COLUMN+CELL
 row1              column=description:age, timestamp=1571460125600, value=18
 row1              column=description:name, timestamp=1571460129987, value=liu
 # the record below was written by the Flink sink
 row2              column=description:age, timestamp=1571576517072, value=19
2 row(s) in 0.2010 seconds

hbase(main):002:0>

We can see that a record with column family description, column age, row key row2, and value 19 has been written to HBase.