zookeeper服务端
阅读原文时间:2023年07月08日阅读:2

  服务端启动时,就启动线程通过NIO监听网络端口。每个连接都会有一个上下文环境对象,当接收到请求后,会在上下文环境对象中进行处理。

服务端启动一个线程监听网络端口(NIOServerCnxn.Factory):

static public class Factory extends Thread {
static {
//设置全局的异常处理
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread " + t + " died", e);
}
});
/**
* jvm早期的nullpoint bug
* http://bugs.sun.com/view_bug.do?bug_id=6427854
*/
try {
Selector.open().close();
} catch(IOException ie) {
LOG.error("Selector failed to open", ie);
}
}
//服务器通道
final ServerSocketChannel ss;
//选择器
final Selector selector = Selector.open();
ZooKeeperServer zks;

     final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 \* 1024);  
     //所有的上下文环境  
     final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();  
     //ip对应的上下文环境  
     final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =  
         new HashMap<InetAddress, Set<NIOServerCnxn>>( );  
     int maxClientCnxns = 10;

     public Factory(InetSocketAddress addr, int maxcc) throws IOException {  
         setDaemon(true);  
         //单个client链接最大数  
         maxClientCnxns = maxcc;  
       //创建服务器通道  
         this.ss = ServerSocketChannel.open();  
         ss.socket().setReuseAddress(true);  
       //绑定端口  
         ss.socket().bind(addr);  
       //设置通道为非阻塞通道  
         ss.configureBlocking(false);  
       //把通道注册到选择器中  
         ss.register(selector, SelectionKey.OP\_ACCEPT);  
     }  
     public void run() {  
         while (!ss.socket().isClosed()) {  
             try {  
                 //选择一组键  
                 selector.select(1000);  
                 Set<SelectionKey> selected;  
                 synchronized (this) {  
                     selected = selector.selectedKeys();  
                 }  
                 ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(  
                         selected);  
                 Collections.shuffle(selectedList);  
                 for (SelectionKey k : selectedList) {  
                     //如果通道已经准备好接收套接字  
                     if ((k.readyOps() & SelectionKey.OP\_ACCEPT) != 0) {  
                         SocketChannel sc = ((ServerSocketChannel) k  
                                 .channel()).accept();  
                         InetAddress ia = sc.socket().getInetAddress();  
                         //判断最大连接数  
                         int cnxncount = getClientCnxnCount(ia);  
                         if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){  
                             sc.close();  
                         } else {  
                             // 配置为非阻塞  
                             sc.configureBlocking(false);  
                           //把通道注册到选择器中  
                             SelectionKey sk = sc.register(selector,  
                                     SelectionKey.OP\_READ);  
                             NIOServerCnxn cnxn = createConnection(sc, sk);  
                             //给该通道附带一个上下文环境  
                             sk.attach(cnxn);  
                             addCnxn(cnxn);  
                         }  
                     } else if ((k.readyOps() & (SelectionKey.OP\_READ | SelectionKey.OP\_WRITE)) != 0) {  
                         //通过上线文件来进行读写  
                         NIOServerCnxn c = (NIOServerCnxn) k.attachment();  
                         c.doIO(k);  
                     } else {  
                         if (LOG.isDebugEnabled()) {  
                             LOG.debug("Unexpected ops in select "  
                                       + k.readyOps());  
                         }  
                     }  
                 }  
                 selected.clear();  
              catch (Exception e) {  
                 LOG.warn("Ignoring exception", e);  
             }  
         }  
         clear();  
     }  
     //关闭  
     public void shutdown() {  
             try {  
                 ss.close();  
                 clear();  
                 this.interrupt();  
                 this.join();  
             } catch (Exception e) {  
                 LOG.warn("Ignoring unexpected exception during shutdown", e);  
             }  
             try {  
                 selector.close();  
             } catch (IOException e) {  
                 LOG.warn("Selector closing", e);  
             }  
             if (zks != null) {  
                 zks.shutdown();  
             }  
     }  
     synchronized public void clear() {  
         selector.wakeup();  
         HashSet<NIOServerCnxn> cnxns;  
         synchronized (this.cnxns) {  
             cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();  
         }  
         // got to clear all the connections that we have in the selector  
         for (NIOServerCnxn cnxn: cnxns) {  
             try {  
                 // don't hold this.cnxns lock as deadlock may occur  
                 cnxn.close();  
             } catch (Exception e) {  
                 LOG.warn("Ignoring exception closing cnxn sessionid 0x"  
                         + Long.toHexString(cnxn.sessionId), e);  
             }  
         }  
     }  
 }

  上下文环境主要通过NIO读取请求数据。它首先读取4个字节,这4个字节有两种含义:一种是后续数据包的长度,另一种是四字母命令(four-letter word,例如ruok、stat,命令本身正好占用这4个字节)。如果是数据包长度,则按该长度分配缓存、读取数据包并进行处理;如果是命令,则执行对应的命令。注意,只有在连接尚未初始化(即还没有处理过连接请求)时,才会把这4个字节当作命令来检查。

服务端通道上下文,通过NIO进行读写(NIOServerCnxn.doIO):

void doIO(SelectionKey k) throws InterruptedException {
try {
//如果通道可以读取数据
if (k.isReadable()) {
//读取数据到缓存中
int rc = sock.read(incomingBuffer);
//如果读满缓存
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
//如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容
if (incomingBuffer == lenBuffer) {
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
isPayload = true;
}
if (isPayload) {
//读取数据,初始化、读取请求数据封装成packet
readPayload();
}
}
}
//如果通道可以读写数据
if (k.isWritable()) {
//缓存中有数据需要写入
if (outgoingBuffers.size() > 0) {
//创建bytebuffer
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();
//从队列中读取数据知道缓存读满
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) { b = (ByteBuffer) b.slice().limit( directBuffer.remaining()); } int p = b.position(); directBuffer.put(b); b.position(p); if (directBuffer.remaining() == 0) { break; } } //将数据写入通道 directBuffer.flip(); int sent = sock.write(directBuffer); ByteBuffer bb; //从缓存中已经发送的删除数据 while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
int left = bb.remaining() - sent;
if (left > 0) {
bb.position(bb.position() + sent);
break;
}
sent -= bb.remaining();
outgoingBuffers.remove();
}
}

             synchronized(this.factory){  
                 if (outgoingBuffers.size() == 0) {  
                     sk.interestOps(sk.interestOps()  
                             & (~SelectionKey.OP\_WRITE));  
                 } else {  
                     sk.interestOps(sk.interestOps()  
                             | SelectionKey.OP\_WRITE);  
                 }  
             }  
         }  
     }catch (IOException e) {  
         close();  
     }  
 }  
 /**
  * Continues reading the request payload into {@code incomingBuffer}.
  *
  * <p>Once the buffer is full, dispatches it: to {@code readConnectRequest} if the
  * connection is not yet initialized, otherwise to {@code readRequest}. It then
  * resets {@code incomingBuffer} to the length buffer, ready for the next prefix.
  *
  * @throws IOException on read failure or when the client has closed the socket
  * @throws InterruptedException propagated from {@code readConnectRequest}
  */
 private void readPayload() throws IOException, InterruptedException {
     if (incomingBuffer.remaining() != 0) { // have we read length bytes?
         int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
         if (rc < 0) {
             // End of stream in the middle of a payload: the client went away.
             throw new IOException("Unable to read additional data from client, "
                     + "it probably closed the socket");
         }
     }
     if (incomingBuffer.remaining() == 0) {
         // Switch the buffer from filling to draining.
         incomingBuffer.flip();
         // The first payload on a connection is the connect request; all later
         // payloads are regular requests.
         if (!initialized) {
             readConnectRequest();
         } else {
             readRequest();
         }
         // Get ready for the next 4-byte length prefix.
         lenBuffer.clear();
         incomingBuffer = lenBuffer;
     }
 }

服务端通道上下文:读取长度;如果这4个字节是命令,则另起线程执行该命令(NIOServerCnxn.readLength / NIOServerCnxn.checkFourLetterWord):

/**
 * Interprets the 4-byte prefix that was just read into {@code lenBuffer}.
 *
 * <p>On a connection that is not yet initialized, the prefix may be a four-letter
 * command, which is handed to {@code checkFourLetterWord}. Otherwise it is treated
 * as a payload length: the value is validated, and a payload buffer of that size
 * becomes the new {@code incomingBuffer}.
 *
 * @param k selection key of this connection, passed through to the command check
 * @return false if a four-letter command was started (no payload follows),
 *         true if a payload buffer was allocated
 * @throws IOException if the length is negative or exceeds
 *         {@code BinaryInputArchive.maxBuffer}, or if the server is not running
 */
private boolean readLength(SelectionKey k) throws IOException {
    final int requestLength = lenBuffer.getInt();

    // Commands are only recognised before the connect request has been processed.
    if (!initialized) {
        if (checkFourLetterWord(k, requestLength)) {
            return false;
        }
    }

    final boolean outOfRange =
            requestLength < 0 || requestLength > BinaryInputArchive.maxBuffer;
    if (outOfRange) {
        throw new IOException("Len error " + requestLength);
    }
    if (zk == null) {
        throw new IOException("ZooKeeperServer not running");
    }

    incomingBuffer = ByteBuffer.allocate(requestLength);
    return true;
}
/**
 * Checks whether the int just decoded from the 4-byte prefix is a four-letter
 * admin command. If it is, starts the matching command with its output directed
 * to this connection.
 *
 * @param k   selection key of this connection; it is cancelled once a command is recognised
 * @param len the int decoded from the 4-byte prefix
 * @return true if a command was started (no request payload follows);
 *         false if {@code len} is not a command and should be treated as a length
 * @throws IOException if reading the trace mask for the set-trace-mask command fails
 */
private boolean checkFourLetterWord(final SelectionKey k, final int len)
throws IOException
{
    // Look up the command name for this code.
    String cmd = cmd2String.get(len);
    if (cmd == null) {
        // Not a command, just an ordinary length prefix. This return must happen
        // before the key is cancelled below; otherwise every new client's connect
        // request would be dropped from the selector.
        return false;
    }
    /** cancel the selection key to remove the socket handling
     * from selector. This is to prevent netcat problem wherein
     * netcat immediately closes the sending side after sending the
     * commands and still keeps the receiving channel open.
     * The idea is to remove the selectionkey from the selector
     * so that the selector does not notice the closed read on the
     * socket channel and keep the socket alive to write the data to
     * and makes sure to close the socket after its done writing the data
     */
    if (k != null) {
        try {
            k.cancel();
        } catch(Exception e) {
            LOG.error("Error cancelling command selection key ", e);
        }
    }
    // All command output is written through this writer.
    final PrintWriter pwriter = new PrintWriter(
            new BufferedWriter(new SendBufferWriter()));
    if (len == ruokCmd) {
        RuokCommand ruok = new RuokCommand(pwriter);
        ruok.start();
        return true;
    } else if (len == getTraceMaskCmd) {
        TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
        tmask.start();
        return true;
    } else if (len == setTraceMaskCmd) {
        // An 8-byte trace mask follows the command. incomingBuffer is still the
        // 4-byte length buffer whose contents were just consumed, so it has no room
        // for the mask. Read the mask into a dedicated 8-byte buffer instead.
        ByteBuffer maskBuffer = ByteBuffer.allocate(8);
        int rc = sock.read(maskBuffer);
        if (rc < 0) {
            throw new IOException("Read error");
        }
        maskBuffer.flip();
        if (maskBuffer.remaining() < 8) {
            // A short read would otherwise fail in getLong() with an unchecked
            // BufferUnderflowException.
            throw new IOException("Read error");
        }
        long traceMask = maskBuffer.getLong();
        ZooTrace.setTextTraceLevel(traceMask);
        SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
        setMask.start();
        return true;
    } else if (len == enviCmd) {
        EnvCommand env = new EnvCommand(pwriter);
        env.start();
        return true;
    } else if (len == confCmd) {
        ConfCommand ccmd = new ConfCommand(pwriter);
        ccmd.start();
        return true;
    } else if (len == srstCmd) {
        StatResetCommand strst = new StatResetCommand(pwriter);
        strst.start();
        return true;
    } else if (len == crstCmd) {
        CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
        crst.start();
        return true;
    } else if (len == dumpCmd) {
        DumpCommand dump = new DumpCommand(pwriter);
        dump.start();
        return true;
    } else if (len == statCmd || len == srvrCmd) {
        StatCommand stat = new StatCommand(pwriter, len);
        stat.start();
        return true;
    } else if (len == consCmd) {
        ConsCommand cons = new ConsCommand(pwriter);
        cons.start();
        return true;
    } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
        WatchCommand wcmd = new WatchCommand(pwriter, len);
        wcmd.start();
        return true;
    }
    return false;
}

服务端通道上下文:如果是请求数据,则处理请求数据(NIOServerCnxn.readRequest / NIOServerCnxn.readConnectRequest)。连接尚未初始化时,收到的第一个数据包是连接请求,由readConnectRequest处理;之后的数据包都由readRequest处理:

/**
 * Decodes one request from {@code incomingBuffer} on an initialized connection.
 *
 * <p>The request header is read first; the remainder of the buffer is the body.
 * Authentication packets are checked inline against the provider registered for
 * their scheme, and an OK or AUTHFAILED reply is sent. On failure the session is
 * also closed and receiving is disabled. Every other request type is wrapped in a
 * {@code Request} and submitted to the ZooKeeper server.
 *
 * @throws IOException if the header or auth packet cannot be deserialized
 */
private void readRequest() throws IOException {
    RequestHeader header = new RequestHeader();
    BinaryInputArchive archive =
            BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
    header.deserialize(archive, "header");
    // Whatever follows the header is the request body.
    incomingBuffer = incomingBuffer.slice();

    if (header.getType() != OpCode.auth) {
        // Regular request: hand it to the server for processing.
        Request request = new Request(this, sessionId, header.getXid(), header.getType(),
                incomingBuffer, authInfo);
        request.setOwner(ServerCnxn.me);
        zk.submitRequest(request);
        return;
    }

    // Authentication packet: look up the provider for its scheme and validate.
    AuthPacket authPacket = new AuthPacket();
    ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
    AuthenticationProvider provider = ProviderRegistry.getProvider(authPacket.getScheme());
    boolean accepted = provider != null
            && provider.handleAuthentication(this, authPacket.getAuth())
                    == KeeperException.Code.OK;

    KeeperException.Code outcome =
            accepted ? KeeperException.Code.OK : KeeperException.Code.AUTHFAILED;
    sendResponse(new ReplyHeader(header.getXid(), 0, outcome.intValue()), null, null);

    if (!accepted) {
        // Rejected credentials: close the session and stop reading from this client.
        sendCloseSession();
        disableRecv();
    }
}

 private void readConnectRequest() throws IOException, InterruptedException {  
     //反序列化链接请求对象  
     BinaryInputArchive bia = BinaryInputArchive  
             .getArchive(new ByteBufferInputStream(incomingBuffer));  
     ConnectRequest connReq = new ConnectRequest();  
     connReq.deserialize(bia, "connect");  
     if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {  
         throw new CloseRequestException(msg);  
     }  
     sessionTimeout = connReq.getTimeOut();  
     byte passwd\[\] = connReq.getPasswd();  
     //初始化session  
     disableRecv();  
     if (connReq.getSessionId() != 0) {  
         long clientSessionId = connReq.getSessionId();  
         factory.closeSessionWithoutWakeup(clientSessionId);  
         setSessionId(clientSessionId);  
         zk.reopenSession(this, sessionId, passwd, sessionTimeout);  
     } else {  
         zk.createSession(this, passwd, sessionTimeout);  
     }  
     initialized = true;  
 }

服务端通道上下文,写返回数据(NIOServerCnxn.sendResponse)

/**
 * Serializes a reply and queues it for sending on this connection.
 *
 * <p>The frame is a 4-byte big-endian length prefix followed by the serialized
 * header and, when {@code r} is non-null, the record written under {@code tag}.
 * Failures are logged rather than thrown.
 *
 * @param h   reply header
 * @param r   optional response body; may be null
 * @param tag archive tag under which {@code r} is written
 */
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Reserve space for the length prefix; it is patched in below.
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            // Keep the cause so that serialization failures can be diagnosed.
            LOG.error("Error serializing response", e);
        }
        byte b[] = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        // Overwrite the reserved prefix with the body length (total minus the prefix).
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);
    } catch (Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
    }
}
/**
 * Sends {@code bb} to the client, directly if possible, otherwise via the
 * outgoing queue.
 *
 * <p>When OP_WRITE interest is not set, a best-effort direct write is attempted
 * first. If that drains the buffer, the packet is reported as sent. Anything left
 * over, and the {@code closeConn} marker buffer (which is never written directly),
 * is appended to {@code outgoingBuffers`; the selector is woken and OP_WRITE interest
 * is set on the key if it is still valid. Exceptions are logged, not thrown.
 *
 * @param bb buffer to send
 */
void sendBuffer(ByteBuffer bb) {
    try {
        if (bb != closeConn) {
            boolean writeInterestSet = (sk.interestOps() & SelectionKey.OP_WRITE) != 0;
            if (!writeInterestSet) {
                try {
                    sock.write(bb);
                } catch (IOException ignored) {
                    // Best effort only: whatever was not written is queued below.
                }
            }
            if (!bb.hasRemaining()) {
                packetSent();
                return;
            }
        }

        // Queue the (remaining) buffer for the selector thread to flush.
        synchronized (this.factory) {
            sk.selector().wakeup();
            outgoingBuffers.add(bb);
            if (sk.isValid()) {
                sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
            }
        }
    } catch (Exception e) {
        LOG.error("Unexpected Exception: ", e);
    }
}