watcher机制概述
Watcher是ZooKeeper中非常重要的一个机制:在zkCli中可以在get、ls、ls2、stat等操作上添加watch来监听节点变化,从而在分布式系统中起到变更通知的作用。
Watcher机制包括三部分:注册、存储、通知
注册: 客户端在发起读请求(如getData)时携带watch标记,并随请求一起注册Watcher
存储: 客户端将Watcher对象保存在ZKWatchManager中;服务端则记录节点路径与客户端连接(ServerCnxn)之间的对应关系
通知: 节点变化时服务端触发Watcher事件并通知客户端,客户端从ZKWatchManager中取出对应的Watcher对象执行回调

源码分析
客户端注册
我们通过zkCli执行命令的流程来看一下watch是如何起作用的:

首先找到zkCli入口类ZooKeeperMain的main方法:
/**
 * Entry point of the ZooKeeper command-line client (zkCli).
 * Builds a {@link ZooKeeperMain} from the command-line arguments and hands control to its
 * run loop.
 */
public static void main(String args[]) throws CliException, IOException, InterruptedException {
    // Follow run() to see how each typed command gets dispatched.
    new ZooKeeperMain(args).run();
}
// Interactive zkCli loop (excerpt: the catch clauses and the method's closing brace are elided
// in this article).
// When no command was passed on the command line, reads lines from the terminal via jline and
// executes each one until readLine returns null.
void run() throws CliException, IOException, InterruptedException {
if (cl.getCommand() == null) {
System.out.println("Welcome to ZooKeeper!");
// NOTE(review): not read in the visible excerpt — presumably used by the elided fallback path.
boolean jlinemissing = false;
try {
// Use reflection to build a jline ConsoleReader (with a ZNode path completer) that reads commands.
Class<?> consoleC = Class.forName("jline.console.ConsoleReader");
Class<?> completorC =
Class.forName("org.apache.zookeeper.JLineZNodeCompleter");
System.out.println("JLine support is enabled");
Object console =
consoleC.getConstructor().newInstance();
Object completor =
completorC.getConstructor(ZooKeeper.class).newInstance(zk);
Method addCompletor = consoleC.getMethod("addCompleter",
Class.forName("jline.console.completer.Completer"));
addCompletor.invoke(console, completor);
// Read and execute commands.
String line;
Method readLine = consoleC.getMethod("readLine", String.class);
while ((line = (String)readLine.invoke(console, getPrompt())) != null) {
// Execute the line — follow this call.
executeLine(line);
}
}// catch ... (elided in this excerpt)
}
/**
 * Parses and executes one line typed at the zkCli prompt.
 * Empty lines are ignored. Otherwise the line is parsed into {@code cl}, recorded in the command
 * history under the current {@code commandCount}, executed, and the counter is advanced.
 */
public void executeLine(String line) throws CliException, InterruptedException, IOException {
    if (line.isEmpty()) {
        return;
    }
    cl.parseCommand(line);            // raw string -> MyCommandOptions
    addToHistory(commandCount, line); // remember the line for "history"/"redo"
    processCmd(cl);                   // dispatch — follow this call
    commandCount++;
}
/**
 * Executes a parsed command and records its exit code.
 * A {@link CliException} is reported on stderr and its exit code is stored instead of being
 * propagated.
 *
 * @return whether the executed command requested a watch ({@code false} on CliException)
 */
protected boolean processCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
    try {
        // The actual dispatch happens in processZKCmd — follow this call.
        boolean watched = processZKCmd(co);
        exitCode = 0;
        return watched;
    } catch (CliException ex) {
        exitCode = ex.getExitCode();
        System.err.println(ex.getMessage());
        return false;
    }
}
// Dispatches one parsed zkCli command.
// Shell built-ins (quit, redo, history, printwatches, connect) are handled inline; every other
// command is delegated to its CliCommand from commandMapCli, which requires a live connection.
// Returns whether the executed CliCommand reported a watch (false for built-ins and when not
// connected).
// Throws MalformedCommandException for an empty command or an invalid redo, and
// CommandNotFoundException for a command missing from commandMap.
protected boolean processZKCmd(MyCommandOptions co) throws CliException, IOException, InterruptedException {
String[] args = co.getArgArray();
String cmd = co.getCommand();
// No command entered
if (args.length < 1) {
usage();
throw new MalformedCommandException("No command entered");
}
// Unknown command
if (!commandMap.containsKey(cmd)) {
usage();
throw new CommandNotFoundException("Command not found " + cmd);
}
boolean watch = false;
LOG.debug("Processing " + cmd);
// Shell built-ins handled directly here, without a CliCommand
if (cmd.equals("quit")) {
zk.close();
System.exit(exitCode);
} else if (cmd.equals("redo") && args.length >= 2) {
// NOTE(review): Integer.decode throws an unchecked NumberFormatException for a non-numeric index.
Integer i = Integer.decode(args[1]);
if (commandCount <= i || i < 0) { // don't allow redoing this redo
throw new MalformedCommandException("Command index out of range");
}
cl.parseCommand(history.get(i));
if (cl.getCommand().equals("redo")) {
throw new MalformedCommandException("No redoing redos");
}
history.put(commandCount, history.get(i));
processCmd(cl);
} else if (cmd.equals("history")) {
// Print up to the last 10 history entries plus the current one.
for (int i = commandCount - 10; i <= commandCount; ++i) {
if (i < 0) continue;
System.out.println(i + " - " + history.get(i));
}
} else if (cmd.equals("printwatches")) {
if (args.length == 1) {
System.out.println("printwatches is " + (printWatches ? "on" : "off"));
} else {
printWatches = args[1].equals("on");
}
} else if (cmd.equals("connect")) {
if (args.length >= 2) {
connectToZK(args[1]);
} else {
connectToZK(host);
}
}
// The built-in branches above do not return, so they also reach this check
// (e.g. "history" while disconnected additionally prints "Not connected").
// Below commands all need a live connection
if (zk == null || !zk.getState().isAlive()) {
System.out.println("Not connected");
return false;
}
// Regular ZooKeeper operations are delegated to the matching CliCommand subclass
CliCommand cliCmd = commandMapCli.get(cmd);
if(cliCmd != null) {
cliCmd.setZk(zk);
watch = cliCmd.parse(args).exec();// follow this call; the result says whether a watch was requested
} else if (!commandMap.containsKey(cmd)) {
usage();
}
return watch;
}
这里我们以CliCommand的子类GetCommand为例,跟进它的exec方法:
/**
 * Executes {@code get [-s] [-w] path}: reads the node's data, prints it, and optionally prints
 * its Stat.
 *
 * @return {@code true} if a watch was requested with {@code -w}
 * @throws CliException wrapping path-validation, ZooKeeper and interruption errors
 */
public boolean exec() throws CliException {
    // "-w" asks for a watch on the node; the flag is passed straight to getData.
    boolean watch = cl.hasOption("w");
    String path = args[1];
    Stat stat = new Stat();
    byte[] data;
    try {
        // Follow this call: a true flag is turned into the default watcher there.
        data = zk.getData(path, watch, stat);
    } catch (IllegalArgumentException ex) {
        throw new MalformedPathException(ex.getMessage());
    } catch (KeeperException | InterruptedException ex) {
        throw new CliException(ex);
    }
    // Decode explicitly as UTF-8. The platform default charset (e.g. GBK on Chinese Windows)
    // would garble non-ASCII node data.
    out.println(data == null
            ? "null"
            : new String(data, java.nio.charset.StandardCharsets.UTF_8));
    if (cl.hasOption("s")) {
        new StatPrinter(out).print(stat);
    }
    return watch;
}
/**
 * Synchronous getData that takes a boolean watch flag.
 * When {@code watch} is true, the default watcher held by {@code watchManager} is registered for
 * the path. Otherwise no watcher is set.
 */
public byte[] getData(String path, boolean watch, Stat stat)
        throws KeeperException, InterruptedException {
    Watcher chosen = null;
    if (watch) {
        chosen = watchManager.defaultWatcher;
    }
    return getData(path, chosen, stat);
}
/**
 * Synchronous getData with an explicit watcher.
 * <p>If {@code watcher} is non-null, a {@link DataWatchRegistration} is created for the
 * client-visible (un-chroot) path, it is sent along with the request packet, and the request
 * carries {@code watch=true}. A non-zero reply code is turned into the matching KeeperException.
 *
 * @param stat if non-null, receives the node's Stat
 * @return the node's data
 */
public byte[] getData(final String path, Watcher watcher, Stat stat)
        throws KeeperException, InterruptedException {
    final String clientPath = path;
    PathUtils.validatePath(clientPath);

    // The watch is registered under the un-chroot (client) path.
    final WatchRegistration registration =
            (watcher == null) ? null : new DataWatchRegistration(watcher, clientPath);

    final String serverPath = prependChroot(clientPath);

    final RequestHeader header = new RequestHeader();
    header.setType(ZooDefs.OpCode.getData);

    final GetDataRequest request = new GetDataRequest();
    request.setPath(serverPath);
    request.setWatch(watcher != null); // asks the server to keep a watch for this client

    final GetDataResponse response = new GetDataResponse();
    // Blocks until the reply arrives; follow this call to see what happens to the registration.
    final ReplyHeader reply = cnxn.submitRequest(header, request, response, registration);
    final int rc = reply.getErr();
    if (rc != 0) {
        throw KeeperException.create(KeeperException.Code.get(rc), clientPath);
    }
    if (stat != null) {
        DataTree.copyStat(response.getStat(), stat);
    }
    return response.getData();
}
// Registration for a data watch, created by getData when a watcher is supplied.
// It only forwards the watcher and the client (un-chroot) path to the WatchRegistration base class.
public DataWatchRegistration(Watcher watcher, String clientPath) {
super(watcher, clientPath);// delegate to the WatchRegistration constructor
}
/**
 * Pairs a watcher with the client-side (un-chroot) path it should be registered under.
 */
public WatchRegistration(Watcher watcher, String clientPath) {
    this.clientPath = clientPath;
    this.watcher = watcher;
}
/**
 * Convenience overload of submitRequest for requests that carry no watch deregistration.
 */
public ReplyHeader submitRequest(RequestHeader h, Record request,
        Record response, WatchRegistration watchRegistration)
        throws InterruptedException {
    final WatchDeregistration noDeregistration = null;
    return submitRequest(h, request, response, watchRegistration, noDeregistration);
}
// Queues the request and blocks the calling thread until its packet is marked finished: either
// indefinitely, or via waitForPacketFinish when requestTimeout > 0.
// The WatchRegistration travels inside the packet. It is presumably applied when the reply is
// processed; the code doing so is not shown here.
// Returns the reply header. A REQUESTTIMEOUT error also triggers sendThread.cleanAndNotifyState().
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
// Build a Packet carrying the watchRegistration and queue it for sending
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration, watchDeregistration);
// Hold the packet's monitor so packet.wait() can be used below.
synchronized (packet) {
if (requestTimeout > 0) {
// Wait for request completion with timeout
waitForPacketFinish(r, packet);
} else {
// Wait for request completion infinitely
while (!packet.finished) {
packet.wait();
}
}
}
if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
sendThread.cleanAndNotifyState();
}
return r;
}
// Wraps a request in a Packet (with its optional watch registration/deregistration) and queues it
// on outgoingQueue for the SendThread. If the connection is not alive or is already closing, the
// packet is handed to conLossPacket instead of being queued.
// Returns the packet so the caller can wait on it.
public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
Record response, AsyncCallback cb, String clientPath,
String serverPath, Object ctx, WatchRegistration watchRegistration,
WatchDeregistration watchDeregistration) {
Packet packet = null;
// Note that we do not generate the Xid for the packet yet. It is
// generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
// where the packet is actually sent.
packet = new Packet(h, r, request, response, watchRegistration);
packet.cb = cb;
packet.ctx = ctx;
packet.clientPath = clientPath;
packet.serverPath = serverPath;
packet.watchDeregistration = watchDeregistration;
// The synchronized block here is for two purpose:
// 1. synchronize with the final cleanup() in SendThread.run() to avoid race
// 2. synchronized against each packet. So if a closeSession packet is added,
// later packet will be notified.
synchronized (state) {
if (!state.isAlive() || closing) {
conLossPacket(packet);
} else {
// If the client is asking to close the session then
// mark as closing
if (h.getType() == OpCode.closeSession) {
closing = true;
}
outgoingQueue.add(packet);
}
}
// Tell the SendThread's ClientCnxnSocket that a packet is pending (the packet itself was put on
// outgoingQueue above). This call is made on both paths, including the conLossPacket one.
sendThread.getClientCnxnSocket().packetAdded();
return packet;
}
在把packet放入outgoingQueue之后,会调用sendThread所持有的ClientCnxnSocket的packetAdded()方法。SendThread在后台不停地执行doIO,而packetAdded()会调用selector的wakeup()方法,唤醒阻塞在select上的SendThread,让它尽快把这个packet发送给服务端。注意这一步只是唤醒客户端自己的发送线程,并不是直接"通知服务端"。等服务端的响应返回、finishPacket被调用之后,客户端才会把这个watcher添加到ZooKeeper的静态内部类ZKWatchManager中对应的HashMap里:
// ZKWatchManager's client-side watch tables. Each maps a client path to the Watchers registered on it.
// Presumably they are fed by getData (DataWatchRegistration), exists and getChildren registrations
// respectively; the registration code is not shown here.
private final Map<String, Set<Watcher>> dataWatches = new HashMap<>();
private final Map<String, Set<Watcher>> existWatches = new HashMap<>();
private final Map<String, Set<Watcher>> childWatches = new HashMap<>();
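服务端存储
那么服务端做了什么呢?
研究服务端源码可以发现,服务端启动时,ZooKeeperServer的startup方法会调用setupRequestProcessors()来建立请求处理链。在leading状态下,LeaderZooKeeperServer的startup方法也会再调用ZooKeeperServer中的startup方法走到这里。下面展示的是ZooKeeperServer(即单机模式)中的实现。LeaderZooKeeperServer等集群角色会重写setupRequestProcessors()构建更长的处理链,但链的末端同样是FinalRequestProcessor: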
/**
 * Builds the request pipeline PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor,
 * starts the sync and prep stages, and publishes the prep stage as {@code firstProcessor}.
 */
protected void setupRequestProcessors() {
    final RequestProcessor tail = new FinalRequestProcessor(this);
    final SyncRequestProcessor middle = new SyncRequestProcessor(this, tail);
    middle.start();
    final PrepRequestProcessor head = new PrepRequestProcessor(this, middle);
    // Same order as before: publish the entry point, then start it.
    firstProcessor = head;
    head.start();
}
这条处理链由PrepRequestProcessor开始,经过SyncRequestProcessor,最后交给FinalRequestProcessor。阅读源码可以发现,watch正是在FinalRequestProcessor的processRequest中登记的(以getData为例):
// FinalRequestProcessor.processRequest (excerpt: only the getData case is shown; the remaining
// cases, the rest of the method and its closing braces are elided).
// For getData it deserializes the request, checks READ permission and fetches the data. When the
// request asked for a watch, the client's connection is registered as the watcher.
public void processRequest(Request request) {
// ... pre-processing elided ...
switch (request.type) {
// ... other cases elided ...
case OpCode.getData: {
lastOp = "GETD";
GetDataRequest getDataRequest = new GetDataRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.getZKDatabase().aclForNode(n),
ZooDefs.Perms.READ,
request.authInfo);
Stat stat = new Stat();
// getWatch() reports whether the client asked for a watch. If so, the connection object cnxn
// (the client's ServerCnxn) is passed as the Watcher; otherwise null is passed.
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}
// ... other cases elided ...
/**
 * ZKDatabase.getData: delegates to the in-memory DataTree.
 * {@code watcher} is null when the client did not request a watch.
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    byte[] content = dataTree.getData(path, stat, watcher);
    return content;
}
/**
 * DataTree.getData: copies the node's Stat into {@code stat}, registers {@code watcher} as a data
 * watch on {@code path} if it is non-null, and returns the node's data.
 *
 * @throws KeeperException.NoNodeException if no node exists at {@code path}
 */
public byte[] getData(String path, Stat stat, Watcher watcher)
        throws KeeperException.NoNodeException {
    final DataNode node = nodes.get(path);
    if (node == null) {
        throw new KeeperException.NoNodeException();
    }
    // Read the Stat, register the watch and read the data under the same node lock.
    synchronized (node) {
        node.copyStat(stat);
        if (watcher != null) {
            dataWatches.addWatch(path, watcher); // follow this call
        }
        return node.data;
    }
}
/**
 * Records that {@code watcher} watches {@code path}. Both the forward index (watchTable:
 * path -> watchers) and the reverse index (watch2Paths: watcher -> paths) are updated.
 */
synchronized void addWatch(String path, Watcher watcher) {
    HashSet<Watcher> watchersOnPath = watchTable.get(path);
    if (watchersOnPath == null) {
        // Few watches per node are typical: start with capacity 4 to save memory.
        watchersOnPath = new HashSet<Watcher>(4);
        watchTable.put(path, watchersOnPath);
    }
    watchersOnPath.add(watcher);

    HashSet<String> watchedPaths = watch2Paths.get(watcher);
    if (watchedPaths == null) {
        // A connection typically watches many paths, so keep the default capacity here.
        watchedPaths = new HashSet<String>();
        watch2Paths.put(watcher, watchedPaths);
    }
    watchedPaths.add(path);
}
这样就在watchTable(path→watcher)和watch2Paths(watcher→path)中建立了双向映射。这里的watcher就是客户端对应的ServerCnxn,服务端之后可以通过它向客户端发送通知。
服务端通知
当节点数据发生变化时(例如处理set命令),服务端会调用DataTree的setData方法:
/**
 * Replaces the data of the node at {@code path} and updates its mtime, mzxid and version. It then
 * adjusts quota byte accounting and fires NodeDataChanged to the data watches on the path.
 *
 * @return a snapshot of the node's Stat taken right after the update
 * @throws NoNodeException if no node exists at {@code path}
 */
public Stat setData(String path, byte[] data, int version, long zxid, long time) throws NoNodeException {
    Stat s = new Stat();
    DataNode n = (DataNode) this.nodes.get(path);
    if (n == null) {
        throw new NoNodeException();
    }
    // Declared once. The decompiled original declared lastdata twice, which does not compile.
    byte[] lastdata;
    // Swap data and metadata under the node lock, which readers such as getData also take.
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // If the node lives under a quota-tracked prefix, account for the change in size.
    String lastPrefix = this.getMaxPrefixWithQuota(path);
    if (lastPrefix != null) {
        int newLength = data == null ? 0 : data.length;
        int oldLength = lastdata == null ? 0 : lastdata.length;
        this.updateBytes(lastPrefix, (long) (newLength - oldLength));
    }
    // Fire the data watches on this path; triggerWatch also removes them (one-shot).
    this.dataWatches.triggerWatch(path, EventType.NodeDataChanged);
    return s;
}
/**
 * Fires {@code type} for {@code path} to every watcher registered on it, suppressing none.
 *
 * @return the watchers that were registered on the path, or null if there were none
 */
public Set<Watcher> triggerWatch(String path, EventType type) {
    // Typed null instead of the decompiler's raw (Set) cast: it selects the same overload without
    // an unchecked/raw-type warning.
    return this.triggerWatch(path, type, (Set<Watcher>) null);
}
/**
 * Fires an event of {@code type} for {@code path} and removes every watch registered on it
 * (watches are one-shot).
 *
 * <p>The event only carries the keeper state (SyncConnected), the event type and the path. It
 * contains neither the old nor the new data.
 *
 * @param supress watchers that must not be notified (may be null)
 * @return the watchers that were registered on the path, or null if there were none
 */
public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // Detach the whole watcher set for this path: this is what makes server watches one-shot.
        watchers = this.watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                // NOTE(review): 64L is presumably the event-delivery trace mask — confirm in ZooTrace.
                ZooTrace.logTraceMessage(LOG, 64L, "No watchers for " + path);
            }
            return null;
        }
        // Keep the reverse index (watcher -> paths) in sync.
        for (Watcher w : watchers) {
            HashSet<String> paths = this.watch2Paths.get(w);
            if (paths != null) {
                paths.remove(path);
            }
        }
    }
    // Deliver outside the lock, skipping suppressed watchers.
    for (Watcher w : watchers) {
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e);
    }
    return watchers;
}
/**
 * Server-side Watcher callback for this client connection. It forwards {@code event} to the
 * client as a notification.
 */
public synchronized void process(WatchedEvent event) {
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, 64L, "Deliver event " + event + " to 0x" + Long.toHexString(this.sessionId) + " through " + this);
    }
    // xid -1 marks the reply as a notification; the client checks for it when reading responses.
    ReplyHeader notificationHeader = new ReplyHeader(-1, -1L, 0);
    // Only the serializable event wrapper (state, type, path) is sent to the client.
    this.sendResponse(notificationHeader, event.getWrapper(), "notification");
}
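这里把事件类型、通知状态和节点路径发送给了客户端(ReplyHeader的xid为-1,表示这是一条通知)。注意triggerWatch在发送通知之前已经把watcher从watchTable中移除,因此服务端的watcher是一次性的。接下来回到客户端,看看它如何处理这条通知。
客户端处理事件(依客户端而定)
在客户端的SendThread中,通过doTransport->doIO->readResponse来接收服务器的响应: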
// Handles one response frame read by the SendThread (excerpt: only the notification path is shown).
// For xid -1 (a watch notification) it deserializes the WatcherEvent and strips the chroot prefix
// from the path. It then wraps the event in a WatchedEvent, queues it on the EventThread and returns.
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(
incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
// ... handling of other special xids elided ...
// Notification pushed by the server
if (replyHdr.getXid() == -1) {
// -1 means notification
if (LOG.isDebugEnabled()) {
LOG.debug("Got notification sessionid:0x"
+ Long.toHexString(sessionId));
}
// Deserialize the event body
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
// Convert the server path back to the client path by stripping the chroot prefix.
// NOTE(review): the length-based branch does not verify that serverPath starts with chrootPath.
if (chrootPath != null) {
String serverPath = event.getPath();
if(serverPath.compareTo(chrootPath)==0)
event.setPath("/");
else if (serverPath.length() > chrootPath.length())
event.setPath(serverPath.substring(chrootPath.length()));
else {
LOG.warn("Got server path " + event.getPath()
+ " which is too short for chroot path "
+ chrootPath);
}
}
WatchedEvent we = new WatchedEvent(event);
if (LOG.isDebugEnabled()) {
LOG.debug("Got " + we + " for sessionid 0x"
+ Long.toHexString(sessionId));
}
// Hand the event over to the EventThread for processing
eventThread.queueEvent( we );
return;
}
// ... handling of ordinary replies elided ...
}
/**
 * Queues {@code event}. Its watchers are looked up from the client watch manager.
 */
public void queueEvent(WatchedEvent event) {
    final Set<Watcher> lookUpWatchers = null;
    queueEvent(event, lookUpWatchers);
}
/**
 * Pairs {@code event} with the watchers that must receive it and adds the pair to waitingEvents.
 * A state-only event (EventType.None) that repeats the current session state is dropped.
 *
 * @param materializedWatchers watchers already resolved by the caller, or null to look them up
 */
private void queueEvent(WatchedEvent event,
        Set<Watcher> materializedWatchers) {
    final boolean isStateEvent = event.getType() == EventType.None;
    if (isStateEvent && sessionState == event.getState()) {
        return;
    }
    sessionState = event.getState();

    final Set<Watcher> watchers;
    if (materializedWatchers != null) {
        // The caller already knows the watchers: take a private copy.
        watchers = new HashSet<Watcher>(materializedWatchers);
    } else {
        // The event only carries state, type and path, so resolve the matching watchers from the
        // watch manager (materialize removes node watches as it returns them).
        watchers = watcher.materialize(event.getState(), event.getType(), event.getPath());
    }
    // Processed later by the EventThread.
    waitingEvents.add(new WatcherSetEventPair(watchers, event));
}
// Resolves which client-side Watchers must be told about an event.
// - EventType.None (connection-state change): the default watcher plus every registered data,
//   exist and child watcher. The tables are cleared only when disableAutoWatchReset is set and the
//   new state is not SyncConnected.
// - Node events: removes and returns the watchers registered on clientPath in the relevant
//   table(s). This removal is what makes client watches one-shot.
// Throws RuntimeException for an event type not handled below.
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
Watcher.Event.EventType type,
String clientPath)
{
Set<Watcher> result = new HashSet<Watcher>();
// Collect the watchers to notify; for node events, remove them from the tables as they fire
switch (type) {
case None:
result.add(defaultWatcher);
boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
synchronized(dataWatches) {
for(Set<Watcher> ws: dataWatches.values()) {
result.addAll(ws);
}
if (clear) {
dataWatches.clear();
}
}
synchronized(existWatches) {
for(Set<Watcher> ws: existWatches.values()) {
result.addAll(ws);
}
if (clear) {
existWatches.clear();
}
}
synchronized(childWatches) {
for(Set<Watcher> ws: childWatches.values()) {
result.addAll(ws);
}
if (clear) {
childWatches.clear();
}
}
return result;
case NodeDataChanged:
case NodeCreated:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
synchronized (existWatches) {
addTo(existWatches.remove(clientPath), result);
}
break;
case NodeChildrenChanged:
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
case NodeDeleted:
synchronized (dataWatches) {
addTo(dataWatches.remove(clientPath), result);
}
// XXX This shouldn't be needed, but just in case
synchronized (existWatches) {
Set<Watcher> list = existWatches.remove(clientPath);
if (list != null) {
addTo(list, result);
LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
}
}
synchronized (childWatches) {
addTo(childWatches.remove(clientPath), result);
}
break;
default:
String msg = "Unhandled watch event type " + type
+ " with state " + state + " on path " + clientPath;
LOG.error(msg);
throw new RuntimeException(msg);
}
return result;
}
}
这里可以看到,客户端根据事件类型和路径从ZKWatchManager中取出了对应的watcher,并将其从表中移除(连接状态变化这类EventType.None事件除外),说明客户端的watcher同样是一次性的。
在ClientCnxn的EventThread中,会不断地从waitingEvents中取出事件进行处理:
// Main loop of the EventThread: takes queued events off waitingEvents and dispatches them.
// After eventOfDeath has been seen, it keeps draining until the queue is empty and then exits.
// An InterruptedException ends the loop; it is logged, not rethrown.
public void run() {
try {
isRunning = true;
while (true) {
// Take the next event (blocks until one is available)
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
// Dispatch the event — follow this call
processEvent(event);
}
// Once killed, stop only when nothing is left; the empty check and isRunning = false
// happen together under the queue's monitor.
if (wasKilled)
synchronized (waitingEvents) {
if (waitingEvents.isEmpty()) {
isRunning = false;
break;
}
}
}
} catch (InterruptedException e) {
LOG.error("Event thread exiting due to interruption", e);
}
LOG.info("EventThread shut down for session: 0x{}",
Long.toHexString(getSessionId()));
}
// Dispatches one queued event. For a WatcherSetEventPair, every watcher's process() is invoked with
// the event; an exception from one watcher is logged and does not stop the others.
// Other event kinds are handled in branches elided from this excerpt.
private void processEvent(Object event) {
try {
// Event produced by queueEvent: a watcher set plus the event it should receive
if (event instanceof WatcherSetEventPair) {
// each watcher will process the event
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
// Run the watcher's callback
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} // else if... (other event kinds elided)
} catch (Throwable t) {
LOG.error("Caught unexpected throwable", t);
}
}
而watcher.process在zkCli中的实现就是:
private class MyWatcher implements Watcher {
public void process(WatchedEvent event) {
if (getPrintWatches()) {
ZooKeeperMain.printMessage("WATCHER::");
ZooKeeperMain.printMessage(event.toString());
}
}
}
WatchedEvent的toString方法:
/**
 * Human-readable form of the event, e.g. as printed by zkCli's watcher.
 */
@Override
public String toString() {
    return new StringBuilder("WatchedEvent state:").append(keeperState)
            .append(" type:").append(eventType)
            .append(" path:").append(path)
            .toString();
}
这与zkCli中打印出的监听输出完全一致,一个完整的监听过程就此完成。而在使用Java客户端API时,我们注册的Watcher实现了自定义的process回调,事件到达时会由EventThread自动调用。
源码看完了,来点其他优秀博主的总结压压惊:
Zookeeper Watcher 机制详解 - 大数据up - 博客园
zookeeper 中 Watcher 通知机制的一点理解大数据韦一笑的专栏-CSDN博客
原创文章,作者:彭晨涛,如若转载,请注明出处:https://www.codetool.top/article/zookeeper%e4%b8%ad%e7%9a%84watcher%e6%9c%ba%e5%88%b6%e4%b8%8e%e6%ba%90%e7%a0%81%e5%88%86%e6%9e%90/