目录
目录
先带大家串一遍本篇文章的思路什么是状态?
发散思维的去思考状态,我们所理解的状态不仅仅只限于 flink 的状态,让大家了解到状态是一个无处不在的东西什么是全局一致性快照?和状态有什么关系?
全局一致性快照的一些生活、工作中应用的例子为什么需要一致性快照?全局一致性快照和 flink 的关系?
jvm GC,分布式应用做故障恢复(比如 flink),死锁检测等全局一致性快照的分布式应用举例
通过一个简单分布式应用介绍一下全局一致性状态是每时每刻都存在的。时间轴上的每一个时刻都存在一个全局一致性快照(类似拍照片)。flink 做 cp,sp,类似于每隔固定的时间从时间轴上的一个点拿出来这个时间点对应的一个全局一致性状态全局一致性快照的标准定义
假如说有两个事件,a 和 b,在绝对时间下,如果 a 发生在 b 之前,且 b 被包含在快照当中,那么则 a 事件或者其对快照产生的影响也被包含在快照当中怎么实现全局一致性快照?
同步去做,包括时钟同步、Stop-the-world,但是这两种方法都不可接受;
既然同步无法做,那如果异步能做出相同的全局一致性状态也可以分布式应用的全局一致性快照其 Process 状态和 Channel 状态到底需要记录什么?怎么记录 Channel 的状态?
不是必须要在同一时刻嘛,为啥还能异步去做?只要异步做出来的状态和同步做出来的状态效果一致也可以。并且详细分析了 process 和 channel 中的状态包括什么,以及记录 channel 状态的方法。分布式应用全局一致性快照算法流程总结
通过第 8 章节的分析,总结出一套分布式应用的通用异步全局一致性快照算法Chandy-Lamport 算法流程、例子
介绍 Chandy-Lamport 算法流程并以一个例子介绍其执行过程,并且说明和第 9 章节分析得出的方法之间的关系,两者之间互相是不冲突的flink 实现的全局一致性快照介绍
大致介绍 flink 的全局一致性快照,并且也说明了和第 9 章节得出的方法之间的关系,两者之间也是不冲突的分布式应用异步全局一致性快照方法、Chandy-Lamport 算法、flink 全局一致性快照之间的关系
Chandy-Lamport 算法、flink 全局一致性快照也满足第 8 章节的结论;并且 Chandy-Lamport 算法、flink 全局一致性快照是分布式应用异步全局一致性快照方法一种特殊形式参考文章
本文编写过程中参考的文章
什么是状态?(了解状态)
目标
首先想让大家发散思维的去思考状态?我们所理解的状态不仅仅只限于 flink 的状态。让大家了解到状态是一个普遍存在的东西
定义
就是当前计算需要依赖到之前计算的结果,那么之前计算的结果就是状态
举例
比如生活中的例子:为什么我知道我的面前放着一台电脑,因为眼睛接收到外界的图案,然后我的大脑接收到这个图案后,拿记忆中存储的图案进行对比,匹配得到这是电脑,那么记忆中存储的图案就是状态;还有比如日久生情,为什么感情会越来越深,因为今天的感情 = 今天积累的感情 + 以前积累的感情,以前积累的感情就是状态。这其中都存在状态
比如 web server 应用中的状态:打开 github 页面,列表展示了我的归属仓库。其中就是 web client 发了查询我的归属仓库请求,web server 接收到请求之后,然后去存储引擎中进行查询匹配返回。那么存储引擎中存储的内容就是状态
比如 flink 应用中的状态:要做当天 uid 去重,就要存储所有的 uid;要获取当前最大值,那么历史最大值就是状态
什么是全局一致性快照?(了解全局一致性快照)
- 全局:代表是一个分布式的
- 一致性快照:代表绝对时间的同一时刻的状态
- 相当于打开上帝视角,去观察同一时刻的应用所有的状态;这里的快照 = 状态,文章之后我可能会把这两个词混用,大家明白他们的意思一致即可
- 比如生活中的例子:比如拍了一个照片,那么照片的内容就是当时的一个全局一致性快照;每一个首脑都是一个进程,所有的进程的状态在同一时刻的组合就是一个全局一致性快照
- 比如分布式应用的例子:首先是一个分布式应用,它有多个进程分布在多个服务器上;其次,它在应用内部有自己的处理逻辑和状态;第三,应用间是可以互相通信的;第四,在这种分布式的应用,有内部状态,硬件可以通信的情况下;某一时刻的全局状态,就叫做全局的快照。
分布式应用某一时刻的全局一致性快照 = 各个 process 的本地状态 + channel 中正在传递的消息
介绍完了几个例子之后,我们来看看我们为什么需要一致性快照?
为什么需要一致性快照?全局一致性快照和 flink 的关系?
实时案例
- 做检查点(全局一致性快照)用来故障恢复,重点!!!重点!!!重点!!!就在于我们不必要从历史起点开始重跑所有的数据(其实这就是我们需要检查点的目的!!!);因为
- 流式应用的上游存储介质一般都不支持存储历史所有数据(比如上游为 kafka,kafka 不可能存储历史所有数据)
- 重跑时效性不能满足时效性要求(重跑历史数据的情况下,时效性是达不到要求的)
- 可以做任务的死锁检测
全局一致性快照和 flink 的关系
flink 的 cp 和 sp 实际上就是全局一致性快照在分布式应用中 flink 的一个具体实现。
全局一致性快照的分布式应用案例?
通过一个简单分布式应用介绍一下全局一致性状态是每时每刻都存在的。时间轴上的每一个时刻都存在一个全局一致性快照(可以用拍照片去类比)。
示例
下面分布式应用的一个示例:
每时每刻都存在全局一致性快照
上面这个只是四个时刻的四个快照,其实应用的每一个时刻都存在一个全局一致性快照。
全局一致性快照的标准定义
定义
假如说有两个事件,a 和 b,在绝对时间下,如果 a 发生在 b 之前,且 b 被包含在快照当中,那么则 a 也被包含在快照当中。满足这个条件的全局快照,就称为全局一致性快照。
楼主理解
就是如果将做了绝对时刻 T 的一个快照,那么这个绝对时刻 T 之前发生的所有事件以及其影响都会被包含在这个快照中
上文已经介绍了全局一致性快照的定义以及为什么我们需要全局一致性快照,那我们来放眼实际应用中,怎么才能做出一个满足生产实际要求的全局一致性快照呢?
怎么实现全局一致性快照?
实现方式主要分为同步实现方式和异步实现方式两类。
同步实现方式
- NTP: NTP服务器[Network Time Protocol(NTP)]是用来使计算机时间同步化的一种协议,它可以使计算机对其服务器或时钟源(如石英钟,GPS等等)做同步化,它可以提供高精准度的时间校正(LAN上与标准间差小于1毫秒,WAN上几十毫秒)
结论:无法实现 - Stop-The-World
结论:不满足需求,无法采用
如果同步实现方式不满足需求,那么能使用异步方式做到同步相同的快照也是可以满足需求的。
异步实现方式
在介绍 Chandy-Lamport 算法之前,我们先介绍一些理论和数学上的概念铺垫铺垫,帮助我们理解 Chandy-Lamport 算法。
这些概念主要就是介绍分布式应用的 Process 和 Channel 应该存储什么内容?以及怎样去存储这些内容?只有我们知道了要存储什么东西才好去设计和介绍具体算法嘛~
楼主也是看了很多博客才总结得到了这些条件!!!
分布式应用的全局一致性快照其 Process 状态和 Channel 状态记录了什么?怎么记录 Channel 的状态?
分布式应用要记录的状态
如下图案例 Single-Token conservation,是一个分布式应用,有 p 和 q 两个进程,p 可以通过 Channel pq(记为 Cpq) 向 q 发消息,q 可以通过 Channel qp(记为 Cqp) 向 p 发消息,其中有一个叫 token 的消息,在这个系统中一直不停的流转。
如之前所述,分布式应用的全局一致性快照包含 Process 状态和 Channel 状态
那么上图 Single-Token conservation 示例中的全局一致性快照 S = S(p) + S(Cpq) + S(q) + S(Cqp)
其中:
- S:全局一致性快照
- S(p):p 进程的状态
- S(Cpq):p 进程到 q 进程的 Channel 状态
- S(q):q 进程的状态
- S(Cqp):q 进程到 p 进程的 Channel 状态
问题:
这里就碰到了我们要分析的关键问题:做全局一致性快照时,其中 S(p),S(q) 好理解,但是 S(Cpq),S(Cqp) 到底记录了什么东西?应该记录什么东西?接下来详细讲讲我的理解
Process 状态应该记录什么内容?
记录和用户业务需求相关的状态内容,用到了关于状态的地方,进行记录就好了,这部分是好理解的。
举例:uid 去重就存储历史所有的 uid 就可以了
Channel 状态应该记录什么内容?
Single-Token conservation 的全局一致性快照
token 在 p 时(对应第一张图),这时的全局一致性快照为:
S(token-in-p) = S(p-token-in-p) + S(Cpq-token-in-p) + S(q-token-in-p) + S(Cqp-token-in-p)
其中:
- S(token-in-p):token 在 p 时,做的全局一致性快照
- S(p-token-in-p):token 在 p 时,p 进程的状态
- S(Cpq-token-in-p):token 在 p 时,p 进程到 q 进程的 Channel 状态
- S(q-token-in-p):token 在 p 时,q 进程的状态
- S(Cqp-token-in-p):token 在 p 时,q 进程到 p 进程的 Channel 状态
提出问题
注意,上述这个表达式其实是结论,这个结论是很好理解的,但是你有想过站在实际应用的角度去思考下面的问题吗?
问题1:为什么第一张图的全局一致性状态是 Process 和 Channel 做的快照都有 token-in-p 呢?
根据之前的拍照片的类比,当前这个绝对时刻做快照时,token 在 p;那么所有的 process 和 channel 记录状态时,token 都应该在 p。问题2:S(p-token-in-p) 好理解,在这个时刻 token 还没有从 p 发出去,p 做快照时肯定知道 token 还在 p;但是站在 Cpq 做状态的角度来说:
- Cpq 做状态时,怎么保障 Cpq 知道 token in p?需要我们探索下有什么方法怎么让 Cpq 在做状态时知道 token in p?
- 站在实际在应用中实现的角度时,满足怎样的数学条件(我们要开始实现一个真实的全局一致性快照啦,肯定会涉及到一些数据知识,别急,往后看,用到的数学知识并不复杂)才能做出一个 S(Cpq) ?
解决问题
分析 S(Cpq) 记录了什么内容?
这里我们简单先理解下,S(Cpq) 其实就是在 S(p) 和 S(q) 自己的状态做成时,还在 Channel pq 之间发送的那些消息。
那么我们怎么用数学的方式理解 Cpq 记录的这些消息以及 Process 和 Channel 做状态时需要满足的条件呢?让我们往下看
变量定义
- n:在 p 的状态记录前,p 记录的 p 发往 Cpq 的 msg 数;
- n′:在 Cpq 的状态记录前,Cpq 记录的 p 发往 Cpq 的 msg 数;
- m:在 q 的状态记录前,q 记录的 q 从 Cpq 中接收到的 msg 数;
- m′:在 Cpq 的状态记录前,Cpq 记录的 q 从 Cpq 中接收到的 msg 数;
结论
Cpq 记录 S(Cpq) 时,必然会有 n = n’ ≥ m = m’;(注意这是充分必要条件喔~)
即一个 Channel 要记录的状态是,它 sender 记录自己状态之前它所接收到的 msg 列表,再减去 receiver 记录自己状态之前它已经收到的 msg 列表,减去的之后的数据列表就是还在通道中的数据列表,这个列表是需要 Channel 作为状态记录下来的。
而如果 n′ = m′,那么 Channel c 中要记录的 msg 列表就是 empty 列表。如果 n′ > m′,那么要记录的列表是 (m′+1),…n′ 号消息对应的 msg 列表。
证明
首先是 n = n’,利用反证法:如果 n != n’,则会有两种情况:
- n > n’ 时:
- 可能会出现 n = 10(p 记录状态前,p 记录 p 发往 Cpq msg 数为 10(msg 编号 1 - 10));
- n’ = 7(Cpq 记录状态前,Cpq 记录 p 发往 Cpq 的 msg 数为 7(msg 编号 1 - 7));
- 那么假设 token 的编号为 9,就会出现 p 记录的状态为 S(p-token-in-Cpq),Cpq 记录的状态为 S(p-token-in-p),实际是不可能出现的;
- n < n’ 时:
- 可能会出现 n = 7(p 记录状态前,p 记录 p 发往 Cpq msg 数为 7(编号 1 - 7));
- n’ = 10(Cpq 记录状态前,Cpq 记录 p 发往 Cpq 的 msg 数为 10(编号 1 - 10));
- 那么假设 token 的编号为 9,就会出现 p 记录的状态为 S(p-token-in-p),Cpq 记录的状态为 S(p-token-in-Cpq),实际是不可能出现的;
- n = n’ 时:保障了无论什么情况下,只要 p 做出 S(p-token-in-p) 的状态时,因为 n = n’,代表 p 没有把 token 发出去,Cpq 也没有接受到 token,就能让 Cpq 也做出 S(Cpq-token-in-p);
然后是 m = m’,同样利用反证法
- m > m’ 时:
- 可能会出现 n = n’ = m > m’,q 记录状态前,Cpq 记录 q 从 Cpq 接收到的 msg 数为 10(编号 1 - 10,因为 n = n’ = m 也即 Cpq 记录的 p 发往 Cpq 的那些 msg);
- Cpq记录状态前,Cpq 记录的 q 从 Cpq 接收到的 msg 数为 7(编号 1 - 7);
- 那么假设 token 的编号为 9,就会出现 Cpq 记录的状态为 S(Cpq-token-in-Cpq),q 记录的状态为 S(q-token-in-p),实际是不可能出现的;
最后是 n’ ≥ m’ and n ≥ m:在任何一种情况下,做全局一致性快照时,都会有 Cpq 下游接收到的 msg 数不可能超过 p 发送给 Channel 的 msg 条数,即:n’ ≥ m’ 以及 n ≥ m(也可使用反证法证明)
分析到这里,上节提的两个问题也就被解决了。
为了帮大家更容易的理解全局一致性快照包含的内容,接下来我用伪代码描述一下,会比文字更好理解~
来段伪代码描述全局一致快照包含的内容
伪代码
1 | // S_all 即全局一致性快照 |
怎样去记录 S(Cpq)?
通过上面的分析,我们已经讨论得到了 S(Cpq) 都包含了什么内容,并且其之间要满足什么样的数学关系。但是在现实实际生活中,消息在 Channel 上乱飞时,我们是无法记录这些消息作为 Channel 的状态的。
但是这些消息终究会到达目的地,我们可以在消息的目的地去记录这些消息作为 Channel 的状态。即我们可以在 q 中记录 Channel pq 的 S(Cpq),在 p 中记录 Channel pq 的 S(Cqp)。
伪代码
顺便那么上面那段伪代码就可以简化为下面这样:
1 | // S_all 即全局一致性快照 |
记录 S(Cpq) 需要满足的条件
重点重点重点!!!
分析上面的伪代码后,我们可以发现,要得到 S_all,其中只有一个变量在进程做快照时不知道的,那就是 n_j_i(即第 i 个 channel 做快照前,接受到 j(上游) 的消息个数),别忘了 n = n‘,即也可以定义为 j 做快照前,j 发往 channel 的消息个数。那么实际上这个值 j process 是知道的,就代表 i 进程需要知道 j 告诉他 n_j_i 的值是多少。重点来了,当 i process 做完快照之后,直接发一个 marker 下去,这个 marker 不会对计算有任何影响(即不会对状态产生任何影响),marker 只是一个标识,j process 做完自己的快照之后,直到接收到 marker 之间的消息就是Channel ij 的状态。i 就是通过 marker 来告诉 j process n_j_i 的值是多少的。(其他的变量为什么都知道就不详细分析了,很容易理解)
分布式应用全局一致性快照算法流程总结
算法流程总结
- 发起快照:有一个 manager process(这个 manager 可以是所有 process 中的任意一个 process,也可以是一个单独的中央管理者)告诉所有的 process 说可以开始做状态了;
- 执行快照:所有 process 就开始记录自己本地的状态(非所有 input channel)了,记录完本地状态,然后发 marker 给下游所有的 channel,然后开始记录上游所有 input channel 的消息(直到接收到上游所有的 marker);
- 执行快照:每个 process 对于每一个 input channel 来说,都将自己做完状态后直到收到 marker 之间的消息记录下来,作为这个 input channel 的状态;
- 执行快照:当收到上游所有 marker 之后,这个 process 要记录的状态就全部得到了,然后告诉 manager process 说做完状态了;
- 终止快照:manager process 接收到所有 process 做完的消息之后,就标记所有的状态以及完成了。
算法流程示例
发起快照
- 有一个 manager process(这个 manager 可以是所有 process 中的任意一个 process,也可以是一个单独的中央管理者)告诉所有的 process 说可以开始做状态了;
执行快照
- 所有 process 就开始记录自己本地的状态(非所有 input channel)了,记录完本地状态,然后发 marker 给下游所有的 channel,然后开始记录上游所有 input channel 的消息(直到接收到上游所有的 marker);
- 每个 process 对于每一个 input channel 来说,都将自己做完状态后直到收到 marker 之间的消息记录下来,作为这个 input channel 的状态;
- 当收到上游所有 marker 之后,这个 process 要记录的状态就全部得到了,然后告诉 manager process 说做完状态了;
终止快照
- manager process 接收到所有 process 做完的消息之后,就标记所有的状态以及完成了。
Chandy-Lamport 算法流程、示例
算法流程
发起快照
解读:
- 本次快照的起始点,先把起始点的快照给做了,然后发出 marker(这个 marker 消息是干啥用的呢?没错,就是我们之前分析的结论),开始记录 input channel
执行快照
解读:
- Pi 记录本地快照,标记 Cki 为空:因为从 Cki 接收到了 marker,这时的状态是 Pk 刚刚做完快照,Pk 做完快照发往 Cki 的消息个数 = Pi 做完快照从 Cki 接收到的消息个数。即 n = n’ = m’ = m;即 Cki = [Empty];
- Pi 开始向所有 output channel 发 marker,开始记录除 Cki 之外的 input channel 消息,因为本地快照已经做完了;然后上游还有部分进程没有做完快照,为了记录除 Cki 之外的 input Channel 消息,
解读:
- 结合前一张图说的开始记录 input channel 消息,Pi 停止记录 Cki 的消息,同时将此前记录所有 Cki 收到的消息作为本次快照中的最终状态;n’ > m’,在 Pi 这里记录了 Cki 的状态,即 Cki = [m‘ + 1, m’ + 2…n]
终止快照
示例
Chandy-Lamport 与上节分布式应用全局一致性快照算法的异同
Chandy-Lamport 就是上节分布式应用全局一致性快照算法的其中一种特殊形式;分布式应用全局一致性快照算法中说的是每个 process 在接收到 manager 做快照的消息之后就直接可以开始记录状态了,而 Chandy-Lamport 其实就是把这个 manager 的消息用接收到的第一个 marker 消息给代替了,用数学表达式表示就是接收到第一个 marker 的 channel 的 n = n’ = m = m’,剩余的 channel 满足 n = n’ ≥ m = m’,并且 Chandy-Lamport 算法也都满足第 8 章节介绍的各种条件。
flink 实现的全局一致性快照介绍(flink 容错机制)
Chandy-Lamport 与 Flink之间的关系
Flink 是分布式系统,所以 Flink 会采用全局一致性快照的方式形成检查点,来支持故障恢复。Flink 的异步全局一致性快照算法跟 Chandy-Lamport 算法的区别主要有以下几点:
- 第一,Chandy-Lamport 支持强连通图,而 Flink支持弱连通图;
- 第二,Flink采用的是裁剪的(Tailored)Chandy-Lamport 异步快照算法;
- 第三,Flink的异步快照算法在DAG场景下不需要存储 Channel state,从而极大减少快照的存储空间。
flink 的容错机制
端到端的Exactly once
Exactly once 的意思是,作业结果总是正确的,但是很可能产出多次;所以它的要求是需要有可重放的 source。
端到端的 Exactly once,是指作业结果正确且只会被产出一次,它的要求除了有可重放的 source 外,还要求有事务型的 sink 和可以接收幂等的产出结果。
flink 的全局一致性快照
Barrier 对齐
flink 的全局一致性快照与上节分布式应用全局一致性快照算法的异同
flink 的全局一致性快照就是上节分布式应用全局一致性快照算法的其中一种特殊形式;分布式应用全局一致性快照算法中说的是每个 process 在接收到 manager 做快照的消息之后就直接可以开始记录状态了,而 flink 其实就是将各个 process 开始做状态的时间点设为了接收到上游 input channel 所有的 barrier,这样一个好处就是由于各个 process 是接收到了上游所有 barrier 之后开始的,用数学表达式表示其实就满足了 n = n’ = m = m’,就没有必要存储 channel 中的状态了;并且 flink 算法也都满足第 8 章节介绍的各种条件。
状态后端
JVM Heap
第一种,JVM Heap,它里面的数据是以Java对象形式存在的,读写也是以对象形式去完成的,所以速度很快。但是也存在两个弊端:第一个弊端,以对象方式存储所需的空间是磁盘上序列化压缩后的数据大小的很多倍,所以占用的内存空间很大;第二个弊端,虽然读写不用做序列化,但是在形成 snapshot 时需要做序列化,所以它的异步 snapshot 过程会比较慢。
RocksDB
第二种,RocksDB,这个类型在读写时就需要做序列化,所以它读写的速度比较慢。但是它有一个好处,基于LSM的数据结构在快照之后会形成 sst 文件,它的异步 checkpoint 过程就是文件拷贝的过程,CPU 消耗会比较低。