实时新增类指标标准化处理方案

实时新增类指标标准化处理方案

实时指标整个链路开发过程中的一些经验。

实时新增类指标

大体上可以将实时新增类指标以以下两种维度进行分类。

identity id 类型维度

identity id 类型 备注
number(long) 类型 identity id 数值类型 identity id 的好处在于可以使用 Bitmap 类组件做到精确去重。
字符类型 identity id 字符类型 identity id 去重相对复杂,有两种方式,在误差允许范围之内使用 BloomFilter 进行去重,或者使用 key-value 组件进行精确去重。

产出数据类型维度

产出数据类型 备注
明细类数据 此类数据一般是要求将新增的数据明细产出,uv 的含义是做过滤,产出的明细数据中的 identity id 不会有重复。输出明细数据的好处在于,我们可以在下游使用 OLAP 引擎对明细数据进行各种维度的聚合计算,从而很方便的产出不同维度下的 uv 数据。
聚合类数据 将一个时间窗口内的 uv 进行聚合,并且可以计算出分维度的 uv,其产出数据一般都是[维度 + uv_count],但是这里的维度一般情况下是都是固定维度。如果需要拓展则需要改动源码。

计算链路

因此新增产出的链路多数就是以上两种维度因子的相互组合。

number(long) 类型 identity id

使用 RoaringBitmap 的 uv 计算链路

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
public interface RoaringBitmapDuplicateable<Model> {

long DEFAULT_DUPLICATE_MILLS = 24 * 3600 * 1000L;

BiPredicate<Long, Long> ROARING_BIT_MAP_CLEAR_BI_PREDICATE =
(start, end) -> end - start >= DEFAULT_DUPLICATE_MILLS;

// 初始化
default ValueState<Tuple2<Long, Roaring64NavigableMap>> getBitMapValueState(String name) {
return this.getRuntimeContext().getState(
new ValueStateDescriptor<>(name, TypeInformation.of(
new TypeHint<Tuple2<Long, Roaring64NavigableMap>>() { }))
);
}

RuntimeContext getRuntimeContext();

long getLongId(Model model);

Optional<Logger> getLogger();

default BiPredicate<Long, Long> roaringBitMapClearBiPredicate() {
return ROARING_BIT_MAP_CLEAR_BI_PREDICATE;
}

default List<Model> duplicateAndGet(List<Model> models, long windowStartTimestamp
, ValueState<Tuple2<Date, Roaring64NavigableMap>> bitMapValueState) throws IOException {

Tuple2<Date, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);

Map<Long, Model> idModelsMap = models
.stream()
.collect(Collectors.toMap(this::getLongId, Function.identity(), (oldOne, newOne) -> oldOne));

Set<Long> ids = idModelsMap.keySet();

List<Model> newModels = Lists.newArrayList();
for (Long id : ids) {
if (!bitMap.f1.contains(id)) {
if (idModelsMap.containsKey(id)) {
newModels.add(idModelsMap.get(id));
}
}
}

newModels.stream()
.map(this::getLongId)
.forEach(bitMap.f1::add);
bitMapValueState.update(bitMap);
return newModels;
}

default long duplicateAndCount(List<Model> models, long windowStartTimestamp
, ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {

Tuple2<Long, Roaring64NavigableMap> bitMap = checkAndGetState(windowStartTimestamp, bitMapValueState);

Set<Long> ids = models
.stream()
.map(this::getLongId)
.collect(Collectors.toSet());

List<Long> newIds = Lists.newArrayList();
int count = 0;
for (Long id : ids) {
if (!bitMap.f1.contains(id)) {
newIds.add(id);
count++;
}
}

newIds.forEach(bitMap.f1::add);
bitMapValueState.update(bitMap);
return count;
}

default Tuple2<Long, Roaring64NavigableMap> checkAndGetState(long windowStartTimestamp
, ValueState<Tuple2<Long, Roaring64NavigableMap>> bitMapValueState) throws IOException {

Tuple2<Long, Roaring64NavigableMap> bitmap = bitMapValueState.value();

if (null == bitmap) {

this.getLogger().ifPresent(logger ->
logger.info("New RoaringBitMapValueState Timestamp={}", windowStartTimestamp));
Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
bitMapValueState.update(newBitMap);
return newBitMap;

} else if (this.roaringBitMapClearBiPredicate().test(bitmap.f0, windowStartTimestamp)) {

this.getLogger().ifPresent(logger ->
logger.info("Clear RoaringBitMapValueState, from start={} to end={}", bitmap.f0, windowStartTimestamp));

bitMapValueState.clear();
bitmap.f1.clear();
Tuple2<Long, Roaring64NavigableMap> newBitMap = Tuple2.of(windowStartTimestamp, new Roaring64NavigableMap());
bitMapValueState.update(newBitMap);
return newBitMap;
} else {
return bitmap;
}
}
}

字符类型 identity id

使用 flink state 的 uv 计算链路

使用 key-value 外存

使用 key-value 的 uv 计算链路

如果选用的是 Redis 作为 key-value 过滤,那么这里会有一个巧用 Redis bit 特性的优化。举一个一般场景下的方案与使用 Redis bit 特性的方案做对比:

场景:假如需要同一天有几十场活动,并且都希望计算出这几十场活动的 uv,那么我们就可以按照下图设计 Redis bit 结构。

通常方案:

使用 Redis 的 多 uv 指标计算链路

这种场景下,如果有 1 亿用户,需要同时计算 50 个活动或者 50 个不同维度下的 uv。那么理论上最大 key 数量为 1 亿 * 50 = 50 亿个 key。

Redis bit 方案:

使用 Redis bit 特性的 多 uv 指标计算链路

这样做的一个优点,就是这几十场活动的 uv 计算都使用了相同的 Redis key 来计算,可以大幅度减少 Redis 的容量占用。使用此方案的话,以上述相同的用户和活动场数,理论上最大
key 数量仅仅为 1 亿,只是 value 数量会多占几十个 bit。