按启动流程分析

启动入口位于 QuorumPeerMain 类的 main 方法:
/**
 * Process entry point shown for QuorumPeerMain: creates a QuorumPeerMain instance and
 * hands the command-line arguments to {@code initializeAndRun}.
 *
 * @param args command-line arguments, passed through unchanged to initializeAndRun
 */
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args);
}
// catch clauses are elided in this excerpt
}
/**
 * Parses the quorum configuration, starts the data-directory cleanup manager, and then
 * launches the server either in quorum (distributed) mode or in standalone mode.
 *
 * @param args command-line arguments; when exactly one argument is present it is handed
 *             to {@code QuorumPeerConfig.parse} (the configuration file, e.g. zoo.cfg)
 * @throws ConfigException      declared by the original signature
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    final boolean hasSingleArg = (args.length == 1);

    // Load the configuration only when exactly one argument was supplied.
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (hasSingleArg) {
        config.parse(args[0]);
    }

    // Start the cleanup manager, configured with the data directory, the transaction-log
    // directory, the snapshot retain count and the purge interval from the config.
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
            config.getDataDir(),
            config.getDataLogDir(),
            config.getSnapRetainCount(),
            config.getPurgeInterval());
    purgeMgr.start();

    // Standalone fallback: no single config argument, or the config is not distributed.
    // isDistributed() is only consulted when a single argument was given.
    if (!hasSingleArg || !config.isDistributed()) {
        LOG.warn("Either no config or no quorum defined in config, running "
                + " in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
        return;
    }

    // Quorum (cluster) start-up.
    runFromConfig(config);
}
单机启动(ZooKeeperServerMain)
/**
 * Standalone entry point: creates a ZooKeeperServerMain instance and hands the
 * command-line arguments to {@code initializeAndRun}.
 *
 * @param args command-line arguments, passed through unchanged to initializeAndRun
 */
public static void main(String[] args) {
ZooKeeperServerMain main = new ZooKeeperServerMain();
try {
main.initializeAndRun(args);
}
// catch clauses are elided in this excerpt
}
/**
 * Registers the log4j JMX beans (best effort), builds a {@code ServerConfig} from the
 * command-line arguments and starts the standalone server from it.
 *
 * @param args command-line arguments; a single argument is parsed via
 *             {@code parse(String)}, anything else via {@code parse(String[])}
 * @throws ConfigException      declared by the original signature
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException {
    // A failure to register the log4j MBeans is logged and otherwise ignored.
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException jmxFailure) {
        LOG.warn("Unable to register log4j JMX control", jmxFailure);
    }

    // Build the server configuration from the arguments.
    ServerConfig config = new ServerConfig();
    if (args.length != 1) {
        config.parse(args);
    } else {
        config.parse(args[0]);
    }

    // Start the server with the parsed configuration.
    runFromConfig(config);
}
/**
 * Starts a standalone server from the given configuration and blocks until shutdown.
 *
 * Sequence visible in this excerpt: create the FileTxnSnapLog and the ZooKeeperServer,
 * register a shutdown handler backed by a CountDownLatch, start the admin server, start
 * the plain and/or secure connection factories, start the container manager, wait on the
 * latch, then shut everything down. The transaction log is closed in the finally block.
 *
 * NOTE(review): adminServer, cnxnFactory, secureCnxnFactory and containerManager are
 * assigned without a local declaration, so they are presumably fields of the enclosing
 * class, which is not shown here.
 *
 * @param config parsed standalone server configuration
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
public void runFromConfig(ServerConfig config)
throws IOException, AdminServerException {
LOG.info("Starting server");
FileTxnSnapLog txnLog = null;
try {
// Create the FileTxnSnapLog from the configured transaction-log and data directories
txnLog = new FileTxnSnapLog(config.dataLogDir, config.dataDir);
// Create the server with the tick time and session-timeout bounds from the config
final ZooKeeperServer zkServer = new ZooKeeperServer(txnLog,
config.tickTime, config.minSessionTimeout, config.maxSessionTimeout, null);
txnLog.setServerStats(zkServer.serverStats());
// Register the shutdown handler; it receives the latch this method later waits on
final CountDownLatch shutdownLatch = new CountDownLatch(1);
zkServer.registerServerShutdownHandler(
new ZooKeeperServerShutdownHandler(shutdownLatch));
// Start the admin server
adminServer = AdminServerFactory.createAdminServer();
adminServer.setZooKeeperServer(zkServer);
adminServer.start();
// Tracks whether the secure factory still needs to start zkServer itself
boolean needStartZKServer = true;
// Plain client port: create and configure a connection factory if an address is configured
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
// Start the factory with zkServer (presumably this also starts the ZooKeeper server), then clear the flag
cnxnFactory.startup(zkServer);
needStartZKServer = false;
}
// Secure client port: same steps, passing the flag that says whether zkServer still needs starting
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), true);
secureCnxnFactory.startup(zkServer, needStartZKServer);
}
// Container manager: the check interval and per-minute limit come from system properties,
// defaulting to one minute (in milliseconds) and 10000 respectively
containerManager = new ContainerManager(zkServer.getZKDatabase(), zkServer.firstProcessor,
Integer.getInteger("znode.container.checkIntervalMs", (int) TimeUnit.MINUTES.toMillis(1)),
Integer.getInteger("znode.container.maxPerMinute", 10000)
);
containerManager.start();
// Block until the shutdown latch reaches zero, then shut the server down
shutdownLatch.await();
shutdown();
// Wait for the connection factories that were created
if (cnxnFactory != null) {
cnxnFactory.join();
}
if (secureCnxnFactory != null) {
secureCnxnFactory.join();
}
if (zkServer.canShutdown()) {
zkServer.shutdown(true);
}
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Server interrupted", e);
} finally {
// Always close the transaction log if it was created
if (txnLog != null) {
txnLog.close();
}
}
}
集群启动(QuorumPeerMain#runFromConfig,随后进入 QuorumPeer 的 start 和 run 方法)
/**
 * Starts this process as a member of a quorum (cluster mode).
 *
 * Sequence visible in this excerpt: register the log4j JMX beans (best effort), create and
 * configure the plain and/or secure connection factories, obtain a quorum peer, copy the
 * parsed configuration into it, then initialize, start and join it. An
 * InterruptedException during this sequence is logged as a warning and not rethrown.
 *
 * NOTE(review): quorumPeer is assigned without a local declaration, so it is presumably a
 * field of the enclosing class, which is not shown here.
 *
 * @param config parsed quorum configuration
 * @throws IOException          declared by the original signature
 * @throws AdminServerException declared by the original signature
 */
public void runFromConfig(QuorumPeerConfig config)
throws IOException, AdminServerException
{
try {
// Register the log4j MBeans; failure is logged and otherwise ignored
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
// The connection factories are created inside the try block below
LOG.info("Starting quorum peer");
try {
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
// Plain client port factory, only when a client port address is configured
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(),
config.getMaxClientCnxns(),
false);
}
// Secure client port factory, only when a secure client port address is configured
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(),
config.getMaxClientCnxns(),
true);
}
// Obtain the quorum peer and copy the parsed cluster configuration into it
quorumPeer = getQuorumPeer();
quorumPeer.setTxnFactory(new FileTxnSnapLog(
config.getDataLogDir(),
config.getDataDir()));
quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
quorumPeer.enableLocalSessionsUpgrading(
config.isLocalSessionsUpgradingEnabled());
//quorumPeer.setQuorumPeers(config.getAllMembers());
quorumPeer.setElectionType(config.getElectionAlg());
quorumPeer.setMyid(config.getServerId());
quorumPeer.setTickTime(config.getTickTime());
quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
quorumPeer.setInitLimit(config.getInitLimit());
quorumPeer.setSyncLimit(config.getSyncLimit());
quorumPeer.setConfigFileName(config.getConfigFilename());
// The ZKDatabase is built on the transaction factory set above
quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
if (config.getLastSeenQuorumVerifier()!=null) {
quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
}
quorumPeer.initConfigInZKDatabase();
// Hand the connection factories (either may be null) to the quorum peer
quorumPeer.setCnxnFactory(cnxnFactory);
quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
quorumPeer.setSslQuorum(config.isSslQuorum());
quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
quorumPeer.setLearnerType(config.getPeerType());
quorumPeer.setSyncEnabled(config.getSyncEnabled());
quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
if (config.sslQuorumReloadCertFiles) {
quorumPeer.getX509Util().enableCertFileReloading();
}
// sets quorum sasl authentication configurations
quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
if(quorumPeer.isQuorumSaslAuthEnabled()){
quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
// Initialize the quorum peer
quorumPeer.initialize();
// Start the quorum peer (per the article's note, the quorum peer is itself a thread)
quorumPeer.start();
// Wait for the quorum peer thread to finish
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
}
/**
 * Start-up of the quorum peer: validates that this server's id is part of the current
 * view, loads the database, starts the server connection factory and the admin server,
 * begins leader election, and finally calls {@code super.start()}.
 *
 * NOTE(review): this is presumably QuorumPeer.start(), reached via quorumPeer.start() in
 * the cluster start-up path; the enclosing class is not shown in this excerpt.
 *
 * @throws RuntimeException if {@code myid} is not a key of {@code getView()}
 */
public synchronized void start() {
// Refuse to start when this server's id is not in the peer list
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
startServerCnxnFactory();
try {
// Start the admin server; a failure is logged and printed but does not abort start-up
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
// Begin leader election (the article notes that a round of election runs whenever a node joins; not verifiable from this excerpt)
startLeaderElection();
// Presumably Thread.start(): runs this object's run() method on a new thread
super.start();
}
/**
 * Main loop of the quorum peer thread. Only the LOOKING branch is shown; the excerpt is
 * truncated by the article (other cases and the closing of the loop, try and method are
 * elided).
 */
public void run() {
updateThreadName();
// JMX monitoring setup
// ... (elided)
try {
// Main loop
while (running) {
// Four peer states exist: LOOKING, OBSERVING, FOLLOWING, LEADING
switch (getPeerState()) {
case LOOKING:
// LOOKING is the state right after start-up: an election is in progress
LOG.info("LOOKING");
// Read-only mode enabled via the readonlymode.enabled system property: not covered by this article
if (Boolean.getBoolean("readonlymode.enabled")) {
// ... (elided)
// Read-only mode not enabled
} else {
try {
reconfigFlagClear();
// If leader election had been shut down, clear the flag and start it again
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
// Run leader election and record the resulting vote as the current vote
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
// On any exception, go back to LOOKING
setPeerState(ServerState.LOOKING);
}
}
break;
// remaining cases elided
}
Leader选举(lookForLeader方法,默认实现类为FastLeaderElection)
/**
 * Runs one leader election: proposes this server, exchanges notifications with peers and
 * keeps adopting better proposals until termPredicate accepts the current proposal and no
 * better notification arrives within finalizeWait milliseconds.
 *
 * The handling of notifications from FOLLOWING/LEADING peers and the JMX bookkeeping are
 * elided by the article, so the switch statement in this excerpt is incomplete.
 *
 * @return the final vote once the election completes in the LOOKING branch, or
 *         {@code null} if the loop exits because the peer is no longer LOOKING or
 *         {@code stop} is set
 * @throws InterruptedException declared by the signature; the body calls the blocking
 *         {@code recvqueue.poll(...)} and {@code recvqueue.put(...)}
 */
public Vote lookForLeader() throws InterruptedException {
// JMX monitoring setup
// ... (elided)
try {
// Votes received from other servers in the current election round, keyed by sender sid
HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
// Not used in the visible part of this excerpt; presumably holds votes from peers already FOLLOWING/LEADING (TODO confirm)
HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
// Poll timeout (ms) for the receive queue; starts at finalizeWait and grows on timeouts
int notTimeout = finalizeWait;
synchronized(this){
// Advance the logical clock by one for the new election round
logicalclock.incrementAndGet();
// Initial proposal is built from this server's own id, last logged zxid and peer epoch (i.e. it votes for itself)
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info("New election. My id = " + self.getId() +
", proposed zxid=0x" + Long.toHexString(proposedZxid));
// Announce the new proposal to the other servers (per the article, by placing it on the send queue; sendNotifications is not shown)
sendNotifications();
// Keep processing notifications while this peer is still LOOKING and has not been stopped
while ((self.getPeerState() == ServerState.LOOKING) &&
(!stop)){
// Wait up to notTimeout ms for a notification sent by another server
Notification n = recvqueue.poll(notTimeout,
TimeUnit.MILLISECONDS);
// Nothing arrived within the timeout
if(n == null){
// haveDelivered() true: per the article, connections are still up
if(manager.haveDelivered()){
// so simply re-send the current proposal
sendNotifications();
// haveDelivered() false: per the article, connections were lost, so reconnect to all peers
} else {
manager.connectAll();
}
// Double the poll timeout, capped at maxNotificationInterval
int tmpTimeOut = notTimeout*2;
notTimeout = (tmpTimeOut < maxNotificationInterval?
tmpTimeOut : maxNotificationInterval);
LOG.info("Notification time out: " + notTimeout);
}
// Only process notifications whose sender and proposed leader both pass validVoter
else if (validVoter(n.sid) && validVoter(n.leader)) {
// Dispatch on the state reported by the sender
switch (n.state) {
// Sender is also still looking for a leader
case LOOKING:
// The notification belongs to a newer election round than ours, so our round is outdated
if (n.electionEpoch > logicalclock.get()) {
// Adopt the newer round and discard the votes collected so far
logicalclock.set(n.electionEpoch);
recvset.clear();
// Compare the notification's candidate with this server's own initial values
// (per the article, by zxid and peerEpoch) and adopt whichever wins
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
// Broadcast the updated proposal
sendNotifications();
// The notification is from an older round: it is stale and is ignored
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
// Leaves the switch, so the stale vote is not recorded in recvset
break;
// Same round: adopt and re-broadcast the notification's candidate only if it beats the current proposal
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
// Record the sender's vote in recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// termPredicate presumably checks whether the current proposal is backed by enough of the received votes (TODO confirm)
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
// A better candidate showed up: put it back so the outer loop processes it
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
if (n == null) {
// Become LEADING if the proposed leader is this server, otherwise take learningState()
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, logicalclock.get(),
proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: " + n.sid);
break;
case FOLLOWING:
case LEADING:
// Elided by the article: per its note, this branch re-checks whether the election epoch equals the logical clock
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
// JMX cleanup (elided)
}
}
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/zookeeper%e5%90%af%e5%8a%a8%e5%92%8c%e9%80%89%e4%b8%be%e8%bf%87%e7%a8%8b%e7%ae%80%e8%a6%81%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/