Mina代码跟踪(1)
阅读原文时间:2023年07月15日阅读:1

1  NioSocketAcceptor类关系图

  

1.1 NioSocketAcceptor acceptor = new NioSocketAcceptor(5);

NioSocketAcceptor 初始化顺序

  AbstractIoService构造函数

/**
 * Validates the session configuration, registers the activation listener and
 * chooses the executor stored in this service.
 *
 * @param sessionConfig the session configuration to store; must not be null and
 *        must be an instance of getTransportMetadata().getSessionConfigType()
 * @param executor the executor to use, or null to have this constructor create
 *        a cached thread pool
 * @throws IllegalArgumentException if sessionConfig is null, if
 *         getTransportMetadata() returns null, or if sessionConfig has the
 *         wrong type
 */
protected AbstractIoService(IoSessionConfig sessionConfig, Executor executor) {  
    if (sessionConfig == null) {  
        throw new IllegalArgumentException("sessionConfig");  
    }

    // NOTE(review): getTransportMetadata() is invoked while the object is still
    // being constructed; presumably the concrete transport implements it - confirm.
    if (getTransportMetadata() == null) {  
        throw new IllegalArgumentException("TransportMetadata");  
    }

    // The supplied configuration must be compatible with the type the
    // transport metadata declares.
    if (!getTransportMetadata().getSessionConfigType().isAssignableFrom(  
            sessionConfig.getClass())) {  
        throw new IllegalArgumentException("sessionConfig type: "  
                + sessionConfig.getClass() + " (expected: "  
                + getTransportMetadata().getSessionConfigType() + ")");  
    }

    // Create the listeners, and add a first listener : a activation listener  
    // for this service, which will give information on the service state.  
    listeners = new IoServiceListenerSupport(this);  
    listeners.add(serviceActivationListener);

    // Stores the given session configuration  
    this.sessionConfig = sessionConfig;

    // Make JVM load the exception monitor before some transports  
    // change the thread context class loader.  
    ExceptionMonitor.getInstance();

    // createdExecutor records whether the executor was created here (true)
    // or supplied by the caller (false).
    if (executor == null) {  
        this.executor = Executors.newCachedThreadPool();  
        createdExecutor = true;  
    } else {  
        this.executor = executor;  
        createdExecutor = false;  
    }

    // Thread name: <simple class name>-<next value of the id counter>.
    threadName = getClass().getSimpleName() + '-' + id.incrementAndGet();  
}

/**
 * Delegates to the parent constructor, then seeds defaultLocalAddresses with a
 * single null element.
 *
 * @param sessionConfig forwarded unchanged to the parent constructor
 * @param executor forwarded unchanged to the parent constructor
 */
protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) {  
    super(sessionConfig, executor);  
    // NOTE(review): the null entry looks like a placeholder meaning "no default
    // local address configured yet" - confirm against the rest of the class.
    defaultLocalAddresses.add(null);  
}

AbstractPollingIoAcceptor 构造函数

/**
 * Stores the processor and initializes the selector through init().
 *
 * @param sessionConfig forwarded to the parent constructor
 * @param executor forwarded to the parent constructor
 * @param processor the processor that accepted sessions are handed to; must not be null
 * @param createdProcessor stored in this.createdProcessor; the Acceptor worker
 *        reads that field to decide whether to dispose the processor
 * @throws IllegalArgumentException if processor is null
 * @throws RuntimeIoException if init() fails with a checked exception
 */
private AbstractPollingIoAcceptor(IoSessionConfig sessionConfig,  
        Executor executor, IoProcessor<S> processor,  
        boolean createdProcessor) {  
    super(sessionConfig, executor);

    if (processor == null) {  
        throw new IllegalArgumentException("processor");  
    }

    this.processor = processor;  
    this.createdProcessor = createdProcessor;

    try {  
        // Initialize the selector  
        init();

        // The selector is now ready, we can switch the  
        // flag to true so that incoming connection can be accepted  
        selectable = true;  
    } catch (RuntimeException e) {  
        // Unchecked exceptions propagate unchanged.
        throw e;  
    } catch (Exception e) {  
        // Checked exceptions are wrapped, keeping the original as the cause.
        throw new RuntimeIoException("Failed to initialize.", e);  
    } finally {  
        // selectable is only set after init() returned normally, so a false
        // value here means initialization did not complete: call destroy().
        if (!selectable) {  
            try {  
                destroy();  
            } catch (Exception e) {  
                // Report the failure instead of masking the exception in flight.
                ExceptionMonitor.getInstance().exceptionCaught(e);  
            }  
        }  
    }  
}

NioSocketAcceptor 构造函数

/**
 * Creates an acceptor with a fresh DefaultSocketSessionConfig.
 *
 * @param processorCount forwarded to the parent constructor together with
 *        NioProcessor.class; presumably the number of NioProcessor instances
 *        to create - confirm against the parent class
 */
public NioSocketAcceptor(int processorCount) {  
    super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);  
    // The config object created above is handed a reference to this acceptor.
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);  
}

1.2 IoFilterChain 过滤链

aaarticlea/png;base64,iVBORw0KGgoAAAANSUhEUgAAAT0AAADuCAIAAADEJEf/AAALp0lEQVR4nO2d23HrIBBAqcclqAQXokkZnnEDVOAyNJO/24eL8f1ASIBAD4dEu/ic2Y+YWESsOVpJyPeaFwBow5y9AwBwmLy3//79o5122sW2U28B9IG3APrAWwB94C2APvAWQB94C6APvAXQB+u3tNOur516C6APvAXQB94C6ANvAfSBtwD6wFsAfeAtgD5Yv6Wddn3t1FsAfeAtgD7wFkAfeAugD7wF0AfeAugDbwH0wfot7bTra6feAugDbwH0gbcA+sBbAH3gLYA+8BZAH3gLoA/Wb2mnXV879RZAH3gLoA+8BdAH3gLoA28B9IG3APrAWwB9tL9+ezOGkBOnz4c22tuvtzdjXq9vQkKUvIWjtJ9HvJUTeFuL9vOIt3ICb2vRfh7xVk7gbS3azyPeygm8rUX7ecRbOYG3tWg/j615+/zqjDHGdPZx/s7g7Um0v552trf33lzs873fLuNhO9MP5xtY11sJ80RXe/vHv1/0dria/n74Vz/y9uj7izH0Rf9XfvVL3sJR2s/jr3jrTlazZqa/mky79+Zi7dXMZ7kP6055jTHmOoxvHvHm3HtzsYM7N7500fvnNwfvjzrxu7HsdvQzadnzK7yVQPt5rOyt07L7eu79VeitF+n51c2iTvXzYTt/1Rq9IeyzUG/n9z9sN+n6sP3XM9/tOfbibS3az2NFb4e+YOzar+J6+yw3RlJN17GJqMuXExf7dJ3EYue7rTI0vD2N9vNYud4O1+D8c8+vdns7XE3Mlrf3fr6r7NuHq0kqar7bWEvqrTbaz+OvXN8esPeQt+lJ7Jq30ft9+/JMON/tXxuLt3VpP49n30/e9HbSI7gQfX0/7XWx1ffivDp02P28en07d8v9ZN20v54mZv026+33016y95P9JeXa9a3f1pj+mpw/l+4n17pSreuthHmiq73949/Z3hJzUG9r0X4e8VZO4G0t2s8j3soJvK1F+3nEWzmBt7VoP494Kyfwthbt5xFv5QTe1qL9PNby1j2KYPq7fwIp/zDDT2P8eu3ezue9qrQDW8N0S0pvficJb2vR/npa7G34FZy1R4LmpVG/7GkzE3rs7e2vsMd/ZXrgqaq3/nv2m0M+0VsJ80RXe/vHv0y93VEw48chSpv/xFt/BBkfhPDPOR30djvGDqenl3dbR70VTPt53PY2qkjZx5jirfwPNqnJy2eeoq/Rji+HPuwkFOBh7ext73sJvjkQknwb4TqUNgy9zTgc7XM/ZIY5vJZfTsg9m7UyWLytTft53PA2qm9zMdnjba7euh6Cr8LOX+sxxpiuG7sd7Vo+degPIv0Qa/a699ObM0ZN3i42LLm639tCivYPFm+r034e171Nrg/dy84+kuvbzj72eDtu5XsLXvqp7N85bHi7lKRwsrpp1+L6dtyH3d72UYoWh7Ydg+U8uTrt5/ENb01/f6/eDvG57Egwlad7Qke89VtlT1bXvF0WbX9Rnd522vY2Pl2/2OeBweJtddrP47q3SdFY1Ntj3ia9LSpkMJUzd32i69tEp7Hn+UtC73kbHC9+7O2BweJtbdrP47q3P7u+XSzDxJK4rTr7yE3l5H7y99Av7ycnN3jCf57qPW+DA03mGrhwfet+cPtZunLeGCzeVqb99bTb5vrtu/eTF3ePozs0EytTOT7VLB1Hops9xhjTXY57G445viKdOyzeT573M/zTBwe74q2EeaKrvf3j343nHMVEyVs4Svt5xFs5gbe1aD+PeCsn8LYW7ecRb+UE3tai/TzirZzA21q0n0e8lRN4W4v284i3cgJva9H+ehreygnWb2u1t3/8uxlDyImzp0MjtJ/HG/VWTOBtLdrPI97KCbytRft5xFs5gbe1aD+PeCsn8LYW7ecRb+UE3tai/TzirZzA21q0v56Gt3Ki5K2EeaKrvf3jH97KCeptLdrP4+
lPGhBhnD0dGqH9PN6ot2ICb2vRfh7xVk7gbS3azyPeygm8rUX7ecRbOYG3tWg/j3grJ/C2Fu2vp+GtnCh5K2Ge6Gpv//iHt3KCeluL9vOIt3ICb2vRfh7xVk7gbS3az+PpTwgRYZw9HRqBPKqBSQ8TTAU14C1MMBV04KRFXXCwnqajfd1bOftJ+9+0c/zWwWQsJRdeeKuC0FW8hRfeqiBxFXWBGaAAvIUEZoB0spai7ofDxy8dvIUlfPyiWfETdT8Z1gNFt+/09vT9pP2P2zlmi2a9qFJyPxY+eLlsaom3HwsfvFz2aIm6nwmfulzwFkrwqQtlv5Co+4HwkQsFb2EFPnKhHLIRdT8N1m8ltpc8PPrvNkkbF+212jlOS4R/VA3WYSpoAnXBwTzQBN6Cg3mgCbwFB/MAQB94C6APvAXQB+t+mtpZp6XdQb3VBPelwME80ATegoN5AKAPvAXQB94C6ANvNcH1LTiYB5rAW3CwHqipnfVb2h0cvwH0gbcA+sBbAH3grTK4NQUvvFUH3sILb9WBt/DCWwCNsB5IO+362qm3APrAW2VwfQsvvFUH3sJLi7cr/y8O8eFx9tw8Bx3Dvhnzen0TRBJ4Kxq8JbKBt6LBWyIbeBshZ53KgbdENhJvpc3bT1+/xVsiG9Rb0eAtkQ28FU0lb++9GemHg9sOV2OMMdeh3pwb3N7099/ofI7nV3ek83mv/miY7kO52Oc7neOtaFJvx7k409nH1mf8sN1bxrqY59zYj/+L48uJUv9Pe4n3+MtmJnTS+eGI/8rFPn/B20Xy11OKt7+BjmEXvB0/7H0l4kfzY0OtHQXTG5V7Tx1v/RGk+3pOL/v7UW+3I0z+uOf7soq39dAx7HVvpxNgf+Cfz4d9Y1IVr0P8nrmraG4F3fp2m5RN985kRkYVaew54+201XrnmeFE+zb0YSehAA9rZ29734s/KGxloLRhmPyMw8XUDeEbxp8Xf33nYPH27B3YxW3DW69lf/efsftow3PjhdtjXYon3Obk26y3UX2bi8keb3Odbwyn68ZuR7umQaW5Mv2Q5G0rA6UNS67u97aQov2DxVud67ept/Op8qiHP2cOXibeFk7VfuxtctLuXnb2kVzfdvaxx9vN4UzvHDa8XUqylYHShqWbC7u97aMULQ5tOwZbOk+WNm9Zv91bb4f45G8k6232VO13vJ0PKAfr7f7hHPHWb7WegdKGUfKDM52D3san6xf7PDBYrm91DPu2+/o2OWYvSsr42Y9vG2d5zXqb7MCi3h7zdudwFuXOH87sveTtdgZ2eRscL37s7YHB4u3ZO7CLdW+jErc4hX4N184+ks++uEnm58Tb3O3rate3i873DSc66fAld+iX95OTGzy5DLxVbzv72E5d8oPbz1LyNwaLtxooeDsTX/mka6q5zz64b9ldwukyX4iO7am38T3PS7dcv333fvKi8+gOzepw4rPlcPPieXIhA3u9DcccX5GWUrc8PJk0+YcGi7fSufGcI5ELvBUN3hLZwFvR4C2RDbyNkLNO5cBbIhs31m8lg7dENqi3osFbIht4Kxq8JbKBt6LBWyIbeCua29n/uDYhNs6em+egY9g36i2RC7wVDd4S2cDbCDnrVA68JbJxY/1WMnhLZIN6Kxq8JbKBt6LBWyIbeCsavCWygbeiwVsiG3grmtMX9wmxcfbcPAcdw75Rb4lc4G2EnHUqB94S2bixfisZvCWyQb0VDd4S2cBb0eAtkQ28FQ3eEtnAW9HgLZENvBUN3hLZwFvR4C2RDbyNkLNO5Tj9oRxCbEiYn6zfAsA2eAugD7wF0AfeAugDbwH0gbcA+sBbAH3oWL+lnXbaQ6i3APrAWwB94C2APvAWQB94C6APvAXQB94C6IP1W9pp19dOvQXQB94C6ANvAfSBtwD6wFsAfeAtgD7wFkAfrN/STru+duotgD7wFkAfeAugD7wF0AfeAugDbwH0gbcA+mD9lnba9bVTbwH0gbcA+sBbAH
3gLYA+8BZAH3gLoA+8BdAH67e0066vnXoLoA+8BdAH3gLo4z9bi8g0yOWkNgAAAABJRU5ErkJggg==" alt="" />

acceptor.getFilterChain().addLast("logger", new LoggingFilter());

/\*\*  AbstractIoService  
 \* {@inheritDoc}  
 \*/  
// Returns filterChainBuilder cast to DefaultIoFilterChainBuilder so that callers
// can add filters on it directly; throws IllegalStateException when a different
// builder implementation is installed.
public final DefaultIoFilterChainBuilder getFilterChain() {  
    if (filterChainBuilder instanceof DefaultIoFilterChainBuilder) {  
        return (DefaultIoFilterChainBuilder) filterChainBuilder;  
    }

    throw new IllegalStateException(  
                "Current filter chain builder is not a DefaultIoFilterChainBuilder.");  
}

源代码:

public class DefaultIoFilterChainBuilder implements IoFilterChainBuilder {

// Logger for this class.
private final static Logger LOGGER =  
    LoggerFactory.getLogger(DefaultIoFilterChainBuilder.class);  
// Ordered filter entries; both constructors assign a CopyOnWriteArrayList.
private final List<Entry> entries;

/\*\*  
 \* Creates a new instance with an empty filter list.  
 \*/  
public DefaultIoFilterChainBuilder() {  
    // Start with an empty CopyOnWriteArrayList of entries.
    entries = new CopyOnWriteArrayList<Entry>();  
}

/\*\*  
 \* Creates a new copy of the specified {@link DefaultIoFilterChainBuilder}.  
 \*/  
public DefaultIoFilterChainBuilder(DefaultIoFilterChainBuilder filterChain) {  
    // A null source is rejected up front.
    if (filterChain == null) {  
        throw new IllegalArgumentException("filterChain");  
    }  
    // Copy the source's entry list into a new list owned by this builder.
    entries = new CopyOnWriteArrayList<Entry>(filterChain.entries);  
}

addLast

/\*\*  
 \* @see IoFilterChain#addLast(String, IoFilter)  
 \*/  
public synchronized void addLast(String name, IoFilter filter) {  
    // entries.size() is the insertion index, so the new entry is placed after
    // every entry already registered.
    register(entries.size(), new EntryImpl(name, filter));  
}  

   // Inserts the entry at the given index of the entries list.
   // Throws IllegalArgumentException when contains() (not shown in this
   // excerpt) reports that the entry's name is already in use.
   private void register(int index, Entry e) {
        if (contains(e.getName())) {
            throw new IllegalArgumentException(
                    "Other filter is using the same name: " + e.getName());
        }

        entries.add(index, e);
    }

1.3 IoFilter

aaarticlea/png;base64,iVBORw0KGgoAAAANSUhEUgAAAg8AAAFmCAIAAABlVO76AAAVpklEQVR4nO3d25GiTBgGYOKZEAxhArEmjK0yASOYMKyauz8Pg/G/8IQINoj0gX6e4mL3G0fRPrw0wm5zAoCQJvUOAFAAaQFAmLQAIOySFv/991/vj9XV1dXV1U/WFgCMIS0ACJMWAIRJCwDCpAUAYdICgDBpAUCY+y3U1dXV1cN1awsAwqQFAGHSAoAwaQFAmLQAIExaABAmLQAIc7+Furq6unq4bm0BQJi0ACBMWgAQJi0ACJMWAIRJCwDCpAUAYe63UFdXV1cP160tAAiTFgCESQsAwqQFAGHSAoAwaQFAmLQAIMz9Furq6urq4bq1BQBh0gKAMGkBQJi0ACBMWgAQJi0ACJMWAIS530JdXV1dPVy3tgAgTFoAECYtAAiTFlX41zS2SVvqFoPsGBVV+Nc0p9OfbeQmLeCZUVEFaSEtYCajogrSQlrATO63qKIuLd5Li9zaUV09Yd0xVBWkxXtpAdwYFVWQFtICZjIqqpB7Whx/Nk3TNM1m/5t+Z6QF9DEqqvDptNhtm6/98b2fPm+/+02zPaQPCWkBLxgVVZiQFofvZrub/KNZaTH18YPbYTuYOi9+JC1gDKOiCqPS4nw6qDcPuj+6ze+7bfO133839/NIv/vzSaWmaZrvw+XBF9f5erdtvvaH89mnr83D4+8Pbj3+4Umuu/H8tJdU6FTG/EhaQJBRUYVAWpzDYPNzHPujdlpcp+/jz+YeD7e1wu9+c/024uEB7eccWFvcH/+739xC4ne//Tn2P+3HMkNawDPXlVdRf5EWh+1ATrz60ePa4jhcfJjKb99PdOLh+a83X/vj+Uke46T/aT/y1h7SIrd2VFdPWHcMVYXA2uLw3TrDM+ZHo9Pi8N08CqXFbnu/MupaP3w3ndVD/9NaW8CCjIoqjPreYkJmTEqL7mmiV2nx8Phr/flcU//TfiAnpAUMMSqq8OlrooJpcZuUW18wnP6O+++n3/p7OnPVTo7zn19+b3F/WtdEwYKMiiosdr9Fb1r8HfdfvddEXb8qePW9xfV3m2b73TlDNXRN1ItvIN7bpAU8Myqq8Om0WPkmLeCZUVEFaSEtYCajogrSQlrATK4rr6IuLd5Li9zaUV09Yd0xVBWkxXtpAdwYFVWQFtICZjIqqpBbWpxvlGu2u+td2f232kkLyIdRUYVl0qL9z82+uk36fgvF9faIfU9aXJ4th/8QSVrAM6OiCguuLUYsDh5v1hv6dWkBWTMqqhAvLa7/Z2rTNE3/rd2Pv3X9w76z/ni+D/zhv8e4/PWwXeQslrSAZ0ZFFSKlxSUq2hP91/44Ki361hbnZ2j9Fxf3f8K2aZpmsxleskgL+DTXlVdRj5MW9++uT3+n6183+9/O9xab/e+YtLj81vXZWn+9psVi56z+ud9CXd39FnVKmBbNdvfe2uLQPgl1Pxu1a5+JWjotgBujogpx0qKzGnhaW0xLi86ztTZpAQkYFVWIkxbzvrfoLk2uz9b6t80P35v9r7SAJIyKKiyTFn33W7x7TdTTFVAPX27fSAtIxaiowoJrizVu0gKeGRVVkBbSAmYyKqogLaQFzOS68irq0uK9tMitHdXVE9YdQ1VBWryXFsCNUVEFaSEtYCajogrSQlrATEZFFaSFtICZjIoq/Gsa26QtdYtBdoyKKlhbWFvATEZFFaSFtICZXFdeRV1avJcWubWjunrCumOoKkiL99ICuDEqqiAtpAXMZFRUQVpIC5jJqKiCtJAWMJNRUQVpIS1gJqOiCslvdituS91ikB2jogrWFtYWMJPryquoS4v30iK3dlRXT1h3DFUFafFeWgA3RkUVpIW0gJmMiipIC2kBMxkVVZAW0gJmMiqqIC2kBcxkVFRBWkgLmMmoqIK0kBYwk+vKq6gnvzW6uC3PdlRXT1h3DEUajt+hLEYsaUgLKIsRSwLnqBAYUB
DDlQSkBRTHcCW2W0hICyiI4Ups7ZAQGFAKY5WoOvEgLaAUritXj1p/nRb57Ke6urr7LUimdyVheQFFMFCJR1pAuQxUIhlKBWkBRTBQieRFKggMyJ9RSgyv80BaQP6MUmII5oHAgMwZoixuTBJIC8ic+y3UF6+PT4s8919dXf1kbcHSxi8aLC8gZ8Yny5IWsA7GJwuaFADSAnJmfLKg4H98/fz4yHsIjGRwksDUFAGSMzhJQCpAcQxaAMLcb6GeoD4kt/1UV1e/sbYAIExakIDvLaA4Bi0JSAsojkFLAtICimPQAhAmLQAIkxYAhLnfQj12/V/TZLU/6urqY+rWFsTmK24okXFLbNICSmTcAhAmLQAIkxYAhEkLYvO9BZTIuCU2aQElcr9F8fUX/2upbeaWQ/uqq2dSd5RXvH9Nczr92T6+WQNBm/FQPGkhLSAC46F40kJaQATGQ/GkhbSACIyH4kkLaQERGA/FkxbSAiIwHoq3RFoctk3TNM12dzp8N03TNN+HSHP0bts0TfO1P0oLyIv7LYqvD6fF737TtG0P/dPicf/18LjNz74nLS7Pttn/zg2h8zN/Pi0+sIedtMihfdXVM6k7eipeYG0xYnFwTYu+x3wyLXa3sBjOg7zSInXbQkaMh+JNS4vjz+Zpyu5Ji9tvXf+w76w/Lg9uB8Bt7XIpbg/nxcTlaS+vsv3pmdMvr9LZsYcnv0fI5S18bx9XKt0V0vQ9lBbwgvFQvAlpcZ1nW9Po1/44Ki361hbnZzj/1uWn28N9Lt5s2k97n6AHAqy7V6fTbrv5OQ6+i4fX2h561xaT9lBawCvGQ/HGp8X9u+vT3+n6183+t3NUvtn/jkmL61ph9/TX61zcs3p4So7uXg2diWrVH9Kl/evz9lBawEvGQ/FmpkWz3b23tjg8nie6aM3FrS/Vu9+3X3Pp59gKrZ606D1DNTotpuyhtIAA46F449Oic6z9tLaYlhadZ3taB7Tm4sv83loxXJ72a38cTIvL819ORgXWFpv9b3Bt8WoPpQWEGA/FG58W87636C5NejNgs/99nosvv3j7EqK92rhdpHv+aes5H16u/Vrtd/G4D2/vobSAIPdbFF8fTou++y3evSbq6fqih6+Ob/rm4v4vCdovej9ltPlqze+tl2vXH95CZ9J/bw8H0yKH9lVXz6Tu6Kl4w2mx0u3xTNRym7UFtBkPxZMW0gIiMB6KJy2kBURgPBSvurSItUkLaDMeiictpAVEYDwUT1pIC4jAeCietJAWEIH7LYqvS4vl0iKH9lVXz6Tu6Kl4/5rGttCWum0hI8ZD8awtlltbpG5byIjxUDxpIS0gAuOheNJCWkAExkPxpIW0gAiMh+JJC2kBERgPxZMW0gIicL9F8XVpsVxa5NC+6uqZ1B09FU9aLJcWqdsWMmI8FC/5LWwr3lK3LWTEeCietYW1BURgPBRPWkgLiMB4KJ60kBYQgfFQPGkhLSAC46F40kJaQATutyi+Li2WS4sc2lddPZO6o6fiSYvl0iJ120JGjIfiSQtpAREYD8WTFtICIjAeipf8hucVb6nbFjJiPDBo6Rkz2nQsBmA+w4YHMWfVmBN354UsKWAqA4PTqTV7dooRXnTRl+i83PhHyg/ocL9F1fUXs2Fv8YP7c3v+9gst+n7fm/SX/hzU1UupO2iqzshD5ggLizgv1Puii/4KrJKRUItJJ1UiTJHtl8g2MEQF3BgM6/fGmffIaRHnFd94LWkBNwZDFTI8oM4/LUQFtBkPtcjqgHr89+oxd+CNx0A9jIdaZHVAnTwtgi8nKqDDkKhIPgfUQ6+ST2BIC+hwv0Vd9ZEH1BHu8wjuwxKv26m/HVrJ21FdPX7dAVR1cjigzmEfXrxcwm/gIVuGQV1eXE2bz5fMaQNjKD9kBpUzACry+vbpHBYWkx7zQaluFYSCGBi1eH0Enc/CYvxjPijJP0MCZTE2qhBcTOSWFuMf9innlxu/b3KF2ujx6xf8oiKHL5
bfftgHZfjvo0A+dPc1G/kPzWaYFv5jCciN+y1WWx/5/0ZE/v8bXix0XvxT6jl8nq/r533OZ3/U1T9ed+y2TtkelbdPf438nzZKsZo3Ar107hXKec4aioec9xk4SYv1Me0CSzCzrIczIVnRHKyM3rwSJqY8aRdWQ1degxVMSSt4C7BuhmjxVjPPruaNDFn9G2Td3G9Rdr0zASXfnzn1f02T1f4sUe98mZF8f9TVx9cd7JTKl6hATKabIskJIDKTTnlExWpYIFIQPbUwK55cVvzWXqv2jVMW3bQkq59WVv8GoVwGZxkqOWVRw3t8zSdAtnTNAphBqlLJkQHFcb9F7vXI//9E2vqQ3PZTXb3CukOYrDnGBDJhMspXhVFR4VsOcmKKTOiFmap2gqj2jb/mYyE5XTA7lR9L1vzeIWdGZl7MlQTpJCSh22XELMBIlS9ASUKHy4XBD+TM/RZZ1IeiIrf9jFN//jTy3E919arqjmcTc0rhmQ9kKr2ICPSwlIzwXj6W9/jcWJTulYyxDRTEhJWGqGBROhgfp0slYCQTgS8z+CydKSoDeCSfEuTGmIzHDDiezwpy436LSPXz9JfP/mRe76RF8v1ZQb29rs1hf9SLqzuCi8GRMpnQFXmbrrM44xNYARPZgnynTc50TibRXZZiKM7kA4zAAQ3j6SiLMALn8xlCVgzIzzPNfYSPEbJiQH6YOY5yOTHFC66//ljd9ezq8fubLZ8tz37ywbrjiM/o7SuwnH9Nczr92TLZapgB1v8OI6ihoyThg31BWmS11dBX1/8Ol1ZDL0nFZ/uCtMhqq6Gvrv8dLmqtXST5KeAituRtlHyKtN225P0hgvW/w4XkMF8sx0yU/+ygjbLakveHCNb/Dpew+p5hJsp/dtBGWW3J+0ME63+HH1dDtzAT5T87FNNGh++maZrm+zDreXbbpmmar/1x7v4ctk3TNM1296Edy6U/RLDg/QHJTyvnti30OS9RL2YmSrf1zg6R77d42KXjz6Z5tN1NfFO/+03TNM1m//vJz+pxUj7uv1q7OH72fzMtHl+uaTY/+560+MAbT94fItQXzEMzTrAzZUvb5d+gA2lxnk/Pc+vU6W/ptLg8f7P5Od5ebmykzUyLvgXE8mmxMtIi0lZWZ9J2+Tfoy7S4nm/Z/ByvybE9nIvfh1NnIXL5le5h+PAjT6e/WyDdjtmPQw++TcqXP7Sf5He/351evMrlV55/9PDq28PgXh2e06K7P9/73jfe/xJ9H2Ye/SECaVHL5DLJmtpureepu23Umxbb3X2dsbnOm5dHtufE8289HWIPPvI+aZ7OP9r8HIcefP3Yt/cA++vb86dfDLz6uX7Z5+2hf6/GpEXf2iLwEvcPM5v+EIG0iLSV1Zm6bdc9xGsfzS2xvX9mIO156pRt1Hcm6mGCu77ZVpDc/7rZ/z5/7EOPvHzIj/P+4NOG0mLoFx/r97S4vPr18be/9u5Vpz+09+dFWgy9xItTfMn7QwTSItJWVmcaSIsPXJEybpufFus/Tz2QFs+J3jni7p+dm+1uZFo0212nHnjat9Li9irXnbmnxeHhZNPt3fbvVU9/GJEWQy/x/GHm0x8ikBaRtrI604i0uJ7VPQ/O8wMuE8HYc8rHh1ND93E45QR69zzyvprz1N02ejwT1dq6E1znwPnF2mLoka+O4ofXFoeeM4G/+/1u6BeH0qLz+O4O9K8tpqXF0EtIi6V0e/OSW+9hxdynWtdp7km6bde7trhPT+2TvBPOKQ+lxRsn0G/nkcekxTrOU7+dFsMf5tM4CnyjMO17i6drov4O26bZ7gZfpX0I0n53z+/08L3Z//bu1bjvLYbe+NNLSItT3OvBlzjl/XZaRD7Nncl108F6t+2631tcx16rfm7WSeeUT6PTYvhUe/c8ctrz1BncbzEiLU4jr3T69DVR7absdKSBV7k/ePP1+O6uwdNu6L69GpkWPW+8/yWmpUVu47qc+y2WTIu3t3pOc0/Sbbvh7y
36Tzp3DJxTnpkWveeR6zlP3W0jW9IteX+IIIO0mHqpdW/9NhFcV7Xb6/i/T/F9vzhmclnHae5Jum03kBbtA/netUX3ke+uLYZPtX8mLUo8T91tI1vSLXl/iCB1WoTPdU6st1+lvTYf+MX3JpcST3NPMiot7m3X+jSmnFM+9pyMvjxg7An0N9NiDeeppUVWW/L+EEHitJh6qfVQ/XltcRj3iy7H7jWQFnfbw8M1UY9Lh7HnlI/t1cnlxHR3lXab34PXRD20RaLz1CnbyJZ0S94fIsgxLYKXWj/XX6bF7bzQ8NV4FZzmnsRMlP/soI2y2pL3hwgSp8XUS60zSYsST3NPYibKf3bQRlltyftDBHl/b/F8qfXr+nBaDP1iPae5JzET5T87aKOstuT9IYIE91vcvbhGu+eM9tO/lNmuB9Ni4Bcjn+bO5LrpYN1M9N7skPJ+C1vSLXl/iFAv4V7uoduOBm9HevcJo3embJmJ8m/Qf6n/dy9bZ0vbHyLINi122+u1+Y/X2wzV337CWiaXSea1XRVb8gbVRlltyftDBBmnRdNyn9mH6m8/oc7Uw0yUf4Nqo6y25P0hgmzTYm1bWZ1J2+XfoNooqy15f4hAWuhMPZKfAi5iS95GyXu17bYl7w8RSAudqYe2y79BtVFWW/L+EIG00Jl6aLv8G1QbZbUl7w8RRLzfou7tn/st1rX1zg7ut6h2S94fItSXXVvY8jnNPYmZ6L3ZQRtVuyXvDxE4E6Uz9dB2+Tdo8qMfW2dL2x8ikBa1TC6TaLv8G1QbZbUl7w8RSAudqYe2y79BtVFWW/L+EIG00Jl6aLv8G1QbZbUl7w8RSAudqUfyU8BFbMnbKHmvtt225P0hAmmhM/XQdvk3qDbKakveHyJwv0XUzpTJddPBurYb2aAJ20sbZbUl7w8R6tYWKTtTtrRd/g2qjbLakveHCNydV8tp7knMRPnPDtooqy15f4jA2kJn6qHt8m9QbZTVlrw/RCAtdKYe2i7/Bk2+VrZ1trT9IQJpUcvkMom2W1mDwnzSwuTSQ9utrEFhPmlhcumh7VbWoDCf+y2iTi6ZXDcdrCc/BVzElk97qatHqFtbRE0LgEJJC2kBECYtpAVAmHu5U57mBiiFKQyAMGkBQJi0ACBswfst1NXV1dVXU7e2ACBMWgAQJi0ACJMWAIRJCwDCpAUAYdICgDD3W6irq6urh+vWFgCESQsAwqQFAGHSAoAwaQFAmLQAIExaABDmfgt1dXV19XDd2gKAMGkBQJi0ACBMWgAQJi0ACJMWAIRJCwDC3G+hrq6urh6uW1sAECYtAAiTFgCESQsAwqQFAGHSAoAwaQFAmPst1NXV1dXDdWsLAMKkBQBh0gKAMGkBQJi0ACBMWgAQJi0ACHO/hbq6urp6uG5tAUCYtAAgTFoAECYtAAiTFgCESQsAwqQFAGHut1BXV1dXD9etLQAI+x9/pPPUvfBzwwAAAABJRU5ErkJggg==" alt="" />

1.4 IoSessionConfig

1.4 IoProcessor

  (此处原为 IoProcessor 类关系图,图片数据已丢失)

1.5 acceptor.bind(new InetSocketAddress(this.serverAddr, this.serverPort));

  最终会运行AbstractPollingIoAcceptor.bindInternal

AbstractPollingIoAcceptor
/**
 * Binds this acceptor to the given local addresses.
 * <p>
 * The bind request is queued for the Acceptor worker, the worker is started
 * if it is not already running and is woken up, and the calling thread then
 * waits until the request has been completed or has failed.
 *
 * @param localAddresses the addresses to bind to
 * @return the local address of every handle present in boundHandles once the
 *         request has completed
 * @throws Exception the exception recorded on the bind request, if any
 */
protected final Set<SocketAddress> bindInternal(
        List<? extends SocketAddress> localAddresses) throws Exception {
    // Create a bind request as a Future operation. When the selector
    // have handled the registration, it will signal this future.
    AcceptorOperationFuture request = new AcceptorOperationFuture(
            localAddresses);

    // adds the Registration request to the queue for the Workers
    // to handle
    registerQueue.add(request);

    // creates the Acceptor instance and has the local
    // executor kick it off.
    startupAcceptor();

    // As we just started the acceptor, we have to unblock the select()
    // in order to process the bind request we just have added to the
    // registerQueue.
    wakeup();

    // Now, we wait until this request is completed.
    request.awaitUninterruptibly();

    // A failure recorded by the worker is rethrown on the caller's thread.
    if (request.getException() != null) {
        throw request.getException();
    }

    // Update the local addresses.
    // setLocalAddresses() shouldn't be called from the worker thread
    // because of deadlock.
    Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

    for (H handle : boundHandles.values()) {
        newLocalAddresses.add(localAddress(handle));
    }

    return newLocalAddresses;
}

1.5.1 startupAcceptor() 使用线程池执行Acceptor

/\*\*  
 \* This method is called by the doBind() and doUnbind()  
 \* methods.  If the acceptor is null, the acceptor object will  
 \* be created and kicked off by the executor.  If the acceptor  
 \* object is not null, it has probably already been created and  
 \* is now working, so nothing will happen and the method  
 \* will just return.  
 \*/  
private void startupAcceptor() {  
    // If the acceptor is not ready, clear the queues  
    // TODO : they should already be clean : do we have to do that ?  
    if (!selectable) {  
        registerQueue.clear();  
        cancelQueue.clear();  
    }

    // start the acceptor if not already started  
    Acceptor acceptor = acceptorRef.get();

    if (acceptor == null) {  
        acceptor = new Acceptor();

        // Only the caller whose compareAndSet succeeds hands its Acceptor to
        // executeWorker(); a caller that loses the race drops its instance.
        if (acceptorRef.compareAndSet(null, acceptor)) {  
            executeWorker(acceptor);  
        }  
    }  
}

Acceptor 实现了 Runnable 接口,只要 selectable 为 true 就在循环中不停地执行

/**
* This class is called by the startupAcceptor() method and is
* placed into a NamePreservingRunnable class.
* It's a thread accepting incoming connections from clients.
* The loop is stopped when all the bound handlers are unbound.
*/
private class Acceptor implements Runnable {
public void run() {
assert (acceptorRef.get() == this);

        int nHandles = 0;

        while (selectable) {  
            try {  
                // Detect if we have some keys ready to be processed  
                // The select() will be woke up if some new connection  
                // have occurred, or if the selector has been explicitly  
                // woke up  
                int selected = select();

                // this actually sets the selector to OP\_ACCEPT,  
                // and binds to the port on which this class will  
                // listen on  
                nHandles += registerHandles();

                // Now, if the number of registred handles is 0, we can  
                // quit the loop: we don't have any socket listening  
                // for incoming connection.  
                if (nHandles == 0) {  
                    acceptorRef.set(null);

                    if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {  
                        assert (acceptorRef.get() != this);  
                        break;  
                    }

                    if (!acceptorRef.compareAndSet(null, this)) {  
                        assert (acceptorRef.get() != this);  
                        break;  
                    }

                    assert (acceptorRef.get() == this);  
                }

                if (selected > 0) {  
                    // We have some connection request, let's process  
                    // them here.  
                    processHandles(selectedHandles());  
                }

                // check to see if any cancellation request has been made.  
                nHandles -= unregisterHandles();  
            } catch (ClosedSelectorException cse) {  
                // If the selector has been closed, we can exit the loop  
                break;  
            } catch (Throwable e) {  
                ExceptionMonitor.getInstance().exceptionCaught(e);

                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e1) {  
                    ExceptionMonitor.getInstance().exceptionCaught(e1);  
                }  
            }  
        }

        // Cleanup all the processors, and shutdown the acceptor.  
        if (selectable && isDisposing()) {  
            selectable = false;  
            try {  
                if (createdProcessor) {  
                    processor.dispose();  
                }  
            } finally {  
                try {  
                    synchronized (disposalLock) {  
                        if (isDisposing()) {  
                            destroy();  
                        }  
                    }  
                } catch (Exception e) {  
                    ExceptionMonitor.getInstance().exceptionCaught(e);  
                } finally {  
                    disposalFuture.setDone();  
                }  
            }  
        }  
    }

NioSocketAcceptor
/**
* Check if we have at least one key whose corresponding channels is
* ready for I/O operations. Delegates directly to selector.select().
*
* This method performs a blocking selection operation.
* It returns only after at least one channel is selected,
* this selector's wakeup method is invoked, or the current thread
* is interrupted, whichever comes first.
*
* @return The number of keys having their ready-operation set updated
* @throws IOException If an I/O error occurs
* @throws ClosedSelectorException If this selector is closed
*/
@Override
protected int select() throws Exception {
return selector.select();
}

registerHandles():处理 registerQueue 中的绑定请求

/**
* Processes pending bind requests taken from registerQueue.
* <p>
* For each request, a handle is opened for every requested address via
* open(). Once all of them succeed, the handles are added to boundHandles
* and the request's future is marked done. If opening any address fails,
* the exception is stored in the future, every handle opened for that
* request is closed again, and the loop moves on to the next queued request.
* <p>
* The original comment lists what the socket set-up involves:
* <ul>
* <li>Blocking</li>
* <li>Reuse address</li>
* <li>Receive buffer size</li>
* <li>Bind to listen port</li>
* <li>Registers OP_ACCEPT for selector</li>
* </ul>
* NOTE(review): that work is presumably done inside open(), which is not
* part of this method - confirm.
*
* @return the number of handles bound for the first request that succeeds,
*         or 0 once the queue is empty
*/
private int registerHandles() {
for (;;) {
// The register queue contains the list of services to manage
// in this acceptor.
AcceptorOperationFuture future = registerQueue.poll();

        // Nothing (left) to register.
        if (future == null) {  
            return 0;  
        }

        // We create a temporary map to store the bound handles,  
        // as we may have to remove them all if there is an exception  
        // during the sockets opening.  
        Map<SocketAddress, H> newHandles = new ConcurrentHashMap<SocketAddress, H>();  
        List<SocketAddress> localAddresses = future.getLocalAddresses();

        try {  
            // Process all the addresses  
            for (SocketAddress a : localAddresses) {  
                H handle = open(a);  
                newHandles.put(localAddress(handle), handle);  
            }

            // Everything went ok, we can now update the map storing  
            // all the bound sockets.  
            boundHandles.putAll(newHandles);

            // and notify.  
            future.setDone();  
            return newHandles.size();  
        } catch (Exception e) {  
            // We store the exception in the future  
            future.setException(e);  
        } finally {  
            // Roll back if failed to bind all addresses.  
            if (future.getException() != null) {  
                for (H handle : newHandles.values()) {  
                    try {  
                        close(handle);  
                    } catch (Exception e) {  
                        // A failed close is reported; the roll-back continues.
                        ExceptionMonitor.getInstance().exceptionCaught(e);  
                    }  
                }

                // TODO : add some comment : what is the wakeup() waking up ?  
                wakeup();  
            }  
        }  
    }  
}

接上图open方法 jdk自带的NIO

/\*\*  
 \* {@inheritDoc}  
 \*/  
// Opens a ServerSocketChannel, puts it in non-blocking mode, binds it to
// localAddress and registers it on the selector for accept events. If any step
// fails, the channel is passed to close() before the exception propagates.
@Override  
protected ServerSocketChannel open(SocketAddress localAddress)  
        throws Exception {  
    // Creates the listening ServerSocket  
    ServerSocketChannel channel = ServerSocketChannel.open();

    // Stays false until every set-up step below has completed.
    boolean success = false;

    try {  
        // This is a non blocking socket channel  
        channel.configureBlocking(false);

        // Configure the server socket,  
        ServerSocket socket = channel.socket();

        // Set the reuseAddress flag accordingly with the setting  
        socket.setReuseAddress(isReuseAddress());

        // and bind.  
        socket.bind(localAddress, getBacklog());

        // Register the channel within the selector for ACCEPT event  
        channel.register(selector, SelectionKey.OP\_ACCEPT);  
        success = true;  
    } finally {  
        // Do not leave a half-configured channel open on failure.
        if (!success) {  
            close(channel);  
        }  
    }  
    return channel;  
}

Acceptor 接收新连接(对应 Acceptor.run() 中的如下片段)

// Excerpt from Acceptor.run(): "selected" holds the value returned by select().
if (selected > 0) {
// We have some connection request, let's process
// them here.
processHandles(selectedHandles());
}

    /\*\*  
     \* This method will process new sessions for the Worker class.  All  
     \* keys that have had their status updates as per the Selector.selectedKeys()  
     \* method will be processed here.  Only keys that are ready to accept  
     \* connections are handled here.  
     \* <p/>  
     \* Session objects are created by making new instances of SocketSessionImpl  
     \* and passing the session object to the SocketIoProcessor class.  
     \*/  
    @SuppressWarnings("unchecked")  
    private void processHandles(Iterator<H> handles) throws Exception {  
        while (handles.hasNext()) {  
            H handle = handles.next();  
            // Remove the handle from the iterator's backing collection as soon
            // as it has been taken.
            handles.remove();

            // Associates a new created connection to a processor,  
            // and get back a session  
            S session = accept(processor, handle);

            // No session could be created for this handle: stop processing
            // the remaining handles in this pass.
            if (session == null) {  
                break;  
            }  

          // Initialize the session
initSession(session, null, null);

            // add the session to the SocketIoProcessor (entry point of the  
            // filter-chain data processing)  
            session.getProcessor().add(session);  
        }  
    }

processHandles 的accept 获取SocketChannel 封装成NioSocketSession

/**
 * Accepts one pending connection on the given server channel and wraps it in
 * a NioSocketSession.
 *
 * @param processor the processor passed to the new session
 * @param handle the listening channel to accept from
 * @return the new session, or null when the channel's selection key is
 *         missing, invalid or not acceptable, or when handle.accept()
 *         returns null
 * @throws Exception if looking up the key or accepting the connection fails
 */
protected NioSession accept(IoProcessor<NioSession> processor,  
        ServerSocketChannel handle) throws Exception {

    // The key this channel was registered with on our selector, if any.
    SelectionKey key = handle.keyFor(selector);

    if ((key == null) || (!key.isValid()) || (!key.isAcceptable()) ) {  
        return null;  
    }

    // accept the connection from the client  
    SocketChannel ch = handle.accept();

    if (ch == null) {  
        return null;  
    }

    return new NioSocketSession(this, processor, ch);  
}

processor

/\*\*  
 \* {@inheritDoc}  
 \*/  
// Queues the session in newSessions and makes sure a Processor worker is
// running to pick it up. Throws IllegalStateException when either the disposed
// or the disposing flag is set.
public final void add(S session) {  
    if (disposed || disposing) {  
        throw new IllegalStateException("Already disposed.");  
    }

    // Adds the session to the newSession queue and starts the worker  
    newSessions.add(session);  
    startupProcessor();  
}  
/\*\*  
 \* Starts the inner Processor, asking the executor to pick a thread in its  
 \* pool. The Runnable will be renamed  
 \*/  
private void startupProcessor() {  
    Processor processor = processorRef.get();

    if (processor == null) {  
        processor = new Processor();

        // Only the caller whose compareAndSet succeeds submits its Processor
        // to the executor; a caller that loses the race drops its instance.
        if (processorRef.compareAndSet(null, processor)) {  
            executor.execute(new NamePreservingRunnable(processor, threadName));  
        }  
    }

    // Just stop the select() and start it again, so that the processor  
    // can be activated immediately.  
    wakeup();  
}

主要接收数据处理代码逻辑

/**
* The main loop. This is the place in charge to poll the Selector, and to
* process the active sessions. Each iteration of the loop:
* - selects with a timeout (SELECT_TIMEOUT)
* - handles the newly created sessions
* - updates the traffic mask
* - processes the selected sessions, if any
* - flushes the pending writes
* - removes the sessions scheduled for removal
* - sends idle notifications
* - exits when no session is left and nothing is pending
*/
private class Processor implements Runnable {
public void run() {
assert (processorRef.get() == this);

        // Number of sessions currently managed by this worker.
        int nSessions = 0;  
        lastIdleCheckTime = System.currentTimeMillis();

        for (;;) {  
            try {  
                // This select has a timeout so that we can manage  
                // idle session when we get out of the select every  
                // second. (note : this is a hack to avoid creating  
                // a dedicated thread).  
                long t0 = System.currentTimeMillis();  
                int selected = select(SELECT\_TIMEOUT);  
                long t1 = System.currentTimeMillis();  
                long delta = (t1 - t0);

                // Nothing selected, no wakeup recorded, and select() returned
                // in under 100 ms: treat it as a spurious return.
                if ((selected == 0) && !wakeupCalled.get() && (delta < 100)) {  
                    // Last chance : the select() may have been  
                    // interrupted because we have had an closed channel.  
                    if (isBrokenConnection()) {  
                        LOG.warn("Broken connection");

                        // we can reselect immediately  
                        // set back the flag to false  
                        wakeupCalled.getAndSet(false);

                        continue;  
                    } else {  
                        LOG.warn("Create a new selector. Selected is 0, delta = "  
                                        + (t1 - t0));  
                        // Ok, we are hit by the nasty epoll  
                        // spinning.  
                        // Basically, there is a race condition  
                        // which causes a closing file descriptor not to be  
                        // considered as available as a selected channel, but  
                        // it stopped the select. The next time we will  
                        // call select(), it will exit immediately for the same  
                        // reason, and do so forever, consuming 100%  
                        // CPU.  
                        // We have to destroy the selector, and  
                        // register all the socket on a new one.  
                        registerNewSelector();  
                    }

                    // Set back the flag to false  
                    wakeupCalled.getAndSet(false);

                    // and continue the loop  
                    continue;  
                }

                // Manage newly created session first  
                nSessions += handleNewSessions();

                updateTrafficMask();

                // Now, if we have had some incoming or outgoing events,  
                // deal with them  
                if (selected > 0) {  
                    //LOG.debug("Processing ..."); // This log hurts one of the MDCFilter test...  
                    process();  
                }

                // Write the pending requests  
                long currentTime = System.currentTimeMillis();  
                flush(currentTime);

                // And manage removed sessions  
                nSessions -= removeSessions();

                // Last, not least, send Idle events to the idle sessions  
                notifyIdleSessions(currentTime);

                // Get a chance to exit the infinite loop if there are no  
                // more sessions on this Processor  
                if (nSessions == 0) {  
                    // Give up ownership first so that startupProcessor() can
                    // install a new worker if a session arrives from now on.
                    processorRef.set(null);

                    if (newSessions.isEmpty() && isSelectorEmpty()) {  
                        // newSessions.add() precedes startupProcessor  
                        assert (processorRef.get() != this);  
                        break;  
                    }

                    assert (processorRef.get() != this);

                    if (!processorRef.compareAndSet(null, this)) {  
                        // startupProcessor won race, so must exit processor  
                        assert (processorRef.get() != this);  
                        break;  
                    }

                    assert (processorRef.get() == this);  
                }

                // Disconnect all sessions immediately if disposal has been  
                // requested so that we exit this loop eventually.  
                if (isDisposing()) {  
                    for (Iterator<S> i = allSessions(); i.hasNext();) {  
                        scheduleRemove(i.next());  
                    }

                    wakeup();  
                }  
            } catch (ClosedSelectorException cse) {  
                // If the selector has been closed, we can exit the loop  
                break;  
            } catch (Throwable t) {  
                ExceptionMonitor.getInstance().exceptionCaught(t);

                // Pause for one second before the next iteration.
                try {  
                    Thread.sleep(1000);  
                } catch (InterruptedException e1) {  
                    ExceptionMonitor.getInstance().exceptionCaught(e1);  
                }  
            }  
        }

        // After the loop: run doDispose() when the disposing flag is set, and
        // always complete the disposal future.
        try {  
            synchronized (disposalLock) {  
                if (disposing) {  
                    doDispose();  
                }  
            }  
        } catch (Throwable t) {  
            ExceptionMonitor.getInstance().exceptionCaught(t);  
        } finally {  
            disposalFuture.setValue(true);  
        }  
    }  
}

过滤链:

/**
 * Walks every session returned by {@code selectedSessions()}, hands each
 * one to {@link #process(Object) process(session)} and then removes it
 * from the iterator.
 *
 * @throws Exception declared by the original signature; propagated to the caller
 */
private void process() throws Exception {
    Iterator<S> ready = selectedSessions();

    while (ready.hasNext()) {
        process(ready.next());
        // Remove only after the session has been handled.
        ready.remove();
    }
}

/**
 * Deals with a session that is ready for read or write operations, or both.
 */
private void process(S session) {
    // Read side: only read when the session is readable and reads are not suspended.
    boolean readNow = isReadable(session) && !session.isReadSuspended();
    if (readNow) {
        read(session);
    }

    // Write side: nothing to do unless the session is writable and writes are not suspended.
    if (!isWritable(session) || session.isWriteSuspended()) {
        return;
    }

    // Queue the session for flushing only when setScheduledForFlush(true)
    // reports that it was not already scheduled.
    if (session.setScheduledForFlush(true)) {
        flushingSessions.add(session);
    }
}

/**
 * Reads pending data from the session into a freshly allocated buffer and,
 * if anything was read, passes the buffer to the session's filter chain.
 * A negative read result schedules the session for removal; any failure is
 * reported through {@code fireExceptionCaught}.
 *
 * @param session the session to read from
 */
private void read(S session) {
    // Size the buffer from the session's currently configured read buffer size.
    IoSessionConfig config = session.getConfig();
    IoBuffer buffer = IoBuffer.allocate(config.getReadBufferSize());

    final boolean fragmented = session.getTransportMetadata().hasFragmentation();

    try {
        int totalRead = 0;
        int lastRead;

        try {
            // Always read once. With fragmentation, keep reading while the
            // last read returned data and the buffer still has room.
            do {
                lastRead = read(session, buffer);

                if (lastRead > 0) {
                    totalRead += lastRead;
                }
            } while (fragmented && lastRead > 0 && buffer.hasRemaining());
        } finally {
            // In a finally block so the buffer is flipped even if a read threw.
            buffer.flip();
        }

        if (totalRead > 0) {
            // Processing logic: hand the data to the session's filter chain,
            // then drop the local reference to the buffer.
            session.getFilterChain().fireMessageReceived(buffer);
            buffer = null;

            if (fragmented) {
                // Less than half of the configured size used -> shrink;
                // exactly the configured size used -> grow.
                if ((totalRead << 1) < config.getReadBufferSize()) {
                    session.decreaseReadBufferSize();
                } else if (totalRead == config.getReadBufferSize()) {
                    session.increaseReadBufferSize();
                }
            }
        }

        // A negative result from the last read schedules the session for removal.
        if (lastRead < 0) {
            scheduleRemove(session);
        }
    } catch (Throwable e) {
        if (e instanceof IOException) {
            // The session survives an IOException only when it is a
            // PortUnreachableException on a datagram config whose
            // isCloseOnPortUnreachable() is false.
            boolean keepOpen = e instanceof PortUnreachableException
                    && AbstractDatagramSessionConfig.class.isAssignableFrom(config.getClass())
                    && !((AbstractDatagramSessionConfig) config).isCloseOnPortUnreachable();

            if (!keepOpen) {
                scheduleRemove(session);
            }
        }

        // Every failure is reported through the filter chain.
        session.getFilterChain().fireExceptionCaught(e);
    }
}

调用过滤链: DefaultIoFilterChain

/**
 * Starts message-received processing at the head entry of the chain.
 * When the message is an {@code IoBuffer}, its remaining byte count and
 * the current time are first recorded on the session.
 *
 * @param message the received message
 */
public void fireMessageReceived(Object message) {
    if (message instanceof IoBuffer) {
        IoBuffer buffer = (IoBuffer) message;
        int pendingBytes = buffer.remaining();
        long now = System.currentTimeMillis();

        session.increaseReadBytes(pendingBytes, now);
    }

    // Dispatch begins at the head entry.
    callNextMessageReceived(this.head, session, message);
}

/**
 * Invokes {@code messageReceived} on the filter held by the given entry,
 * passing that entry's next-filter handle. Anything thrown is routed to
 * {@code fireExceptionCaught}.
 *
 * @param entry   the chain entry whose filter is invoked
 * @param session the session the message belongs to
 * @param message the received message
 */
private void callNextMessageReceived(Entry entry, IoSession session,
        Object message) {
    try {
        entry.getFilter().messageReceived(entry.getNextFilter(), session, message);
    } catch (Throwable cause) {
        // Failures are reported through the chain rather than propagated.
        fireExceptionCaught(cause);
    }
}

业务处理handler总是最后处理