Zookeeper启动和选举过程简要源码分析

按启动流程分析

zkServer.sh

zkServer.sh 脚本最终启动的入口类是 QuorumPeerMain,启动入口位于其 main 方法:

/**
 * Process entry point shown for QuorumPeerMain: builds a QuorumPeerMain
 * instance and hands the raw command-line arguments to
 * {@code initializeAndRun(args)}.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
            main.initializeAndRun(args);
    }
    // catch clauses are elided in this excerpt
}
/**
 * Parses the configuration, starts the data-directory cleanup task, and then
 * chooses between quorum (cluster) startup and standalone startup.
 *
 * <p>Quorum startup is chosen only when exactly one argument was supplied and
 * the parsed configuration reports {@code isDistributed()}; every other case
 * falls back to {@code ZooKeeperServerMain.main(args)}.
 *
 * @param args command-line arguments; when there is exactly one, it is passed
 *             to {@code QuorumPeerConfig.parse} (the article describes it as
 *             the path of zoo.cfg)
 * @throws ConfigException      declared by the original signature
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    // An array's length cannot change, so the check is evaluated once and reused.
    final boolean singleArgument = (args.length == 1);

    QuorumPeerConfig config = new QuorumPeerConfig();
    if (singleArgument) {
        config.parse(args[0]);
    }

    // Start the purge task (the article notes it removes old snapshots and
    // transaction logs); it is configured from the values parsed above.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    // Cluster mode: a config file was given and it defines a quorum.
    if (singleArgument && config.isDistributed()) {
        runFromConfig(config);
        return;
    }

    // Standalone mode: either no config file or no quorum defined in it.
    LOG.warn("Either no config or no quorum defined in config, running "
            + " in standalone mode");
    ZooKeeperServerMain.main(args);
}

单机启动

/**
 * Standalone entry point shown for ZooKeeperServerMain: creates an instance
 * and delegates to {@code initializeAndRun(args)}.
 *
 * @param args command-line arguments, forwarded unchanged
 */
public static void main(String[] args) {
    ZooKeeperServerMain main = new ZooKeeperServerMain();
    try {
        main.initializeAndRun(args);
    } 
    // catch clauses are elided in this excerpt
}
/**
 * Standalone bootstrap: registers the log4j MBeans (best effort), builds a
 * {@code ServerConfig} from the arguments and starts the server from it.
 *
 * @param args command-line arguments; a single argument goes through
 *             {@code parse(String)}, anything else through
 *             {@code parse(String[])}
 * @throws ConfigException      declared by the original signature
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
protected void initializeAndRun(String[] args)
    throws ConfigException, IOException, AdminServerException
{
    // JMX registration of the logging beans is optional: a failure is only
    // logged and startup carries on.
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException jmxFailure) {
        LOG.warn("Unable to register log4j JMX control", jmxFailure);
    }

    // Populate the server configuration from the command line.
    ServerConfig config = new ServerConfig();
    if (args.length != 1) {
        // Zero or several arguments: hand the whole array to the parser.
        config.parse(args);
    } else {
        // Exactly one argument: use the parse(String) overload
        // (presumably a config file path; confirm against ServerConfig).
        config.parse(args[0]);
    }

    // Start the server with the resulting configuration.
    runFromConfig(config);
}
/**
 * Starts a standalone ZooKeeper server from an already parsed configuration
 * and blocks until shutdown is signalled.
 *
 * <p>Sequence visible in this excerpt: create the transaction/snapshot log,
 * build the ZooKeeperServer, register a shutdown handler backed by a latch,
 * start the admin server, configure and start the plain and/or secure
 * connection factories, start the container manager, then wait on the latch
 * and tear everything down. The transaction log is closed in {@code finally}.
 *
 * @param config parsed standalone server configuration
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
public void runFromConfig(ServerConfig config)
        throws IOException, AdminServerException {
    LOG.info("Starting server");
    FileTxnSnapLog txnLog = null;
    try {
        // Create the FileTxnSnapLog over the configured transaction-log
        // directory and snapshot (data) directory.
        txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
        // Create the server instance; the last constructor argument is null here.
        final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
                config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
        txnLog.setServerStats(zkServer.serverStats());

        // Register a shutdown handler; it is given a latch with count 1 that
        // this method waits on further down.
        final CountDownLatch shutdownLatch = new CountDownLatch(1);
        zkServer.registerServerShutdownHandler(
                new ZooKeeperServerShutdownHandler(shutdownLatch));

        // Start the admin server and attach the ZooKeeperServer to it.
        adminServer = AdminServerFactory.createAdminServer();
        adminServer.setZooKeeperServer(zkServer);
        adminServer.start();

        // Connection factories (the article describes the default
        // implementation as NIOServerCnxnFactory; confirm against createFactory()).
        // needStartZKServer records whether zkServer still has to be started
        // by the secure factory.
        boolean needStartZKServer = true;
        // Plain client port: configure the factory from the parsed config.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
            // Start the factory together with the ZooKeeper server.
            cnxnFactory.startup(zkServer);
            // The server was started above, so the secure factory must not start it again.
            needStartZKServer = false;
        }
        // Secure client port: same steps, with the secure flag set to true.
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
            secureCnxnFactory.startup(zkServer, needStartZKServer);
        }

        // Container manager: check interval and per-minute limit come from the
        // system properties below, defaulting to one minute and 10000.
        containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
                Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
                Integer.getInteger("znode.container.maxPerMinute", 10000)
        );
        containerManager.start();

        // Block here until the latch reaches zero (presumably counted down by
        // the shutdown handler registered above; confirm in
        // ZooKeeperServerShutdownHandler), then proceed with shutdown.
        shutdownLatch.await();

        shutdown();

        // Wait for the connection factories that were actually created.
        if (cnxnFactory != null) {
            cnxnFactory.join();
        }
        if (secureCnxnFactory != null) {
            secureCnxnFactory.join();
        }
        if (zkServer.canShutdown()) {
            zkServer.shutdown(true);
        }
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Server interrupted", e);
    } finally {
        // Always release the transaction log if it was created.
        if (txnLog != null) {
            txnLog.close();
        }
    }
}

集群启动

/**
 * Cluster (quorum) startup: builds the connection factories, copies the
 * parsed settings onto a QuorumPeer, then initializes, starts and joins it.
 *
 * <p>The method does not return until {@code quorumPeer.join()} returns or
 * the calling thread is interrupted (the interruption is only logged).
 *
 * @param config parsed quorum configuration
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
public void runFromConfig(QuorumPeerConfig config)
        throws IOException, AdminServerException
{   
    try {
        // Register the log4j MBeans; failure is logged and ignored.
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    // Create the connection factories (the article describes the default
    // implementation as NIOServerCnxnFactory; confirm against createFactory()).
    LOG.info("Starting quorum peer");
    try {
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // Plain client port, only if configured. The factory is configured
        // here but not started in this method.
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(),
                    config.getMaxClientCnxns(),
                    false);
        }

        // Secure client port, only if configured (secure flag = true).
        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                    config.getMaxClientCnxns(),
                    true);
        }

        // Obtain the QuorumPeer and transfer the cluster-related settings
        // from the parsed configuration onto it.
        quorumPeer = getQuorumPeer();
        quorumPeer.setTxnFactory(new FileTxnSnapLog(
                    config.getDataLogDir(),
                    config.getDataDir()));
        quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
        quorumPeer.enableLocalSessionsUpgrading(
            config.isLocalSessionsUpgradingEnabled());
        //quorumPeer.setQuorumPeers(config.getAllMembers());
        quorumPeer.setElectionType(config.getElectionAlg());
        quorumPeer.setMyid(config.getServerId());
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
        quorumPeer.setInitLimit(config.getInitLimit());
        quorumPeer.setSyncLimit(config.getSyncLimit());
        quorumPeer.setConfigFileName(config.getConfigFilename());
        // The database is built on the txn factory set a few lines above,
        // so this call must stay after setTxnFactory.
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
        if (config.getLastSeenQuorumVerifier()!=null) {
            quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
        }
        quorumPeer.initConfigInZKDatabase();
        // Hand over the (possibly null) connection factories created above.
        quorumPeer.setCnxnFactory(cnxnFactory);
        quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
        quorumPeer.setSslQuorum(config.isSslQuorum());
        quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
        quorumPeer.setLearnerType(config.getPeerType());
        quorumPeer.setSyncEnabled(config.getSyncEnabled());
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
        if (config.sslQuorumReloadCertFiles) {
            quorumPeer.getX509Util().enableCertFileReloading();
        }

        // sets quorum sasl authentication configurations
        quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
        // The remaining SASL settings are applied only when the peer reports
        // SASL auth as enabled.
        if(quorumPeer.isQuorumSaslAuthEnabled()){
            quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
            quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
            quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
            quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
            quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
        }
        quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
        // Finish initialization once every setting has been applied.
        quorumPeer.initialize();
        // Start the peer (the article notes QuorumPeer is a thread).
        quorumPeer.start();
        // Block until the peer finishes.
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
/**
 * Starts this peer: verifies membership, loads the database, starts the
 * client connection factory and the admin server, prepares leader election
 * and finally delegates to {@code super.start()}.
 *
 * @throws RuntimeException if this peer's id is not a key of the current view
 */
public synchronized void start() {
    // Refuse to start when our own id is missing from the view.
    final boolean listedInView = getView().containsKey(myid);
    if (!listedInView) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }

    loadDataBase();
    startServerCnxnFactory();

    // A failing admin server is reported (log and stdout) but does not stop startup.
    try {
        adminServer.start();
    } catch (AdminServerException adminFailure) {
        LOG.warn("Problem starting AdminServer", adminFailure);
        System.out.println(adminFailure);
    }

    // Set up leader election before the peer's own thread begins executing
    // (the article notes every node triggers a round of election when it joins).
    startLeaderElection();

    // Delegate to the superclass start (presumably Thread.start(), which
    // runs run(); confirm against the class declaration).
    super.start();
}
/**
 * Main loop of the peer (excerpt). While {@code running} is true it
 * dispatches on the current peer state; only the LOOKING branch is shown
 * here, the remaining cases and the closing braces are elided in the article.
 */
public void run() {
    updateThreadName();
    // JMX monitoring setup
    // ... (elided in this excerpt)

    try {
        // Main loop
        while (running) {
            // The article lists four states: LOOKING, OBSERVING, FOLLOWING, LEADING
            switch (getPeerState()) {
            case LOOKING:
                // LOOKING is the state right after startup: an election is in progress.
                LOG.info("LOOKING");
                // Read-only mode, enabled through the "readonlymode.enabled"
                // system property; not covered by the article.
                if (Boolean.getBoolean("readonlymode.enabled")) {
                    // ... (elided in this excerpt)
                // Read-only mode is not enabled
                } else {
                    try {
                        reconfigFlagClear();
                        // If leader election was shut down earlier, clear the
                        // flag and set it up again.
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                            }
                        // Run the election and store the resulting vote.
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        // On any failure stay in LOOKING so the loop retries.
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }                        
                }
                break;
            // remaining cases are elided in this excerpt
}

leader选举

/**
 * Runs one leader-election round from this peer's point of view (excerpt;
 * the FOLLOWING/LEADING branch and JMX handling are elided in the article).
 *
 * <p>The peer first proposes the values returned by {@code getInitId()},
 * {@code getInitLastLoggedZxid()} and {@code getPeerEpoch()}, broadcasts that
 * proposal, and then processes incoming notifications while it remains in
 * LOOKING and {@code stop} is false. In the LOOKING branch shown here, once
 * {@code termPredicate} accepts the current proposal and no preferable
 * notification arrives within {@code finalizeWait} ms, the peer state is set
 * and the final vote is returned.
 *
 * @return the final vote from the LOOKING branch, or {@code null} when the
 *         loop ends without one (the elided branch may return as well)
 * @throws InterruptedException declared by the original signature
 */
public Vote lookForLeader() throws InterruptedException {
    // JMX monitoring setup
    // ... (elided in this excerpt)

    try {
        // Votes received from other servers in the current round, keyed by
        // the sender's sid (see recvset.put below).
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        // Per the article: votes from servers that have already finished the
        // election; it is only used in the elided FOLLOWING/LEADING branch.
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        // Timeout (ms) for polling the receive queue; starts at finalizeWait
        // and is doubled on every empty poll, capped at maxNotificationInterval.
        int notTimeout = finalizeWait;

        synchronized(this){
            // Advance the logical clock by one for the new round.
            logicalclock.incrementAndGet();
            // Initial proposal built from this peer's own init id, last
            // logged zxid and epoch (i.e. vote for ourselves first).
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() +
                ", proposed zxid=0x" + Long.toHexString(proposedZxid));
        // Announce the new proposal to the other servers (the article notes
        // the messages are placed on the send queue).
        sendNotifications();

        // Keep processing notifications while we are still LOOKING and have
        // not been asked to stop.
        while ((self.getPeerState() == ServerState.LOOKING) &&
                (!stop)){

            // Wait up to notTimeout ms for a notification from another server.
            Notification n = recvqueue.poll(notTimeout,
                    TimeUnit.MILLISECONDS);

            // Nothing arrived within the timeout.
            if(n == null){
                // If our messages were delivered (per the article: the
                // connections are still up), simply resend the proposal.
                if(manager.haveDelivered()){
                    // Resend the current proposal.
                    sendNotifications();
                // Otherwise (per the article: connections were lost) reconnect to all peers.
                } else {
                    manager.connectAll();
                }

                // Double the polling timeout, capped at maxNotificationInterval.
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            } 
            // Only handle the notification when both the sender and the
            // leader it proposes pass validVoter.
            else if (validVoter(n.sid) && validVoter(n.leader)) {
                // Dispatch on the sender's state.
                switch (n.state) {
                // The sender is still LOOKING as well.
                case LOOKING:
                    // The notification belongs to a newer election round than
                    // ours, so our own round is out of date.
                    if (n.electionEpoch > logicalclock.get()) {
                        // Adopt the newer round and discard the votes
                        // collected for the old one.
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // Compare the notified candidate with our own initial
                        // values (leader id, zxid, peer epoch are passed to
                        // totalOrderPredicate) and adopt whichever wins
                        // as the new proposal.
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        // Broadcast the updated proposal.
                        sendNotifications();
                    // The notification is from an older round: it is stale,
                    // so it is ignored (the break below leaves the switch
                    // before the vote is recorded).
                    } else if (n.electionEpoch < logicalclock.get()) {
                        if(LOG.isDebugEnabled()){
                            LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                    + Long.toHexString(n.electionEpoch)
                                    + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                        }
                        break;
                    // Same round: compare against our current proposal and
                    // switch to the notified candidate if it wins.
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    if(LOG.isDebugEnabled()){
                        LOG.debug("Adding vote: from=" + n.sid +
                                ", proposed leader=" + n.leader +
                                ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                    }

                    // Record the sender's vote for this round.
                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // Check whether the received votes are enough to settle on
                    // our current proposal (presumably a quorum check; confirm
                    // in termPredicate).
                    if (termPredicate(recvset,
                            new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // Verify if there is any change in the proposed leader
                        // Drain the queue for up to finalizeWait ms per poll;
                        // a notification that beats our proposal is put back
                        // and aborts finalization.
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)){
                                recvqueue.put(n);
                                break;
                            }
                        }

                        /*
                            * This predicate is true once we don't read any new
                            * relevant message from the reception queue
                            */
                        if (n == null) {
                            // LEADING if we proposed ourselves, otherwise the
                            // state returned by learningState().
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                            Vote endVote = new Vote(proposedLeader,
                                    proposedZxid, logicalclock.get(), 
                                    proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                case OBSERVING:
                    // Notifications from observers are only logged.
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                   // (elided in the article) re-checks whether the election epoch equals the logical clock ...
            } else {
                // Log which of the two validVoter checks rejected the notification.
                if (!validVoter(n.leader)) {
                    LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                }
                if (!validVoter(n.sid)) {
                    LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                }
            }
        }
        // The loop ended (state changed or stop requested) without a final vote.
        return null;
    } finally {
        // JMX cleanup (elided in this excerpt)
    }
}

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/zookeeper%e5%90%af%e5%8a%a8%e5%92%8c%e9%80%89%e4%b8%be%e8%bf%87%e7%a8%8b%e7%ae%80%e8%a6%81%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/

发表评论

电子邮件地址不会被公开。