生产实践 | Flink + 直播(三)| 如何建设当前正在直播 xx 数?

生产实践 | Flink + 直播(三)| 如何建设当前正在直播 xx 数?

本系列每篇文章都是从一些实际的 case 出发,分析一些生产环境中经常会遇到的问题,抛砖引玉,以帮助小伙伴们解决一些实际问题。本篇文章主要介绍直播间生产侧指标的建设过程,如果对小伙伴有帮助的话,欢迎点赞 + 再看~

整体架构

本文主要介绍生产侧指标的建设,比如当前正在直播直播间数,或者主播数等。在介绍生产侧指标的建设过程之前,我们先回顾下上一节的架构图。

架构

而本篇要介绍的生产侧指标的数据链路主要对应以下几个模块。

  • 数据源:读取直播生产,比如开播,关播等 kafka 数据源日志;
  • 数据处理:使用生产侧数据源 + 实时画像维表 + flink 建设生产侧实时指标;
  • 数据汇:将处理完成的指标数据写入到 kafka 中。

我用另一张图进行了标注,图中标红模块为生产侧指标的数据链路涉及到的模块。

生产侧架构

其中直播间实时画像维表的介绍已经在上节进行了介绍,感兴趣的话可以点击以下链接,跳转到上节进行阅读~

本小节就不针对生产侧指标的建设中所有涉及指标的建设过程进行详细介绍了,我们主要以当前分钟正在开播直播间数作为生产侧指标建设的一个代表性案例,介绍这个指标的整个建设过程。
来为大家还原生产侧指标的业务过程以及技术方案。

Question

仍然从几个问题入手,介绍当前分钟正在开播直播间数的建设过程。

  • 当前分钟正在开播直播间数的定义什么?业务过程是怎么样的?举例?
  • 怎样去建设这个指标?整体的指标计算流程?

1.聊聊定义?

当前分钟正在开播直播间数,其定义就是整个平台中,当前分钟正在开播的直播间数 + 单层维度下钻的当前分钟正在开播的直播间数。

举例:

现在的时间点是 2020-11-11 12:42,真实直播的直播间数为 3000 个(平台维度下钻:IOS 平台为 1500,安卓平台为 1500)

到了 12:43 时,有 200 个直播间进行了关播(其中 100 个为 IOS,100 个为安卓),有 100 个直播间开播(全部为 IOS),则当前正在直播的直播间数为 2900(平台维度下钻:IOS 平台为 1500,安卓平台为 1400)。

其中 2020-11-11 12:42 的 3000 以及 2020-11-11 12:43 的 2900 以及按照平台下钻的数值就为当前时间正在开播的直播间数。

因此根据上述定义和分析,我们可以直接将数据源和数据汇的 schema 定义下来,主体信息如下。

数据源 schema

字段 备注
live_stream_id 直播间 id
author_id 主播 id
start_or_end 开播还是关播
timestamp 时间戳

数据汇 schema

字段 备注
timestamp 时间戳,汇总到分钟粒度
metric_name 指标名,举例:开播直播间数
metric_value 指标值,举例:3000(开播直播间数)
dim_name 维度名,举例:平台,版本
dim_value 维度值,举例:IOS,8.1

Notes:

metric_name 和 metric_value

这两个字段是为了之后进行指标扩充时进行的设计。比如后续如果需要加入开播主播数,开播时长等指标,不用修改数据汇 schema,只需要加一种 metric_name,就可以使用原有 schema 进行数据产出。

dim_name 和 dim_value

目前我们建设的指标只提供了进行单维度下钻的能力,所以设计了 dim_name 和 dim_value 两个字段,可满足用户查看平台为 IOS 的当前开播直播间数或者使用开播软件版本为 8.1 的当前开播直播间数。
如果后续业务场景需要多维下钻能力,可以在字段上面进行扩充。或者也可以提供明细数据在 OLAP 中进行多维下钻。

2.怎样建设?

对于当前分钟正在开播直播间数来说,其计算方式很简单,就是下面这个数学公式:

当前分钟正在开播直播间数 = 上一分钟正在开播直播间数 + 当前分钟开播直播间数 - 当前分钟关播直播间数

可以从上面的公式可以看出,对于当前分钟正在开播直播间数的计算来说,是依赖上下文信息的,即上一分钟正在开播直播间数,这也就是我们所说的状态

指标处理逻辑

从获取到数据源,到产出指标的整体处理逻辑如下图所示。这里就不进行赘述了。

技术架构

其中标为粉色的模块为任务中的状态,即任务中一直存储的当前分钟正在开播直播间数。

状态

上述指标涉及到了,状态,那么我这里讲一下我对状态的理解。如有错误,请在文末讨论中进行指出,我会和大家讨论。

状态其实就是一个记录上下文信息的东西,如果当前的计算过程依赖到上次计算的结果,那么上次计算的结果就是状态。举几个🌰;

  • 流处理:如本节介绍的当前分钟正在开播直播间数的计算,就是依赖上一分钟的正在开播直播间数(状态)进行的计算。
    可能有小伙伴会说,我不依赖上一分钟,我从头开始计算可以不?答案是可以的,但是从头开始计算,也需要将所有历史数据进行存储,这些历史数据其实也就是状态,只不过我们将其优化为了上一分钟开播直播间数。

  • 批处理:今天的全量表 = 昨天全量表(状态) + 今天的增量表。

  • 数据库存储:最常见的 mysql 主键自增,unique key 等。
    为什么新插入一条数据主键会自增?因为 mysql 存储了主键的上一个值(状态)。
    为什么插入相同数据时,由于 unique key 会导致报错,就是因为 mysql 存储了所有 unique key 的字段的数据(状态)。

  • 生活:当前的手机电量 = 上一分钟的手机电量(状态) + (充电/用电量)。
    为什么你越来越喜欢你的另一半?因为你对她的感觉 = 前一秒你对她的感觉(状态) + 当前这一秒她亲了你一下。

生活中随处可见状态,即使你不是程序员,我相信也都可以理解状态的概念。

指标计算代码示例

按照最简单的实现方式举例如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
public class LiveStreamRealtimeMetricProdProcessorJob {

public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<SourceModel> source = SourceFactory.getSourceDataStream(...);

DataStream<SinkModel> result = source
.keyBy(new KeySelector<SourceModel, Long>() {
@Override
public Long getKey(SourceModel commonModel) throws Exception {
return commonModel.getLiveStreamId() % 1000;
}
})
.timeWindow(Time.seconds(60))
.process(new ProcessWindowFunction<SourceModel, SinkModel, Long, TimeWindow>() {

private ValueState<Long> playingLiveStreamNumberValueState;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.playingLiveStreamNumberValueState = getRuntimeContext().getState(...);
}

@Override
public void process(Long bucket, Context context, Iterable<SourceModel> iterable,
Collector<SinkModel> collector) throws Exception {
Long playingLiveStreamNumber = this.playingLiveStreamNumberValueState.value();

if (null == playingLiveStreamNumber) {
playingLiveStreamNumber = 0L;
}

List<SourceModel> sourceModels = (List<SourceModel>) iterable;

for (SourceModel sourceModel : sourceModels) {
if (BizType.I == sourceModel.getBizType()) {
playingLiveStreamNumber++;
} else {
playingLiveStreamNumber--;
}
}

this.playingLiveStreamNumberValueState.update(playingLiveStreamNumber);

collector.collect(
SinkModel.builder().build()
);
}
});

SinkFactory.setSinkDataStream(...);

env.execute();
}

@Data
@Builder
static class SourceModel {
// 直播间id
private Long liveStreamId;
// 开播时间,关播时间
private Long time;
// 主播id
private Long authorId;
// binlog 时间戳
private long binlogTimestamp;
// 开播,关播
private BizType bizType;
}

enum BizType {
I, // 开播
D, // 关播
;
}

@Data
@Builder
static class SinkModel {
// 时间戳,汇总到分钟粒度
private Long timestamp;
// 指标名
private String metricName;
// 指标值
private double metricValue;
// 维度名
private String dimName;
// 维度值
private String dimValue;
}
}

总结

本文衔接上文,主要介绍直播间生产侧指标的建设,以当前分钟正在开播直播间数为代表举例。提出定义以及建设过程相关的问题,以这两个个问题出发,引出了以下两小节。

第一节简单介绍了当前分钟正在开播直播间数的定义。

第二节主要介绍了当前分钟正在开播直播间数的建设逻辑以及过程,并对状态这个概念进行了一个拓展介绍。

最后一节对本文进行了总结。

如果你也有相同的指标建设需求,或者存在一些指标建设过程中的问题,欢迎关注博主公众号,或者添加博主微信,互相交流~

记得点赞 + 再看喔~