Zookeeper中的Watcher机制与源码分析

watcher机制概述

Zookeeper中非常重要的一个机制就是watcher,在zkCli中可以在get、ls、ls2、stat 等操作中添加watch,从而监听节点变化,起到在分布式系统中收到消息通知作用。

Watcher机制包括三部分:注册、存储、通知

注册: 注册Watcher
存储: Watcher对象保存在客户端的ZKWatchManager中
通知: 服务端触发Watcher事件,通知客户端,客户端从ZKWatchManager中取出对应的Watcher对象执行回调

Zookeeper中的Watcher机制与源码分析

源码分析

客户端注册

我们通过zkCli的执行命令流程来看一下watch起到了什么作用:

zkCli.sh

找到对应的类下的main方法:

/**
 * Entry point of the zkCli shell: constructs a {@code ZooKeeperMain} from
 * the command-line arguments and starts its {@code run()} loop.
 *
 * @param args command-line arguments forwarded to {@code ZooKeeperMain}
 */
public static void main(String args[]) throws CliException, IOException, InterruptedException
{
    // Build the shell and start it right away (follow run() next).
    new ZooKeeperMain(args).run();
}
/**
 * Interactive loop of the zkCli shell.
 *
 * NOTE: this is an excerpt; the catch blocks and the rest of the method are
 * elided (see the trailing "//catch ..." marker), so the braces shown here
 * are intentionally unbalanced.
 *
 * When no command was supplied ({@code cl.getCommand() == null}) it loads
 * jline's ConsoleReader reflectively, attaches a JLineZNodeCompleter, and
 * then reads lines until {@code readLine} returns null, passing each line to
 * {@link #executeLine(String)}.
 */
void run() throws CliException, IOException, InterruptedException {
    if (cl.getCommand() == null) {
        System.out.println("Welcome to ZooKeeper!");
        boolean jlinemissing = false;
        try {
            // Create the ConsoleReader via reflection; it reads the command-line input
            Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
            Class<?> completorC =
                Class.forName("org.apache.zookeeper.JLineZNodeCompleter");

            System.out.println("JLine support is enabled");

            Object console =
                consoleC.getConstructor().newInstance();

            Object completor =
                completorC.getConstructor(ZooKeeper.class).newInstance(zk);
            Method addCompletor = consoleC.getMethod("addCompleter",
                    Class.forName("jline.console.completer.Completer"));
            addCompletor.invoke(console, completor);
            // Read and execute commands
            String line;
            Method readLine = consoleC.getMethod("readLine", String.class);
            while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
                // Execute the line just read (follow this call next)
                executeLine(line);
            }
        }//catch ...
}
/**
 * Parses and executes one line typed into the shell.
 * An empty line is ignored: it is neither recorded in the history nor
 * counted as a command.
 *
 * @param line the raw command line
 */
public void executeLine(String line) throws CliException, InterruptedException, IOException {
    // Nothing to do for an empty line.
    if (line.equals("")) {
        return;
    }
    // Turn the raw command string into the shared MyCommandOptions object.
    cl.parseCommand(line);
    // Record the line in the history under the current command index.
    addToHistory(commandCount,line);
    // Execute the parsed command (follow this call next).
    processCmd(cl);
    commandCount++;
}
/**
 * Runs a parsed command and records the resulting exit code.
 * A {@code CliException} raised by the command is not propagated: its exit
 * code is stored, its message is printed to stderr, and {@code false} is
 * returned.
 *
 * @param co the parsed command
 * @return the value returned by {@code processZKCmd}, or {@code false} when
 *         a {@code CliException} was caught
 */
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    try {
        // Execute the command (follow this call next).
        final boolean watchRequested = processZKCmd(co);
        exitCode = 0;
        return watchRequested;
    } catch (CliException failure) {
        exitCode = failure.getExitCode();
        System.err.println(failure.getMessage());
        return false;
    }
}
/**
 * Dispatches one parsed shell command.
 *
 * Shell-level commands (quit, redo, history, printwatches, connect) are
 * handled inline. None of those branches returns, so control always continues
 * to the connection check and the {@code commandMapCli} lookup below.
 * Commands found in {@code commandMapCli} are executed through their
 * {@code CliCommand} object.
 *
 * @param co the parsed command
 * @return the value returned by the {@code CliCommand}'s {@code exec()};
 *         {@code false} when not connected or when no {@code CliCommand} ran
 * @throws MalformedCommandException if no command was entered or a redo index is invalid
 * @throws CommandNotFoundException if the command is not in {@code commandMap}
 */
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    String[] args = co.getArgArray();
    String cmd = co.getCommand();
    // No command was entered
    if (args.length < 1) {
        usage();
        throw new MalformedCommandException("No command entered");
    }
    // Unknown command
    if (!commandMap.containsKey(cmd)) {
        usage();
        throw new CommandNotFoundException("Command not found " + cmd);
    }

    boolean watch = false;
    LOG.debug("Processing " + cmd);

    // Shell-level commands such as quit, handled here rather than by a CliCommand
    if (cmd.equals("quit")) {
        zk.close();
        System.exit(exitCode);
    } else if (cmd.equals("redo") && args.length >= 2) {
        Integer i = Integer.decode(args[1]);
        if (commandCount <= i || i < 0) { // don't allow redoing this redo
            throw new MalformedCommandException("Command index out of range");
        }
        cl.parseCommand(history.get(i));
        if (cl.getCommand().equals("redo")) {
            throw new MalformedCommandException("No redoing redos");
        }
        history.put(commandCount, history.get(i));
        processCmd(cl);
    } else if (cmd.equals("history")) {
        // Print up to the last 10 history entries plus the current index
        for (int i = commandCount - 10; i <= commandCount; ++i) {
            if (i < 0) continue;
            System.out.println(i + " - " + history.get(i));
        }
    } else if (cmd.equals("printwatches")) {
        // Without an argument: report the flag; with one: set it ("on" enables)
        if (args.length == 1) {
            System.out.println("printwatches is " + (printWatches ? "on" : "off"));
        } else {
            printWatches = args[1].equals("on");
        }
    } else if (cmd.equals("connect")) {
        if (args.length >= 2) {
            connectToZK(args[1]);
        } else {
            connectToZK(host);
        }
    }

    // Below commands all need a live connection
    if (zk == null || !zk.getState().isAlive()) {
        System.out.println("Not connected");
        return false;
    }

    // Regular operation commands are executed by the matching CliCommand subclass
    CliCommand cliCmd = commandMapCli.get(cmd);
    if(cliCmd != null) {
        cliCmd.setZk(zk);
        watch = cliCmd.parse(args).exec();// follow this call next
    } else if (!commandMap.containsKey(cmd)) {
            usage();
    }
    return watch;
}

这里我们选择cliCmd的子类GetCommand跟进exec方法:

/**
 * Executes the {@code get} command: reads the data of the znode named by
 * {@code args[1]}, prints it, and optionally prints its stat.
 *
 * The node data is decoded explicitly as UTF-8, so the output does not depend
 * on the JVM's platform default charset. A node without data prints the
 * literal text {@code null}.
 *
 * @return {@code true} if the {@code -w} option was given, i.e. a watch was
 *         requested on the node
 * @throws MalformedPathException if the path is rejected with an
 *         {@code IllegalArgumentException}
 * @throws CliException wrapping any {@code KeeperException} or
 *         {@code InterruptedException} raised by the read
 */
public boolean exec() throws CliException {
    // The presence of the '-w' option decides whether a watch is registered.
    boolean watch = cl.hasOption("w");
    String path = args[1];
    Stat stat = new Stat();
    byte data[];
    try {
        // Fetch the data through ZooKeeper.getData (follow this call next).
        data = zk.getData(path, watch, stat);
    } catch (IllegalArgumentException ex) {
        throw new MalformedPathException(ex.getMessage());
    } catch (KeeperException|InterruptedException ex) {
        throw new CliException(ex);
    }
    // Decode with an explicit charset instead of the platform default.
    String text = (data == null)
            ? "null"
            : new String(data, java.nio.charset.StandardCharsets.UTF_8);
    out.println(text);
    if (cl.hasOption("s")) {
        new StatPrinter(out).print(stat);
    }
    return watch;
}
/**
 * Convenience overload of {@code getData}: when {@code watch} is true the
 * default watcher held by {@code watchManager} is registered, otherwise no
 * watcher is used.
 *
 * @param path  the znode path
 * @param watch whether to register the default watcher
 * @param stat  passed through to the {@code Watcher}-based overload
 * @return the data returned by the {@code Watcher}-based overload
 */
public byte[] getData(String path, boolean watch, Stat stat)
        throws KeeperException, InterruptedException {
    // watchManager.defaultWatcher is the default watcher used for boolean-style watches.
    Watcher chosen = null;
    if (watch) {
        chosen = watchManager.defaultWatcher;
    }
    return getData(path, chosen, stat);
}
/**
 * Reads the data of a znode, optionally registering a watcher on it.
 *
 * Validates the client path, builds a getData request against the
 * chroot-prefixed server path, and submits it through {@code cnxn}. When a
 * watcher is supplied, a {@code DataWatchRegistration} for the client path is
 * passed along with the request.
 *
 * @param path    the znode path as seen by the client
 * @param watcher the watcher to register, or {@code null} for none
 * @param stat    if non-null, receives a copy of the stat from the response
 * @return the data carried by the response
 * @throws KeeperException if the reply header carries a non-zero error code
 */
public byte[] getData(final String path, Watcher watcher, Stat stat)
    throws KeeperException, InterruptedException
    {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    final boolean watching = watcher != null;
    // The registration keeps the un-chrooted (client) path.
    final WatchRegistration registration =
            watching ? new DataWatchRegistration(watcher, clientPath) : null;

    final String serverPath = prependChroot(clientPath);

    RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getData);

    GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watching);

    GetDataResponse response = new GetDataResponse();
    // The registration travels with the request (follow submitRequest next).
    ReplyHeader reply = cnxn.submitRequest(header, request, response, registration);
    if (reply.getErr() != 0) {
        throw KeeperException.create(KeeperException.Code.get(reply.getErr()),
                clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
/**
 * Creates a registration for a data watch.
 *
 * @param watcher    the watcher to register
 * @param clientPath the client-side path being watched
 */
public DataWatchRegistration(Watcher watcher, String clientPath) {
    super(watcher, clientPath);// delegates to the WatchRegistration constructor
}
/**
 * Stores the watcher together with the path it watches.
 *
 * @param watcher    the watcher to register
 * @param clientPath the client-side path being watched
 */
public WatchRegistration(Watcher watcher, String clientPath)
{   
    // A registration consists of one watcher and one watched path
    this.watcher = watcher;
    this.clientPath = clientPath;
}
/**
 * Submits a request without a watch deregistration; delegates to the
 * five-argument overload with {@code null} as the {@code WatchDeregistration}.
 *
 * @return the reply header returned by the five-argument overload
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    return submitRequest(h, request, response, watchRegistration, null);
}
/**
 * Queues a request packet and blocks the caller until the packet is finished.
 *
 * If {@code requestTimeout > 0} the wait goes through
 * {@code waitForPacketFinish}; otherwise the caller waits on the packet's
 * monitor until {@code packet.finished} becomes true. When the reply carries
 * the REQUESTTIMEOUT error code, {@code sendThread.cleanAndNotifyState()} is
 * invoked before returning.
 *
 * @return the reply header that was handed to the packet
 * @throws InterruptedException if the waiting thread is interrupted
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration)
        throws InterruptedException {
    ReplyHeader r = new ReplyHeader();
    // Build and enqueue a Packet that carries the watchRegistration
    Packet packet = queuePacket(h, r, request, response, null, null, null,
            null, watchRegistration, watchDeregistration);
    synchronized (packet) {
        if (requestTimeout > 0) {
            // Wait for request completion with timeout
            waitForPacketFinish(r, packet);
        } else {
            // Wait for request completion infinitely
            while (!packet.finished) {
                packet.wait();
            }
        }
    }
    if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
        sendThread.cleanAndNotifyState();
    }
    return r;
}
/**
 * Wraps a request into a {@code Packet} and places it on the outgoing queue.
 *
 * If the connection state is not alive or the client is already closing, the
 * packet is handed to {@code conLossPacket} instead of being queued. A
 * closeSession request marks the client as closing. In every case the
 * SendThread's socket is told that a packet was added.
 *
 * @return the packet that was created
 */
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
        Record response, AsyncCallback cb, String clientPath,
        String serverPath, Object ctx, WatchRegistration watchRegistration,
        WatchDeregistration watchDeregistration) {
    Packet packet = null;

    // Note that we do not generate the Xid for the packet yet. It is
    // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
    // where the packet is actually sent.
    packet = new Packet(h, r, request, response, watchRegistration);
    packet.cb = cb;
    packet.ctx = ctx;
    packet.clientPath = clientPath;
    packet.serverPath = serverPath;
    packet.watchDeregistration = watchDeregistration;
    // The synchronized block here is for two purpose:
    // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
    // 2. synchronized against each packet. So if a closeSession packet is added,
    // later packet will be notified.
    synchronized (state) {
        if (!state.isAlive() || closing) {
            conLossPacket(packet);
        } else {
            // If the client is asking to close the session then
            // mark as closing
            if (h.getType() == OpCode.closeSession) {
                closing = true;
            }
            outgoingQueue.add(packet);
        }
    }
    // Tell the SendThread's ClientCnxnSocket that a packet has been added
    sendThread.getClientCnxnSocket().packetAdded();
    return packet;
}

在将packet添加到sendThread的ClientCnxnSocket中之后,注意SendThread是在不停地doIO的,而这里的packetAdded还会调用selector的wakeup()方法,唤醒可能正阻塞在select上的SendThread,让它尽快把队列中的packet发送给服务端;并且在收到响应、执行finishPacket之后,向ZooKeeper中的一个静态内部类ZKWatchManager内的HashMap添加了这个watcher:

// Client-side watcher tables: each maps a path string to the set of watchers
// registered for it. Presumably one table per watch kind (data / exists /
// children), as the field names suggest.
private final Map<String, Set<Watcher>> dataWatches =
    new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> existWatches =
    new HashMap<String, Set<Watcher>>();
private final Map<String, Set<Watcher>> childWatches =
    new HashMap<String, Set<Watcher>>();

服务端存储

那么服务端干了什么呢?

研究服务端源码可以发现,在服务端是leading状态时,通过调用LeaderZooKeeperServer的startup方法,里面再调用ZooKeeperServer中的startup方法,再调用setupRequestProcessors()搭建请求处理链:

/**
 * Builds and starts the request-processor chain:
 * PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
 * The chain is constructed back to front, and {@code firstProcessor} is set
 * to its head.
 */
protected void setupRequestProcessors() {
    // Tail of the chain.
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);

    // Middle stage; started before the head of the chain is created.
    SyncRequestProcessor syncStage = new SyncRequestProcessor(this,
            finalProcessor);
    syncStage.start();

    // Head of the chain; published in firstProcessor, then started.
    PrepRequestProcessor prepStage = new PrepRequestProcessor(this, syncStage);
    firstProcessor = prepStage;
    prepStage.start();
}

这里面有一个PrepRequestProcessor和一个封装了FinalRequestProcessor的SyncRequestProcessor,通过阅读源码发现FinalRequestProcessor的processRequest中有对getData请求的处理:

// Excerpt of the request handler: only the getData case is shown; the
// pre-handling, the other cases and the end of the method are elided.
// For getData it deserializes the request, looks up the node, checks the
// READ permission, and reads the data through the ZKDatabase.
public void processRequest(Request request) {
        //prehandling...
        switch (request.type) {
        //case...
        case OpCode.getData: {
            lastOp = "GETD";
            GetDataRequest getDataRequest = new GetDataRequest();
            ByteBufferInputStream.byteBuffer2Record(request.request,
                    getDataRequest);
            DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
            if (n == null) {
                throw new KeeperException.NoNodeException();
            }
            PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
                    ZooDefs.Perms.READ,
                    request.authInfo);
            Stat stat = new Stat();
            // getWatch() tells whether the client asked for a watch; if so, cnxn
            // (the connection object, used here as a Watcher) is passed as the watcher
            byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
                    getDataRequest.getWatch() ? cnxn : null);
            rsp = new GetDataResponse(b, stat);
            break;
        }
        //case...
/**
 * Reads a node's data by delegating to {@code dataTree.getData}.
 *
 * @param path    the node path
 * @param stat    passed through to the data tree
 * @param watcher passed through to the data tree; may be {@code null}
 * @return whatever {@code dataTree.getData} returns
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
throws KeeperException.NoNodeException {
    return dataTree.getData(path, stat, watcher);
}
/**
 * Returns the data of the node at {@code path} and copies its stat into
 * {@code stat}. If a watcher is supplied it is added to the data watches for
 * that path while the node's monitor is held.
 *
 * @param path    the node path
 * @param stat    receives a copy of the node's stat
 * @param watcher the watcher to register, or {@code null} for none
 * @return the node's data array
 * @throws KeeperException.NoNodeException if no node exists at {@code path}
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    final DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }
    synchronized (node) {
        node.copyStat(stat);
        final boolean watchRequested = watcher != null;
        if (watchRequested) {
            // Register the watch (follow addWatch next).
            dataWatches.addWatch(path, watcher);
        }
        return node.data;
    }
}
/**
 * Registers {@code watcher} for {@code path}, maintaining both directions of
 * the mapping: path to watchers ({@code watchTable}) and watcher to paths
 * ({@code watch2Paths}).
 *
 * @param path    the watched path
 * @param watcher the watcher to add
 */
synchronized void addWatch(String path, Watcher watcher) {
    // Forward index: every watcher registered on this path.
    HashSet<Watcher> watchersOnPath = watchTable.get(path);
    if (watchersOnPath == null) {
        // Start small (capacity 4): a node usually has few watches, and the
        // set grows by rehashing once more are added.
        watchersOnPath = new HashSet<Watcher>(4);
        watchTable.put(path, watchersOnPath);
    }
    watchersOnPath.add(watcher);

    // Reverse index: every path this watcher is registered on.
    HashSet<String> pathsOfWatcher = watch2Paths.get(watcher);
    if (pathsOfWatcher == null) {
        // A connection typically holds many watches, so the default capacity is used.
        pathsOfWatcher = new HashSet<String>();
        watch2Paths.put(watcher, pathsOfWatcher);
    }
    pathsOfWatcher.add(path);
}

这样就把监视器和path关联起来了,而服务端可以通过刚刚传入的ServerCnxn联系客户端。

服务端通知

而如果数据发生变动无非就是处理了set命令,找到对应的函数:

/**
 * Replaces the data of the node at {@code path}, updates its stat, and
 * triggers the data watches registered on that path.
 *
 * @param path    the node path
 * @param data    the new data; may be {@code null}
 * @param version the version to store in the node's stat
 * @param zxid    the value to store as the node's mzxid
 * @param time    the value to store as the node's mtime
 * @return a copy of the node's stat taken after the update
 * @throws NoNodeException if no node exists at {@code path}
 */
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    DataNode n = this.nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    }

    Stat s = new Stat();
    // Declared once; the original excerpt declared it twice, which does not compile.
    byte[] lastdata;
    // Swap in the new data and refresh the stat under the node's monitor.
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }

    // If a quota prefix covers this path, account for the change in data size.
    String lastPrefix = this.getMaxPrefixWithQuota(path);
    if (lastPrefix != null) {
        int newLength = (data == null) ? 0 : data.length;
        int oldLength = (lastdata == null) ? 0 : lastdata.length;
        this.updateBytes(lastPrefix, (long) (newLength - oldLength));
    }

    // Fire the data watches registered on this path.
    this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
/**
 * Triggers the watches on {@code path} without suppressing any watcher;
 * delegates to the three-argument overload with a {@code null} suppress set.
 *
 * @param path the path whose watches fire
 * @param type the event type to deliver
 * @return whatever the three-argument overload returns
 */
public Set<Watcher> triggerWatch(String path, EventType type) {
    // No raw-type cast: the three-argument overload is the only match for three arguments.
    return this.triggerWatch(path, type, null);
}

/**
 * Fires and removes every watch registered on {@code path}.
 *
 * The watchers for the path are removed from both indexes while holding this
 * object's monitor, which makes the watches one-shot. They are then notified
 * outside the lock. The event carries only the state, the event type and the
 * path, not the old or new data.
 *
 * @param path    the path whose watches fire
 * @param type    the event type to deliver
 * @param supress watchers that must not be notified; may be {@code null}
 * @return the set of watchers removed for the path (including suppressed
 *         ones), or {@code null} if no watcher was registered
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;

    synchronized (this) {
        // Remove the path's entry from the forward index.
        watchers = this.watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
            }
            return null;
        }

        // Remove the path from each watcher's reverse-index entry.
        for (Watcher w : watchers) {
            HashSet<String> paths = this.watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }

    // Notify outside the lock, skipping suppressed watchers.
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
/**
 * Delivers a watch event to the client on this connection by sending it as a
 * response tagged "notification".
 *
 * @param event the event to deliver
 */
public synchronized void process(WatchedEvent event) {
    // A header built with -1 as its first argument marks the message as a notification.
    final ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
    }

    // Convert to the wire representation and send it to the client.
    this.sendResponse(notificationHeader, event.getWrapper(), "notification");
}

在此处就把监听的事件类型和状态返回给了客户端,那么再返回客户端看看客户端的处理

客户端处理事件(依客户端而定)

在客户端的SendThread中调用了doTransport->doIO->readResponse来接收服务器的响应

/**
 * Decodes one server response from {@code incomingBuffer}.
 *
 * Excerpt: only the notification branch (reply xid == -1) is shown; the
 * handling of other xids is elided. A notification is deserialized into a
 * {@code WatcherEvent}, its path is converted from the server path to the
 * client path when a chroot is in use, and the resulting
 * {@code WatchedEvent} is queued on the event thread.
 *
 * @param incomingBuffer the buffer holding the serialized response
 * @throws IOException declared by the method; presumably raised by the
 *         deserialization calls
 */
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();

    replyHdr.deserialize(bbia, "header");
    //if replyHdr.getXid() == ...

    // Notification from the server
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                + Long.toHexString(sessionId));
        }
        // Deserialize the event payload
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response");

        // Convert the server path to the client path by stripping the chroot prefix
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if(serverPath.compareTo(chrootPath)==0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }

        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }
        // Hand the event over to the eventThread for processing
        eventThread.queueEvent( we );
        return;
    }
    //...
}
/**
 * Queues an event whose watchers still have to be looked up; delegates to
 * the two-argument overload with {@code null} as the materialized watchers.
 *
 * @param event the event to queue
 */
public void queueEvent(WatchedEvent event) {
    queueEvent(event, null);
}

/**
 * Pairs an event with the watchers that must receive it and enqueues the
 * pair on {@code waitingEvents} for later processing.
 *
 * An event of type {@code None} whose state equals the current
 * {@code sessionState} is dropped. Otherwise {@code sessionState} is updated
 * to the event's state.
 *
 * @param event                the event to queue
 * @param materializedWatchers the watchers to notify, or {@code null} to look
 *                             them up from the event via {@code watcher.materialize}
 */
private void queueEvent(WatchedEvent event,
        Set<Watcher> materializedWatchers) {
    final boolean unchangedSessionEvent =
            event.getType() == EventType.None && sessionState == event.getState();
    if (unchangedSessionEvent) {
        return;
    }
    // Remember the latest session state.
    sessionState = event.getState();

    final Set<Watcher> watchers;
    if (materializedWatchers != null) {
        // Work on a private copy of the caller's set.
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    } else {
        // The event carries only state, type and path, so the interested
        // watchers have to be looked up from those.
        watchers = watcher.materialize(event.getState(),
                event.getType(), event.getPath());
    }

    // Queue the (watcher set, event) pair for later processing.
    waitingEvents.add(new WatcherSetEventPair(watchers, event));
}
/**
 * Collects the watchers that must be notified for an event and, for
 * node-level events, removes them from the watcher tables.
 *
 * Type {@code None}: returns the default watcher plus every watcher in all
 * three tables; the tables are cleared only when
 * {@code disableAutoWatchReset} is set and the state is not SyncConnected.
 * Node events: the entries for {@code clientPath} are removed from the
 * tables relevant to the event type and handed to {@code addTo}
 * (presumably adds them to the result when non-null).
 *
 * @param state      the session state carried by the event
 * @param type       the event type
 * @param clientPath the client-side path the event refers to
 * @return the set of watchers to notify
 * @throws RuntimeException if the event type is not handled
 */
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath)
    {
        Set<Watcher> result = new HashSet<Watcher>();
        // Collect the matching watchers, removing them from the tables where applicable
        switch (type) {
        case None:
            result.add(defaultWatcher);
            // Whether the tables are emptied after collecting their watchers
            boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
            synchronized(dataWatches) {
                for(Set<Watcher> ws: dataWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    dataWatches.clear();
                }
            }

            synchronized(existWatches) {
                for(Set<Watcher> ws: existWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    existWatches.clear();
                }
            }

            synchronized(childWatches) {
                for(Set<Watcher> ws: childWatches.values()) {
                    result.addAll(ws);
                }
                if (clear) {
                    childWatches.clear();
                }
            }

            return result;
        case NodeDataChanged:
        case NodeCreated:
            // Data and exists watches on the path both fire
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            synchronized (existWatches) {
                addTo(existWatches.remove(clientPath), result);
            }
            break;
        case NodeChildrenChanged:
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        case NodeDeleted:
            synchronized (dataWatches) {
                addTo(dataWatches.remove(clientPath), result);
            }
            // XXX This shouldn't be needed, but just in case
            synchronized (existWatches) {
                Set<Watcher> list = existWatches.remove(clientPath);
                if (list != null) {
                    addTo(list, result);
                    LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                }
            }
            synchronized (childWatches) {
                addTo(childWatches.remove(clientPath), result);
            }
            break;
        default:
            String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
            LOG.error(msg);
            throw new RuntimeException(msg);
        }

        return result;
    }
}

这里可以看到客户端根据事件类型和路径从ZKWatchManager中取出了对应的watcher并移除了它,说明客户端的watcher也是一次性的。

在ClientCnxn的EventThread中,不断地从waitingEvents中取事件:

/**
 * Main loop of the event thread: takes queued events from
 * {@code waitingEvents} one at a time and processes them.
 *
 * Receiving {@code eventOfDeath} marks the thread as killed, but the loop
 * only exits once the queue is observed empty, so events still queued at
 * that point are processed first. An interruption ends the loop as well.
 */
public void run() {
    try {
        isRunning = true;
        while (true) {
            // Block until the next event is available
            Object event = waitingEvents.take();
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                // Process the event (follow this call next)
                processEvent(event);
            }
            // After being killed, stop as soon as the queue is empty
            if (wasKilled)
            synchronized (waitingEvents) {
                if (waitingEvents.isEmpty()) {
                    isRunning = false;
                    break;
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }

    LOG.info("EventThread shut down for session: 0x{}",
                Long.toHexString(getSessionId()));
}
/**
 * Processes one queued event. Excerpt: only the watcher-notification branch
 * is shown; the remaining branches are elided.
 *
 * For a {@code WatcherSetEventPair}, every watcher in the pair is invoked
 * with the pair's event. A throwable raised by one watcher is logged and
 * does not prevent the remaining watchers from running.
 *
 * @param event the object taken from the event queue
 */
private void processEvent(Object event) {
    try {
        // Watch notification: a set of watchers paired with the event
        if (event instanceof WatcherSetEventPair) {
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) {
                try {
                    // Invoke the watcher's callback
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        } // else if...
    } catch (Throwable t) {
        LOG.error("Caught unexpected throwable", t);
    }
}

而watcher.process在zkCli中的实现就是:

private class MyWatcher implements Watcher {
    public void process(WatchedEvent event) {
        if (getPrintWatches()) {
            ZooKeeperMain.printMessage("WATCHER::");
            ZooKeeperMain.printMessage(event.toString());
        }
    }
}

toString:

/**
 * Renders the event as {@code WatchedEvent state:<state> type:<type> path:<path>}.
 *
 * @return the textual form of this event
 */
public String toString() {
    StringBuilder text = new StringBuilder("WatchedEvent state:");
    text.append(keeperState);
    text.append(" type:").append(eventType);
    text.append(" path:").append(path);
    return text.toString();
}

完全符合客户端的监听输出,就这样一个监听过程就完成了。而对于java中zk api的使用,添加的watcher有我们提供的回调函数,就会自动调用了。

源码看完了,来点其他优秀博主的总结压压惊:

Zookeeper Watcher 机制详解 - 大数据up - 博客园

zookeeper 中 Watcher 通知机制的一点理解大数据韦一笑的专栏-CSDN博客

原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/zookeeper%e4%b8%ad%e7%9a%84watcher%e6%9c%ba%e5%88%b6%e4%b8%8e%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/

发表评论

电子邮件地址不会被公开。