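ZooKeeper (ZK) is one of the core components for service registration and discovery in microservice solutions, and in that sense a cornerstone of a microservice system. It is not the only product for service registration and discovery: Eureka and Consul are also widely accepted in the industry. This article covers only ZK. The tool itself is small (the zip package is only a few megabytes), installation is very simple, and it supports cluster deployment.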

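Background
This article looks at the leader and follower concepts in a ZK cluster, the problems ZK faces when a node fails, and how it resolves them. ZK is written in Java and is open source on GitHub, but the official documentation says little about its internals. There are many scattered blog posts on the topic, and some of them are quite good.

ZK node states and roles
Each node in a ZK cluster is in exactly one state at any time, and a working cluster needs one leader node in the leading state.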
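- looking: searching for a leader. The cluster currently has no leader, so the node enters the leader election process.
- following: follower state. The node accepts synchronization and commands from the leading node.
- leading: leader state.
- observing: observer state, which indicates that this server is an observer.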
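The four states can be modeled as a small enum. The sketch below is illustrative only: `NodeStateSketch` and its helper methods are hypothetical names, not ZooKeeper's own classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical enum mirroring the four node states listed above.
enum NodeStateSketch {
    LOOKING, FOLLOWING, LEADING, OBSERVING;

    /** A cluster is ready to serve when exactly one node is LEADING. */
    static boolean hasSingleLeader(List<NodeStateSketch> states) {
        int leaders = 0;
        for (NodeStateSketch s : states) {
            if (s == LEADING) {
                leaders++;
            }
        }
        return leaders == 1;
    }

    /** True if any node is still LOOKING, i.e. an election is in progress. */
    static boolean electionInProgress(List<NodeStateSketch> states) {
        return states.contains(LOOKING);
    }

    public static void main(String[] args) {
        List<NodeStateSketch> cluster = Arrays.asList(FOLLOWING, LEADING, FOLLOWING);
        System.out.println("single leader: " + hasSingleLeader(cluster));
        System.out.println("election in progress: " + electionInProgress(cluster));
    }
}
```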
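In a ZK cluster the leader is responsible for processing transactions; followers forward transaction requests to the leader so that they are handled in one place. Put simply, all writes go through the leader, while reads can be served by followers. In CAP terms ZK favors consistency, and this write path is why it is better suited to read-heavy, write-light services.

Majority election algorithm
ZK vote handling strategy
A vote contains the Serverid of the proposed leader, its Zxid, and the SelectionEpoch.
- Epoch check: compare the node's own logicEpoch with the SelectionEpoch of the incoming vote (logicalclock and electionEpoch in the source code below). If the vote's epoch is greater, the node adopts it and clears the votes it has collected; if it is smaller, the vote is ignored; if they are equal, the votes are compared by the rules that follow.
- Check the ZXID first. The server with the larger ZXID is preferred as leader.
- If the ZXIDs are equal, compare myid. The server with the larger myid becomes the leader.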
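The comparison rules above can be sketched as a small Java method. This is a simplified illustration, not ZooKeeper's own class: `VoteSketch` and its field names are hypothetical, and it assumes both votes already belong to the same election round.

```java
// Hypothetical value type for illustration; names are assumptions.
final class VoteSketch {
    final long leaderId; // myid of the proposed leader
    final long zxid;     // last transaction id known to the proposed leader
    final long epoch;    // epoch of the proposed leader

    VoteSketch(long leaderId, long zxid, long epoch) {
        this.leaderId = leaderId;
        this.zxid = zxid;
        this.epoch = epoch;
    }

    /** Returns true when this vote should replace the current one. */
    boolean beats(VoteSketch current) {
        if (epoch != current.epoch) {
            return epoch > current.epoch;   // higher epoch wins
        }
        if (zxid != current.zxid) {
            return zxid > current.zxid;     // then higher zxid wins
        }
        return leaderId > current.leaderId; // then higher myid wins
    }

    public static void main(String[] args) {
        VoteSketch mine = new VoteSketch(3, 4, 1);
        VoteSketch incoming = new VoteSketch(1, 5, 1);
        System.out.println("incoming replaces mine: " + incoming.beats(mine));
    }
}
```

Ordering the checks this way means a server with more recent data is always preferred, and myid only breaks ties.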
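ZK has had three election algorithms: LeaderElection, FastLeaderElection, and AuthFastLeaderElection. FastLeaderElection and AuthFastLeaderElection are similar; the only difference is that the latter adds authentication information. FastLeaderElection is more efficient than LeaderElection, and later versions keep only FastLeaderElection.
Interpretation:
When several nodes start in a cluster, ZK first has to choose one of them as the leader and put it in the leading state. That raises two questions: how to run the election, and what the rules are. With the "majority election algorithm", the node that receives votes from more than half of the nodes wins, and its state changes from looking to leading. This keeps the election efficient, because a result is available as soon as a majority agrees, without waiting for every node.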
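The "more than half" rule is simple arithmetic. A minimal sketch, with hypothetical class and method names:

```java
// Hypothetical helper illustrating the majority rule.
final class QuorumSketch {
    /** Smallest vote count that is strictly more than half of the ensemble. */
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    /** True when the given number of votes is a strict majority. */
    static boolean hasQuorum(int votes, int ensembleSize) {
        return votes > ensembleSize / 2;
    }

    public static void main(String[] args) {
        for (int n = 3; n <= 6; n++) {
            System.out.println(n + " servers need " + quorumSize(n) + " votes");
        }
    }
}
```

Note that 5 and 6 servers both tolerate only 2 failures, which is why odd ensemble sizes are the usual choice.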
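The official documentation describes this under Clustered (Multi-Server) Setup.
[Figure: Clustered (Multi-Server) Setup, from the official documentation]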
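A walk-through with 5 servers:
- Server 1 starts. It is the only server running, so the vote it sends out gets no response and it stays in the LOOKING state.
- Server 2 starts. It communicates with server 1, and the two exchange their votes. Neither has any historical data, so server 2, which has the larger id, wins. However, fewer than a majority of servers agree on it (a majority here is 3), so servers 1 and 2 both remain in the LOOKING state.
- Server 3 starts. By the same reasoning it has the largest id among servers 1, 2, and 3, and three servers now vote for it, so it becomes the leader of this election.
- Server 4 starts. By the same reasoning it would in theory be the largest among servers 1, 2, 3, and 4, but a majority of servers has already elected server 3, so it has to accept the follower role.
- Server 5 starts and, like server 4, becomes a follower.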
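The startup sequence above can be replayed with a very simplified model. The sketch assumes that no server has any history (so only the id matters), that servers start one at a time, and that every running server ends up voting for the largest running id. `StartupElectionSketch` is a hypothetical name, and the model ignores the network messaging that the real election performs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Simplified replay of the startup walk-through; not ZooKeeper's algorithm.
final class StartupElectionSketch {
    /**
     * Returns the id of the elected leader, or -1 if the started servers
     * never reach a majority of the configured ensemble.
     */
    static long electOnStartup(long[] startOrder, int ensembleSize) {
        List<Long> running = new ArrayList<>();
        for (long id : startOrder) {
            running.add(id);
            // With no history, all running servers converge on the largest id.
            long candidate = Collections.max(running);
            // The candidate wins once a strict majority is running and voting.
            if (running.size() > ensembleSize / 2) {
                return candidate;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long leader = electOnStartup(new long[] {1, 2, 3, 4, 5}, 5);
        System.out.println("leader id: " + leader);
    }
}
```

Servers that start after the leader is chosen (4 and 5 in the walk-through) are not consulted again, which matches the description above.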
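Now suppose 2 of the 5 servers (3 and 4) go down, including the leader:
The leader and the followers exchange heartbeats and synchronize data. When the leader node fails, the whole ZooKeeper cluster stops serving requests and enters a new round of leader election.
1) Servers 1, 2, and 5 detect that they have lost contact with the leader, switch to the looking state, and start a new vote.
2) Servers 1, 2, and 5 each increment their own epoch, cast a vote, and broadcast it.
3) Servers 1, 2, and 5 each process the votes they receive, determine the leader, and broadcast the result.
4) The vote handling logic selects one server. The majority is still counted against the 5 configured servers, so 3 votes are needed and all three surviving servers must agree.
5) Each server changes its state to leader or follower accordingly.
6) The cluster resumes service.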
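The re-election steps above can be sketched as follows. This is a simplified model under stated assumptions: all survivors are in the same election round with the same epoch, the winner is the survivor with the largest zxid (ties broken by the larger id), and the majority is counted against the configured ensemble size. `ReelectionSketch` and `Peer` are hypothetical names, and the zxid values in `main` are made up for illustration.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Simplified model of re-election among surviving servers.
final class ReelectionSketch {
    static final class Peer {
        final long id;   // myid of the server
        final long zxid; // last transaction id the server has logged

        Peer(long id, long zxid) {
            this.id = id;
            this.zxid = zxid;
        }
    }

    /** Returns the new leader, or empty if the survivors are not a majority. */
    static Optional<Peer> elect(List<Peer> survivors, int ensembleSize) {
        if (survivors.size() <= ensembleSize / 2) {
            return Optional.empty(); // no quorum, the cluster cannot serve
        }
        return survivors.stream()
                .max(Comparator.comparingLong((Peer p) -> p.zxid)
                        .thenComparingLong(p -> p.id));
    }

    public static void main(String[] args) {
        // Servers 3 and 4 are down; the zxid values are illustrative.
        List<Peer> survivors = Arrays.asList(
                new Peer(1, 100), new Peer(2, 102), new Peer(5, 101));
        Optional<Peer> leader = elect(survivors, 5);
        System.out.println("new leader id: "
                + leader.map(p -> String.valueOf(p.id)).orElse("none"));
    }
}
```

If a third server also failed, `elect` would return empty, reflecting that two servers out of five cannot form a majority.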
Source code analysis:
/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
public Vote lookForLeader() throws InterruptedException {
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }

    self.start_fle = Time.currentElapsedTime();
    try {
        Map<Long, Vote> recvset = new HashMap<Long, Vote>();
        Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = minNotificationInterval;

        synchronized (this) {
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =" + self.getId() + ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        sendNotifications();

        SyncedLearnerTracker voteSet;

        /*
         * Loop in which we exchange notifications until we find a leader
         */
        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if (n == null) {
                if (manager.haveDelivered()) {
                    sendNotifications();
                } else {
                    manager.connectAll();
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout * 2;
                notTimeout = (tmpTimeOut < maxNotificationInterval ? tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } else if (validVoter(n.sid) && validVoter(n.leader)) {
                /*
                 * Only proceed if the vote comes from a replica in the current or next
                 * voting view for a replica in the current or next voting view.
                 */
                switch (n.state) {
                case LOOKING:
                    if (getInitLastLoggedZxid() == -1) {
                        LOG.debug("Ignoring notification as our zxid is -1");
                        break;
                    }
                    if (n.zxid == -1) {
                        LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                        break;
                    }
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x" + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Adding vote: from=" + n.sid
                            + ", proposed leader=" + n.leader
                            + ", proposed zxid=0x" + Long.toHexString(n.zxid)
                            + ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // don't care about the version if it's in LOOKING state
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                    if (voteSet.hasAllQuorums()) {

                        // Verify if there is any change in the proposed leader
                        while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {
                            setPeerState(proposedLeader, voteSet);
                            Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    LOG.debug("Notification from observer: {}", n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if (n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            setPeerState(n.leader, voteSet);
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify that
                     * a majority are following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                    voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                    if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                        synchronized (this) {
                            logicalclock.set(n.electionEpoch);
                            setPeerState(n.leader, voteSet);
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecoginized: " + n.state + " (n.state), " + n.sid + " (n.sid)");
                    break;
                }
            } else {
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        return null;
    } finally {
        try {
            if (self.jmxLeaderElectionBean != null) {
                MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
        LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
    }
}

// Excerpt: the comparison at the end of totalOrderPredicate, which is called
// above with (newId, newZxid, newEpoch, curId, curZxid, curEpoch).
/*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same
 *    as current zxid, but server id is higher.
 */
return ((newEpoch > curEpoch)
        || ((newEpoch == curEpoch)
            && ((newZxid > curZxid)
                || ((newZxid == curZxid) && (newId > curId)))));
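The exponential backoff used in the polling loop of lookForLeader can be looked at in isolation. The sketch below reproduces only the doubling-with-a-cap step; `BackoffSketch` is a hypothetical name, and the 200 ms starting interval and 60000 ms cap in `main` are assumed values chosen for illustration rather than taken from the excerpt.

```java
// Isolated sketch of the timeout doubling seen in the election loop.
final class BackoffSketch {
    /** Doubles the current timeout, but never exceeds the given maximum. */
    static int nextTimeout(int current, int max) {
        long doubled = (long) current * 2; // widen to avoid int overflow
        return (int) Math.min(doubled, (long) max);
    }

    public static void main(String[] args) {
        int timeout = 200;         // assumed starting interval in ms
        final int maxMs = 60_000;  // assumed cap in ms
        for (int i = 0; i < 10; i++) {
            System.out.println("poll timeout: " + timeout + " ms");
            timeout = nextTimeout(timeout, maxMs);
        }
    }
}
```

Each time a poll returns no notification the node waits longer before retrying, so an isolated node does not flood the network while it cannot reach its peers.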
