memberlist源码分析
December 6, 2020
gossip简介
Gossip protocol 也叫 Epidemic Protocol (流行病协议),实际上它还有很多别名,比如:“流言算法”、“疫情传播算法”等。这个协议的作用就像其名字表示的意思一样,非常容易理解,它的方式其实在我们日常生活中也很常见,比如电脑病毒的传播,森林大火,细胞扩散等等。
Gossip的基本思想一个节点想要分享一些信息给网络中的其他的一些节点。于是,它周期性的随机选择一些节点,并把信息传递给这些节点。这些收到信息的节点接下来会做同样的事情,即把这些信息传递给其他一些随机选择的节点。一般而言,信息会周期性的传递给N个目标节点,而不只是一个。这个N被称为fan-out(扇出)。
数据一致性问题的背景&目标
问题:
- 如何在多网络中将多个节点实现并保持数据一致性。
- 能够随着节点数量的增加而优雅地扩展。
考量的因素:
- 单节点信息变更后,更新传播到所有其他节点所需的时间
- 传播单个更新时生成的网络流量
机制
在《Epidemic Algorithms for Replicated Database Maintenance》论文中主要论述了直接邮寄(direct mail)、反熵传播(anti-entropy)、谣言传播(rumor mongering)三种机制来实现数据更新**。Gossip协议主要是通过反熵传播(anti-entropy)、谣言传播(rumor mongering)实现的**。
直接邮寄(direct mail)
每个节点更新都会立即从其变更节点邮寄通知到所有其他节点。
机制 | 直接邮寄(direct mail) |
时间复杂度 | O(n),n为节点数 |
网络流量 | m*n,m为更新消息数,n为节点数 |
优点 | 更新效率高 |
缺点 | 不完全可靠,存在信息传递丢失风险 |
当节点有数据更新便开始遍历节点池,遍历发送其他所有节点消息来通知自身节点数据的更新情况,实现算法较为简单。由于是一次性遍历通知,在遇到网络通信故障、节点宕机之后恢复等现实情况时没有办法容错和补偿,这是较为致命性的地方,因此极端情况下它是无法保证分布式环境下各节点数据一致性的。
反熵传播(anti-entropy)
每个节点都会定期随机选择节点池中的一些节点,通过交换数据内容来解决两者之间的任何差异。节点只有两种状态,病原(Suspective)和感染(Infective),因此称作SI模型,一般叫做简易流行病(simple epidemics) 。
在 SI model 下,一个节点会把所有的数据都跟其他节点共享,以便消除节点之间数据的任何不一致,它可以保证最终、完全的一致。由于在 SI model 下消息会不断反复的交换,因此消息数量是非常庞大的,无限制的(unbounded),这对一个系统来说是一个巨大的开销。通常只用于新加入节点的数据初始化。
机制 | 反熵传播(anti-entropy) |
时间复杂度 | O(log2n),n为节点数 |
网络流量 | O((m*n)t),n为节点数,m为更新消息数,t为周期数 |
优点 | 1.可靠 2.定时重复 3.可容错 |
缺点 | 1.消息冗余 2.消息延迟 3.网络流量耗费较多 |
反熵(anti-entropy) 这种和直接邮寄(direct mail)相比的最大特点就是解决了消息丢失无法补偿容错导致的数据无法保持一致的致命问题。它通过单点的定时随机通知周边节点进行数据交互的方式保持各节点之间数据的一致性。这里需要注意的是,一致性的保持是在节点数据变更后一段时间内通过节点间的数据交互逐渐完成的最终一致,并且由于每个节点都定期广播数据到周边随机的一部分节点,因此在数据交互上是存在冗余和延迟的。
谣言传播(rumor mongering)
交互如下:
- 所有的节点在最开始没有产生数据变更时都假设是未知状态,它是不知道任何谣言信息的
- 当节点收到其他节点更新数据通知时,相当于听到了一条谣言,并将其视为热门开始传播给周边节点
- 当某个节点谣言盛行时,它会定期随机选择其他节点,并确保另一个节点知道
- 当某个节点发现周边节点都知道这个谣言时,该节点将停止将该谣言视为热点,并保留更新,而不会进一步传播
谣言传播(rumor mongering)中的节点状态有Suspective(病原)、Infective(感染)、Removed(愈除),因此称作SIR模型,一般叫做复杂流行病(complex epidemics) 。
- 消息生产节点即为Suspective(病原)状态
- 消息接收节点即为Infective(感染)状态,会进行消息传播
- 节点接收消息后即为Removed(愈除)状态,不再进行传播
Rumor Mongering(SIR Model) 模型下,消息可以发送得更频繁,因为消息只包含最新 update,体积更小。而且,一个 Rumor 消息在某个时间点之后会被标记为 removed,并且不再被传播,因此,SIR model 下,系统有一定的概率会不一致。
而由于,SIR Model 下某个时间点之后消息不再传播,因此消息是有限的,系统开销小。
机制 | 谣言传播(rumor mongering) |
时间复杂度 | O(log2n),n为节点数 |
网络流量 | O((m*n)t),n为节点数(递减),m为更新消息数,t为周期数 |
优点 | 1.可靠 2.定时重复 3.可容错 |
缺点 | 1.消息冗余 2.消息延迟 3.网络流量耗费较多 |
总结
Gossip协议通过反熵传播(anti-entropy)和谣言传播(rumor mongering)两种机制进行实现并保证节点数据的最终一致性。
机制 | 工作原理 | 工作时机 |
反熵传播(anti-entropy) | 以固定的概率传播所有的数据 | 新加入节点初始化 |
谣言传播(rumor mongering) | 仅传播新到达的数据 | 节点增量数据 |
这篇文章对gossip协议更详细的描述。
另外gossip-simulator可能会对协议理解更有帮助。
通信模式
拉方式(pull)
- ① A仅将数据 key, version 推送给 B
- ② B 将本地比 A 新的数据(Key, value, version)推送给 A
- ③ 回传 B 数据给 A
- ④ A 更新本地
推方式(push)
- ① 节点 A 将数据 (key,value,version) 及对应的版本号推送给 B 节点
- ② B 节点更新 A 中比自己新的数据
推拉方式(push&pull)
- ① A仅将数据 key, version 推送给 B
- ② B 将本地比 A 新的数据(Key, value, version)推送给 A
- ③ 回传 B 数据给 A
- ④ A 更新本地
- ⑤ 推送比 B 新的数据
- ⑥ 更新 B
从消息复杂度来看,Push(1次) < Pull(2次) < Pull&Push(3次)
从时间复杂度来看,Pull > Push
从算法实现上来看,无论是基于何种模式进行数据交互,Gossip都是基于平方概率计算进行最终一致性收敛的
从运行效果上来看,推拉模式(Push/Pull) 最好,理论上一个周期内可以使两个节点完全一致,收敛速度也是最快的,消息通信损耗也是最多的。
memberlist实现
功能
memberlist是一个基于gossip(变种)协议的实现,主要使用场景:
- 分布式网络的分区节点管理和故障探测。如服务发现组件Consul使用
- 分布式网络的数据同步
memberlist提供了一些Deligate API,可以用作节点间的数据同步。用法可以参考这个asim/memberlist。实际上关注到memberlist也是因为在预研分布式任务调度系统dkron过程中了解到它有使用memberlist来实现节点间的数据同步。
源码分析
goroutine分布
从goroutine分布可以大致了解memberlist的工作原理。goroutine主要分为两类:
定时器协程
定时器协程主要用于memberlist生命过程中一些常规的定时任务,memberlist.schedule() 会启动三个主要的定时器协程,分别介绍如下:
- Probe timer:执行探测逻辑(Ping)。UDP,也可以强制启用TCP。固定周期。每次只会挑选一个节点进行探测。
probe的目标是探测节点的存活状态。大致流程:
- 每次挑选一个节点,组装一个唯一sequence的ping包
- 设置超时callback,避免对端节点无响应。
- 执行ping,并等待应答
- 根据应答or超时结果,执行indirect ping、tcp ping
- 根据ack数量,更新节点delta和状态。detal反映了自身的健康程度,delta越大,自己可能出问题的概率越大,ping等待应答的时间越长。
- pushPullTrigger timer:用于两个节点间的全量数据同步。TCP协议+动态实时调整。请求时携带本地全部节点数据,响应中也会携带对端所拥有的节点信息。这样一次交互就实现了一次完整的数据同步。对应gossip协议中的反熵传播(anti-entropy)实现。
工作流程:
- 挑选一个其他节点,准备执行pushPullNode操作。
- 创建tcp连接
- 发送本地全部nodes列表(包含状态)给到对方,并取对方返回nodes列表及数据
- 处理pullPushMsg,将对端返回的nodes列表状态与本地进行合并,详见下方tcpListen()中pushPullMsg的处理流程。
- Gossip timer:挑选部分随机节点进行广播推送。采用UDP。对应协议中的谣言传播(rumor mongering)。
周期GossipInterval执行gossip,大致流程:
- 挑选若干(m.config.GossipNodes)随机节点(过滤left节点)
- 组件广播并批量发送。不需要等待应答?
监听协程
程序在启动之初(NewNetTransport()),会创建一个tcp和一个udp的监听协程,TCP主要用作节点间的全量状态数据同步。UDP主要用作节点间状态探测。其中一部分消息类型(比如ping)在两条通道上都有可能出现。:
- go tcpListen(): TCP监听,用于接收新的node连接,并写入到独立协程去处理。
TCP连接消息的处理见handleConn(),主要处理的是pushPullTrigger timer的节点状态同步消息。里面也包括了用户侧的全量用户数据。消息类型以及处理流程简介:
- userMsg:用户消息要用作上层业务的用户数据传递。
asim/memberlist项目中可以看到如何利用broadcast传播用户数据。实际上在将用户数据通过broadcasts.QueueBroadcast(data)写入时并没有触发实时的网络数据同步,而是写入了一个有限优先级队列TransmitLimitedQueue中,然后通过两种方式携带出去:网络探测消息或者gossip定时器,整个过程都是异步的。
问题:异步如何保证数据不会丢失?
broadcast消息无法完全保证。udp本就有一定的丢失率,但是只要有一个节点成功,数据就会同步到其他节点。如果100%丢失就不行了。如果需要可以调用其他保证可靠性的接口。broadcast一般用作增量数据的同步,memberlist还提供了同步全量数据的能力,通过pushPullMsg的tcp连接同步对端节点来保证可靠性,以达到最终一致性的目的。
- pushPullMsg: 用作全量节点状态同步,由对端pushPullTrigger time定期触发。
收到对端的pushPullMsg之后,首先会将本地的nodes状态也应答给对方,这样就完整了一次完整的全量数据交换。此过程中可以携带用户数据,一般用作调用方全量数据同步。
然后会将本地nodes状态与对端nodes状态逐一聚合。聚合流程跟节点当前状态有关,大致如下:
stateAlive:插入本地nodes列表,同时对stateSuspect、stateDead状态的节点进行特殊处理。
stateLeft:如果为本机需要refute操作,否则广播通知其他节点。
stateDead / stateSuspect:首次会广播给其他节点。同时也会对通知源节点数计数,一段时间内超过一定阈值(实时计算)会将节点设置为dead。
- pingMsg
tcp的ping。默认是关闭的,可以在config中启用,供UDP异常网络环境时使用。
- userMsg:用户消息要用作上层业务的用户数据传递。
- go udpListen():udp监听,处理节点探测的消息和gossip消息。
udp类型的消息比较多,为了提升处理性能考虑,避免各类相互影响,拆分为packetListen、packetHandler两部分。各种探测消息类型简介如下:
- pingMsg
Probe timer发起,探测节点是否正常。收到ping之后会立即回复ackRespMsg,并携带广播(其实基本大部分消息都会携带广播)。
- ackRespMsg
ping消息的应答。
- indirectPingMsg
当ping无应答时,会挑选若干节点进发送indirectPingMsg,携带目标节点,以佐证是否本地路由问题。
本机收到indirectPing指令时,会向指定的目标节点发送ping消息:如果有收到回包,则发送ackRespMsg给源节点。如果超时则回复nackRespMsg消息给到源节点,表明自己导目标节点的状态异常。
- nackRespMsg
收到nackRespMsg到一定阈值时,会将节点置未_stateSuspect_状态,并告知其他节点。
- suspectMsg
收到对端传来的suspectMsg表明节点很可能异常了。如果节点为自己,执行refute操作。否则转发,并判断是否进入stateDead状态。
- aliveMsg
合入本地节点,并broadcast其他节点。
- deadMsg
跟suspectMsg处理流程一致。
- userMsg
通知上层处理增量数据。
- pingMsg
状态合并
节点状态:集群中每个节点可能存在以下4种状态。
- stateAlive:节点存活。
- stateLeft:节点已主动离开
- stateDead / stateSuspect:节点已经挂了。Suspect只是一个中间态,当两个节点间路由异常时,可能会出现误判的情形,需要其他节点来一起佐只有满足条件才能确定为dead。
介绍几种典型的转换流程:
stateAlive -> stateSuspect
- 节点A(本地)启动,初试状态为stateAlive。通过制定目标节点B(目标)加入集群成功,节点B在A中状态为stateAlive
- A定期发送pingMsg指令,且超时时间内没有收到应答ackRespMsg。
- 随机挑选N个其他节点,发送indirectPingMsg,并等待应答nackRespMsg
- 当收到一定数量的nackRespMsg时,可以确定目标节点B已经异常。于是将B设置stateSuspect状态,并广播给其他节点。
stateSuspect -> stateDead
- 在A中节点B刚进入stateSuspect状态之初,会启动一个对应的定时器suspectTimer
- 将B已进入suspect消息广播给其他节点。
- 如果B收到了suspect消息,需要执行refute操作,告知其他节点B已恢复正常。如果中间过程中有其他节点通过Ping消息主动发现B已经恢复正常,也会广播告知。
- suspectTimer超时,节点B如果状态及更新时间都没有变化,则认为节点已经处于stateDead状态。
- 删除对应suspectTimer,广播告知其他节点
一些细节
memberlist的整体流程及状态转换其实很简单,但在一些细节的处理上考虑比较多。
广播:广播类数据的占比比较高,且很多为重复消息,为了尽可能减少网络交互,做了一些细节上的优化:
- 优先队列:控制了整体的报文数据量,当超过最大值时会丢弃数据。另外对报文也区分了优先级。
- compoundMsg+异步:内部交互协议上做了优化,在package长度有限的情况下,尽可能通过一次交互携带尽可能多的数据。如在ping的过程中将队列中的广播消息也一起携带出去。
awareness:两个节点间的通信失败,很难判断是本机节点还是对端节点的网络异常,所以引入了类似健康度的概念。健康度越低,表明本地网络状态可能越差,执行对应网络请求时设置的超时时间就越长。
比如在ping失败后默认是会将本地健康度降低,因为无法排除为自己问题,但是当执行indirectPing并收到其他节点的回复后会再度调整。