服务端启动时,就启动线程通过NIO监听网络端口。每个连接都会有一个上下文环境对象,当接收到请求后,会在上下文环境对象中进行处理。
服务端启动线程,监听网络端口(NIOServerCnxn.Factory):
static public class Factory extends Thread {
static {
//设置全局的异常处理
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
public void uncaughtException(Thread t, Throwable e) {
LOG.error("Thread " + t + " died", e);
}
});
/**
* jvm早期的nullpoint bug
* http://bugs.sun.com/view_bug.do?bug_id=6427854
*/
try {
Selector.open().close();
} catch(IOException ie) {
LOG.error("Selector failed to open", ie);
}
}
//服务器通道
final ServerSocketChannel ss;
//选择器
final Selector selector = Selector.open();
ZooKeeperServer zks;
final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 \* 1024);
//所有的上下文环境
final HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
//ip对应的上下文环境
final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
new HashMap<InetAddress, Set<NIOServerCnxn>>( );
int maxClientCnxns = 10;
public Factory(InetSocketAddress addr, int maxcc) throws IOException {
setDaemon(true);
//单个client链接最大数
maxClientCnxns = maxcc;
//创建服务器通道
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
//绑定端口
ss.socket().bind(addr);
//设置通道为非阻塞通道
ss.configureBlocking(false);
//把通道注册到选择器中
ss.register(selector, SelectionKey.OP\_ACCEPT);
}
public void run() {
while (!ss.socket().isClosed()) {
try {
//选择一组键
selector.select(1000);
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys();
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) {
//如果通道已经准备好接收套接字
if ((k.readyOps() & SelectionKey.OP\_ACCEPT) != 0) {
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept();
InetAddress ia = sc.socket().getInetAddress();
//判断最大连接数
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
sc.close();
} else {
// 配置为非阻塞
sc.configureBlocking(false);
//把通道注册到选择器中
SelectionKey sk = sc.register(selector,
SelectionKey.OP\_READ);
NIOServerCnxn cnxn = createConnection(sc, sk);
//给该通道附带一个上下文环境
sk.attach(cnxn);
addCnxn(cnxn);
}
} else if ((k.readyOps() & (SelectionKey.OP\_READ | SelectionKey.OP\_WRITE)) != 0) {
//通过上线文件来进行读写
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected ops in select "
+ k.readyOps());
}
}
}
selected.clear();
catch (Exception e) {
LOG.warn("Ignoring exception", e);
}
}
clear();
}
//关闭
public void shutdown() {
try {
ss.close();
clear();
this.interrupt();
this.join();
} catch (Exception e) {
LOG.warn("Ignoring unexpected exception during shutdown", e);
}
try {
selector.close();
} catch (IOException e) {
LOG.warn("Selector closing", e);
}
if (zks != null) {
zks.shutdown();
}
}
synchronized public void clear() {
selector.wakeup();
HashSet<NIOServerCnxn> cnxns;
synchronized (this.cnxns) {
cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
}
// got to clear all the connections that we have in the selector
for (NIOServerCnxn cnxn: cnxns) {
try {
// don't hold this.cnxns lock as deadlock may occur
cnxn.close();
} catch (Exception e) {
LOG.warn("Ignoring exception closing cnxn sessionid 0x"
+ Long.toHexString(cnxn.sessionId), e);
}
}
}
}
上下文环境主要通过NIO读取请求数据。首先会读取4个字节,这4个字节有两种含义:一种是真正的数据包长度,一种是命令。如果是数据包长度,则按该长度读取数据包并进行处理;如果是命令,则根据命令进行处理。
服务端通道上下文,通过NIO进行读写(NIOServerCnxn.doIO):
void doIO(SelectionKey k) throws InterruptedException {
try {
//如果通道可以读取数据
if (k.isReadable()) {
//读取数据到缓存中
int rc = sock.read(incomingBuffer);
//如果读满缓存
if (incomingBuffer.remaining() == 0) {
boolean isPayload;
//如果第一次读取,则先读取长度内容,相应分配缓存;否则读取指定长度的数据内容
if (incomingBuffer == lenBuffer) {
incomingBuffer.flip();
isPayload = readLength(k);
incomingBuffer.clear();
} else {
isPayload = true;
}
if (isPayload) {
//读取数据,初始化、读取请求数据封装成packet
readPayload();
}
}
}
//如果通道可以读写数据
if (k.isWritable()) {
//缓存中有数据需要写入
if (outgoingBuffers.size() > 0) {
//创建bytebuffer
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();
//从队列中读取数据知道缓存读满
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
//将数据写入通道
directBuffer.flip();
int sent = sock.write(directBuffer);
ByteBuffer bb;
//从缓存中已经发送的删除数据
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
int left = bb.remaining() - sent;
if (left > 0) {
bb.position(bb.position() + sent);
break;
}
sent -= bb.remaining();
outgoingBuffers.remove();
}
}
synchronized(this.factory){
if (outgoingBuffers.size() == 0) {
sk.interestOps(sk.interestOps()
& (~SelectionKey.OP\_WRITE));
} else {
sk.interestOps(sk.interestOps()
| SelectionKey.OP\_WRITE);
}
}
}
}catch (IOException e) {
close();
}
}
/**
 * Reads the remainder of the current payload and, once it is complete,
 * dispatches it: to {@code readConnectRequest} before the connection is
 * initialized, to {@code readRequest} afterwards. The input state is then
 * reset so the next read starts with a new length prefix.
 *
 * @throws IOException          on read failure or when the peer has closed the stream
 * @throws InterruptedException propagated from connect-request processing
 */
private void readPayload() throws IOException, InterruptedException {
    if (incomingBuffer.remaining() != 0) { // have we read length bytes?
        int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
        if (rc < 0) {
            // End-of-stream in the middle of a payload: the request can never
            // complete, so fail instead of waiting indefinitely.
            throw new IOException(
                    "Unable to read additional data from client,"
                    + " likely client has closed socket");
        }
    }
    if (incomingBuffer.remaining() == 0) {
        // Payload complete: switch the buffer from filling to draining.
        incomingBuffer.flip();
        if (!initialized) {
            readConnectRequest();
        } else {
            readRequest();
        }
        // Go back to reading a length prefix.
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
}
服务端通道上下文,读取长度,如果是命令,另起线程处理命令操作(NIOServerCnxn.readLength):
/**
 * Interprets the four bytes in {@code lenBuffer}.
 *
 * <p>On a connection that is not yet initialized the value is first offered
 * to {@code checkFourLetterWord}; if that method handles it, no payload
 * follows. Otherwise the value is a payload length and a buffer of exactly
 * that size becomes the new {@code incomingBuffer}.
 *
 * @param k selection key of this connection, passed on to the command check
 * @return {@code true} if a payload of the decoded length should be read next,
 *         {@code false} if the value was consumed as a command
 * @throws IOException if the length is negative or exceeds
 *                     {@code BinaryInputArchive.maxBuffer}, or if no server is attached
 */
private boolean readLength(SelectionKey k) throws IOException {
    final int frameLength = lenBuffer.getInt();

    // Commands are only recognised before the session handshake.
    if (!initialized) {
        boolean handledAsCommand = checkFourLetterWord(k, frameLength);
        if (handledAsCommand) {
            return false;
        }
    }

    boolean lengthInRange =
            frameLength >= 0 && frameLength <= BinaryInputArchive.maxBuffer;
    if (!lengthInRange) {
        throw new IOException("Len error " + frameLength);
    }

    if (zk == null) {
        throw new IOException("ZooKeeperServer not running");
    }

    // Size the payload buffer exactly so "buffer full" means "payload complete".
    incomingBuffer = ByteBuffer.allocate(frameLength);
    return true;
}
/**
 * Checks whether the first four bytes received on a connection are one of the
 * known commands and, if so, starts the matching command object.
 *
 * <p>The lookup in {@code cmd2String} decides whether {@code len} is a command
 * at all. When it is not, the method returns immediately without touching the
 * selection key, so the caller can go on treating {@code len} as a payload
 * length on a still-registered channel.
 *
 * @param k   selection key of the connection; cancelled when a command is
 *            recognised (may be {@code null})
 * @param len the four bytes read from the client, as an int
 * @return {@code true} if {@code len} was handled as a command, {@code false} otherwise
 * @throws IOException if reading the argument of the set-trace-mask command fails
 */
private boolean checkFourLetterWord(final SelectionKey k, final int len)
throws IOException
{
    // Look the value up first: anything that is not a registered command must
    // leave the selection key and the connection untouched.
    String cmd = cmd2String.get(len);
    if (cmd == null) {
        return false;
    }
    /** cancel the selection key to remove the socket handling
     * from selector. This is to prevent netcat problem wherein
     * netcat immediately closes the sending side after sending the
     * commands and still keeps the receiving channel open.
     * The idea is to remove the selectionkey from the selector
     * so that the selector does not notice the closed read on the
     * socket channel and keep the socket alive to write the data to
     * and makes sure to close the socket after its done writing the data
     */
    if (k != null) {
        try {
            k.cancel();
        } catch(Exception e) {
            LOG.error("Error cancelling command selection key ", e);
        }
    }
    // Writer through which the command object produces its reply.
    final PrintWriter pwriter = new PrintWriter(
            new BufferedWriter(new SendBufferWriter()));
    // Dispatch on the command code; each command object is started and the
    // method returns without waiting for it.
    if (len == ruokCmd) {
        RuokCommand ruok = new RuokCommand(pwriter);
        ruok.start();
        return true;
    } else if (len == getTraceMaskCmd) {
        TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
        tmask.start();
        return true;
    } else if (len == setTraceMaskCmd) {
        // This command carries an argument: the new trace mask as a long.
        int rc = sock.read(incomingBuffer);
        if (rc < 0) {
            throw new IOException("Read error");
        }
        incomingBuffer.flip();
        long traceMask = incomingBuffer.getLong();
        ZooTrace.setTextTraceLevel(traceMask);
        SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
        setMask.start();
        return true;
    } else if (len == enviCmd) {
        EnvCommand env = new EnvCommand(pwriter);
        env.start();
        return true;
    } else if (len == confCmd) {
        ConfCommand ccmd = new ConfCommand(pwriter);
        ccmd.start();
        return true;
    } else if (len == srstCmd) {
        StatResetCommand strst = new StatResetCommand(pwriter);
        strst.start();
        return true;
    } else if (len == crstCmd) {
        CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
        crst.start();
        return true;
    } else if (len == dumpCmd) {
        DumpCommand dump = new DumpCommand(pwriter);
        dump.start();
        return true;
    } else if (len == statCmd || len == srvrCmd) {
        StatCommand stat = new StatCommand(pwriter, len);
        stat.start();
        return true;
    } else if (len == consCmd) {
        ConsCommand cons = new ConsCommand(pwriter);
        cons.start();
        return true;
    } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
        WatchCommand wcmd = new WatchCommand(pwriter, len);
        wcmd.start();
        return true;
    }
    // Registered in cmd2String but not dispatched above.
    return false;
}
服务端通道上下文,如果是请求数据,处理请求数据(NIOServerCnxn.readRequest / NIOServerCnxn.readConnectRequest):
/**
 * Decodes one request from {@code incomingBuffer} and processes it.
 *
 * <p>The request header is deserialized first; the rest of the buffer is the
 * request body. Authentication requests are answered directly by this
 * connection, every other request type is wrapped in a {@code Request} and
 * submitted to the server.
 *
 * @throws IOException if the header or the auth packet cannot be deserialized
 */
private void readRequest() throws IOException {
    // Deserialize the header straight from the buffer.
    BinaryInputArchive headerArchive = BinaryInputArchive.getArchive(
            new ByteBufferInputStream(incomingBuffer));
    RequestHeader header = new RequestHeader();
    header.deserialize(headerArchive, "header");

    // What is left after the header is the request body.
    incomingBuffer = incomingBuffer.slice();

    if (h_isNotAuth(header)) {
        // Regular request: hand it to the server.
        Request request = new Request(this, sessionId, header.getXid(),
                header.getType(), incomingBuffer, authInfo);
        request.setOwner(ServerCnxn.me);
        zk.submitRequest(request);
        return;
    }

    // Authentication request: resolve the provider for the requested scheme.
    AuthPacket authPacket = new AuthPacket();
    ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
    String scheme = authPacket.getScheme();
    AuthenticationProvider provider = ProviderRegistry.getProvider(scheme);

    boolean authenticated = provider != null
            && provider.handleAuthentication(this, authPacket.getAuth())
                    == KeeperException.Code.OK;

    if (authenticated) {
        // Success: acknowledge with OK.
        ReplyHeader okReply = new ReplyHeader(header.getXid(), 0,
                KeeperException.Code.OK.intValue());
        sendResponse(okReply, null, null);
    } else {
        // Failure: report AUTHFAILED, then close the session and stop reading.
        ReplyHeader failedReply = new ReplyHeader(header.getXid(), 0,
                KeeperException.Code.AUTHFAILED.intValue());
        sendResponse(failedReply, null, null);
        sendCloseSession();
        disableRecv();
    }
}

/** Returns whether the header describes anything other than an auth request. */
private boolean h_isNotAuth(RequestHeader header) {
    return header.getType() != OpCode.auth;
}
private void readConnectRequest() throws IOException, InterruptedException {
//反序列化链接请求对象
BinaryInputArchive bia = BinaryInputArchive
.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
connReq.deserialize(bia, "connect");
if (connReq.getLastZxidSeen() > zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
throw new CloseRequestException(msg);
}
sessionTimeout = connReq.getTimeOut();
byte passwd\[\] = connReq.getPasswd();
//初始化session
disableRecv();
if (connReq.getSessionId() != 0) {
long clientSessionId = connReq.getSessionId();
factory.closeSessionWithoutWakeup(clientSessionId);
setSessionId(clientSessionId);
zk.reopenSession(this, sessionId, passwd, sessionTimeout);
} else {
zk.createSession(this, passwd, sessionTimeout);
}
initialized = true;
}
服务端通道上下文,写返回数据(NIOServerCnxn.sendResponse):
/**
 * Serializes a reply and queues it for sending.
 *
 * <p>The encoded frame starts with {@code fourBytes} as a placeholder, which
 * is then overwritten with the frame length (total size minus the first four
 * bytes), followed by the reply header and, if present, the record.
 *
 * @param h   reply header
 * @param r   reply body, or {@code null} for a header-only reply
 * @param tag tag used when serializing {@code r}
 */
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
    try {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
        try {
            // Reserve room for the length prefix, then append header and body.
            baos.write(fourBytes);
            bos.writeRecord(h, "header");
            if (r != null) {
                bos.writeRecord(r, tag);
            }
            baos.close();
        } catch (IOException e) {
            // Keep the cause so a serialization failure can be diagnosed.
            LOG.error("Error serializing response", e);
        }
        // Overwrite the placeholder with the actual length.
        byte b[] = baos.toByteArray();
        ByteBuffer bb = ByteBuffer.wrap(b);
        bb.putInt(b.length - 4).rewind();
        sendBuffer(bb);
    } catch(Exception e) {
        LOG.warn("Unexpected exception. Destruction averted.", e);
    }
}
/**
 * Sends a buffer to the client, directly when possible and through the
 * outgoing queue otherwise.
 *
 * <p>Unless the buffer is the {@code closeConn} marker, a direct socket write
 * is attempted first, but only while the key is not already registered for
 * OP_WRITE. If the buffer is fully written the packet is counted as sent and
 * the method returns. In every other case the buffer is queued and OP_WRITE
 * interest is switched on.
 *
 * @param bb buffer to send, or the {@code closeConn} marker
 */
void sendBuffer(ByteBuffer bb) {
    try {
        final boolean isCloseMarker = (bb == closeConn);
        if (!isCloseMarker) {
            final boolean writeAlreadyPending =
                    (sk.interestOps() & SelectionKey.OP_WRITE) != 0;
            if (!writeAlreadyPending) {
                // Nothing is queued ahead of this buffer: try the socket directly.
                try {
                    sock.write(bb);
                } catch (IOException e) {
                    // we are just doing best effort right now
                }
            }
            if (!bb.hasRemaining()) {
                // Everything went out in one go; no need to queue.
                packetSent();
                return;
            }
        }
        // Queue the buffer (or what is left of it) for the selector thread.
        synchronized (this.factory) {
            sk.selector().wakeup();
            outgoingBuffers.add(bb);
            if (sk.isValid()) {
                int currentOps = sk.interestOps();
                sk.interestOps(currentOps | SelectionKey.OP_WRITE);
            }
        }
    } catch (Exception e) {
        LOG.error("Unexpected Exception: ", e);
    }
}
手机扫一扫
移动阅读更方便
你可能感兴趣的文章