A casual chat about surging: separating Netty's asynchronous, non-blocking I/O threads from business threads

Preface

surging has been publicly quiet for a while, but the author has not been idle. Quite a few features have been added for customers' needs, which also brought me some extra income on the side: protocol conversion, Consul's watcher mechanism, a Java version, upgrading SkyWalking support to 8.0, and upgrading to .NET 6.0, while customers themselves extended it with a service-orchestration workflow engine; later on they will also need custom extensions for protocols such as CoAP and XMPP. The purpose of today's article is the change to the asynchronous, non-blocking handling of business logic on top of Netty.

Problem description

Before the new year, a customer tested the Java version and ran into quite a few problems, which left them rather at a loss: there was a memory leak, and JMeter stress tests could never push the concurrency up. After half a month of effort the problems were finally solved, and the Java version is now estimated to reach a concurrency of around 20,000. Below is the customer's JMeter stress-test setup.

Solution

When the customer handed the problem to me, my first reaction was that blocked I/O threads were the cause, which narrowed the problem down to the Netty handling. The server-side code is NettyServerMessageListener, in which ServerHandler's channelRead processes the business logic; there I was executing the work asynchronously through a ThreadPoolExecutor. Here is the NettyServerMessageListener code:

public class NettyServerMessageListener implements IMessageListener {
private Thread thread;
private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
private ChannelFuture channel;
private final ITransportMessageDecoder transportMessageDecoder;
private final ITransportMessageEncoder transportMessageEncoder;
ReceivedDelegate Received = new ReceivedDelegate();
@Inject
public NettyServerMessageListener( ITransportMessageCodecFactory codecFactory)
{
this.transportMessageEncoder = codecFactory.GetEncoder();
this.transportMessageDecoder = codecFactory.GetDecoder();
}

public void StartAsync(final String serverAddress) {  
    thread = new Thread(new Runnable() {  
        int parallel = Runtime.getRuntime().availableProcessors();  
        final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(parallel);  
        ThreadFactory threadFactory = new DefaultThreadFactory("rpc-netty", true);  
        public void run() {  
            String[] array = serverAddress.split(":");
            logger.debug("Preparing to start the service host, listening on: " + array[0] + ":" + array[1] + ".");
            EventLoopGroup bossGroup = new NioEventLoopGroup();  
            EventLoopGroup workerGroup = new NioEventLoopGroup(parallel,threadFactory);  
            ServerBootstrap bootstrap = new ServerBootstrap();  
            bootstrap.group(bossGroup, workerGroup).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true).childOption(ChannelOption.TCP_NODELAY, true).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {  
                        @Override  
                        protected void initChannel(NioSocketChannel socketChannel) throws Exception {  
                            socketChannel.pipeline()  
                                    .addLast(new LengthFieldPrepender(4))  
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                    .addLast(new ServerHandler(eventExecutors,new ReadAction<ChannelHandlerContext, TransportMessage>() {  
                                                @Override  
                                                public void run() {  
                                                    IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);  
                                                    onReceived(sender, this.parameter1);  
                                                }  
                                            },transportMessageDecoder)  
                                    );  
                        }  
                    })  
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);  
            try {

                String host = array[0];
                int port = Integer.parseInt(array[1]);
                channel = bootstrap.bind(host, port).sync();
                logger.debug("Service host started successfully, listening on: " + serverAddress + ".");
            } catch (Exception e) {  
                if (e instanceof InterruptedException) {  
                    logger.info("Rpc server remoting server stop");  
                } else {  
                    logger.error("Rpc server remoting server error", e);  
                }

            }  
        }  
    });  
    thread.start();  
}

@Override  
public ReceivedDelegate getReceived() {  
    return Received;  
}

public void onReceived(IMessageSender sender, TransportMessage message) {  
    if (Received == null)  
        return;  
    Received.notifyX(sender,message);  
}

private class ReadAction<T,T1> implements  Runnable  
{  
    public  T parameter;  
    public T1 parameter1;  
    public void setParameter( T tParameter,T1 tParameter1) {  
        parameter = tParameter;  
        parameter1 = tParameter1;  
    }

    @Override  
    public void run() {

    }  
}

private class ServerHandler extends ChannelInboundHandlerAdapter {  
    private final DefaultEventLoopGroup serverHandlerPool;  
    private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;  
    private final ITransportMessageDecoder transportMessageDecoder;

    public ServerHandler(final DefaultEventLoopGroup threadPoolExecutor, ReadAction<ChannelHandlerContext, TransportMessage> runnable,  
                         ITransportMessageDecoder transportMessageDecoder) {  
        this.serverHandlerPool = threadPoolExecutor;  
        this.serverRunnable = runnable;  
        this.transportMessageDecoder = transportMessageDecoder;  
    }

    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        logger.warn("An error occurred while communicating with server " + ctx.channel().remoteAddress() + ".", cause);
        ctx.close();  
    }

    @Override  
    public void channelReadComplete(ChannelHandlerContext context) {  
        context.flush();  
    }

    @Override  
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {  
       ByteBuf buffer = (ByteBuf) message;  
        try {  
            byte[] data = new byte[buffer.readableBytes()];
            buffer.readBytes(data);  
            serverHandlerPool.execute(() -> {  
                TransportMessage transportMessage = null;  
                try {  
                    transportMessage = transportMessageDecoder.Decode(data);  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                serverRunnable.setParameter(channelHandlerContext, transportMessage);  
                serverRunnable.run();  
            });  
        }  
        finally {  
            ReferenceCountUtil.release(message);  
        }  
    }  
}  

}
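One detail in channelRead above is worth spelling out: the readable bytes are copied out of the ByteBuf into a plain byte[] before the work is handed to serverHandlerPool, and the buffer is released in the finally block on the I/O thread. If the ByteBuf itself were captured by the asynchronous task, it could already have been released back to the pooled allocator by the time the task runs. Below is a minimal sketch of that copy-before-hand-off pattern; the executor and the handleBytes callback are placeholders for illustration, not surging APIs.

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;

import java.util.concurrent.Executor;

// Sketch of the copy-before-hand-off pattern used by ServerHandler.channelRead above.
class CopyThenReleaseHandler extends ChannelInboundHandlerAdapter {
    private final Executor businessExecutor;

    CopyThenReleaseHandler(Executor businessExecutor) {
        this.businessExecutor = businessExecutor;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buffer = (ByteBuf) msg;
        try {
            // Copy the readable bytes into an array while the buffer is still valid...
            byte[] data = new byte[buffer.readableBytes()];
            buffer.readBytes(data);
            // ...so the asynchronous task only ever touches the copy, never the pooled ByteBuf.
            businessExecutor.execute(() -> handleBytes(ctx, data));
        } finally {
            // Safe to release right away: nothing outside this method references the buffer.
            ReferenceCountUtil.release(msg);
        }
    }

    // Placeholder for decode + dispatch; in the listing above this is Decode plus ReadAction/onReceived.
    private void handleBytes(ChannelHandlerContext ctx, byte[] data) {
    }
}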

The ThreadPoolExecutor code:

public static ThreadPoolExecutor makeServerThreadPool(final String serviceName, int corePoolSize, int maxPoolSize) {  
    ThreadPoolExecutor serverHandlerPool = new ThreadPoolExecutor(  
            corePoolSize,  
            maxPoolSize,  
            60L,  
            TimeUnit.SECONDS,  
            new ArrayBlockingQueue<Runnable>(10000));
            /*
            new LinkedBlockingQueue(10000),
            r -> new Thread(r, "netty-rpc-" + serviceName + "-" + r.hashCode()),
            new ThreadPoolExecutor.AbortPolicy());*/

    return serverHandlerPool;  
}
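As a side note, makeServerThreadPool builds its pool on a bounded ArrayBlockingQueue; both the JDK's default handler and the commented-out AbortPolicy make execute() throw RejectedExecutionException once the queue is full and the pool is at maxPoolSize, so under sustained load submissions from channelRead can simply be rejected. A small usage sketch, assuming makeServerThreadPool from the listing above is in scope (the service name and pool sizes are illustrative):

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

// Assumed to live alongside makeServerThreadPool above; only the pool factory comes from the article.
public class ServerPoolUsageSketch {
    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = makeServerThreadPool("rpc-server", cores, cores * 2);

        try {
            // Each task would decode one request and dispatch it to the business layer.
            pool.execute(() -> System.out.println("handled on " + Thread.currentThread().getName()));
        } catch (RejectedExecutionException e) {
            // Thrown once the ArrayBlockingQueue(10000) is full and the pool is already at maxPoolSize.
            System.err.println("request rejected: " + e.getMessage());
        }

        pool.shutdown();
    }
}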

Later, by going through the official documentation, I found that the handler added by the following addLast call runs on the I/O thread, so this is where the I/O thread ends up blocked:

.addLast(new ServerHandler(eventExecutors, new ReadAction<ChannelHandlerContext, TransportMessage>() {
    @Override
    public void run() {
        IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);
        onReceived(sender, this.parameter1);
    }
}, transportMessageDecoder)
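To make the underlying Netty behavior explicit: a handler added with the plain addLast(handler) overload has its callbacks invoked on the channel's I/O EventLoop, while the addLast(EventExecutorGroup, name, handler) overload runs them on a thread from the given group. A minimal sketch of the difference follows; the handlers here are trivial stand-ins, not surging code.

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.NioSocketChannel;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

public class PipelineThreadingSketch extends ChannelInitializer<NioSocketChannel> {
    // Business pool, separate from the NioEventLoopGroup that performs the I/O.
    private static final EventExecutorGroup businessGroup = new DefaultEventExecutorGroup(4);

    @Override
    protected void initChannel(NioSocketChannel ch) {
        // Added without an executor: channelRead runs on the channel's I/O EventLoop,
        // so anything slow here stalls every channel bound to that loop.
        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg); // must stay fast on the I/O thread
            }
        });

        // Added with an EventExecutorGroup: channelRead runs on a businessGroup thread,
        // which is the change applied to ServerHandler below.
        ch.pipeline().addLast(businessGroup, "businessHandler", new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                ctx.fireChannelRead(msg); // slow work here no longer blocks the I/O EventLoop
            }
        });
    }
}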

The fix was to use an EventExecutorGroup to separate the I/O threads from the business threads, handing the time-consuming business processing over to the EventExecutorGroup. First, the EventExecutorGroup code is as follows:

public static final EventExecutorGroup execThreadPool = new DefaultEventExecutorGroup(Runtime.getRuntime().availableProcessors() * 2,
        (ThreadFactory) r -> {  
            Thread thread = new Thread(r);  
            thread.setName("custom-tcp-exec-"+r.hashCode());  
            return thread;  
        },  
        100000,  
        RejectedExecutionHandlers.reject()  
);
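The pipeline code below refers to this pool as ThreadPoolUtil.execThreadPool, so presumably the field lives in a small utility class roughly like the following sketch; the class layout is an assumption, and only the field and its construction come from the snippet above. The constructor arguments are, in order, the thread count, the thread factory, the maximum number of pending tasks, and the rejection handler.

import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;
import io.netty.util.concurrent.RejectedExecutionHandlers;

import java.util.concurrent.ThreadFactory;

// Assumed holder class; the article only shows the field, referenced later as ThreadPoolUtil.execThreadPool.
public final class ThreadPoolUtil {
    public static final EventExecutorGroup execThreadPool = new DefaultEventExecutorGroup(
            Runtime.getRuntime().availableProcessors() * 2,        // number of business threads
            (ThreadFactory) r -> {
                Thread thread = new Thread(r);
                thread.setName("custom-tcp-exec-" + r.hashCode()); // matches the naming above
                return thread;
            },
            100000,                                                // max pending tasks per executor
            RejectedExecutionHandlers.reject());                   // throw instead of queueing beyond the limit

    private ThreadPoolUtil() {
    }
}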

The ServerHandler is then registered in addLast together with this EventExecutorGroup; the latest NettyServerMessageListener code is as follows:

public class NettyServerMessageListener implements IMessageListener {
private Thread thread;
private static final Logger logger = LoggerFactory.getLogger(NettyServerMessageListener.class);
private ChannelFuture channel;
private final ITransportMessageDecoder transportMessageDecoder;
private final ITransportMessageEncoder transportMessageEncoder;
ReceivedDelegate Received = new ReceivedDelegate();
@Inject
public NettyServerMessageListener( ITransportMessageCodecFactory codecFactory)
{
this.transportMessageEncoder = codecFactory.GetEncoder();
this.transportMessageDecoder = codecFactory.GetDecoder();
}

public void StartAsync(final String serverAddress) {  
    thread = new Thread(new Runnable() {  
        public void run() {  
            String[] array = serverAddress.split(":");
            logger.debug("Preparing to start the service host, listening on: " + array[0] + ":" + array[1] + ".");
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);  
            EventLoopGroup workerGroup = new NioEventLoopGroup();  
            ServerBootstrap bootstrap = new ServerBootstrap();  
            bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {  
                        @Override  
                        protected void initChannel(NioSocketChannel socketChannel) throws Exception {  
                            socketChannel.pipeline()  
                                    .addLast(new LengthFieldPrepender(4))  
                                    .addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4))
                                    .addLast(ThreadPoolUtil.execThreadPool, "handler",new ServerHandler(new ReadAction<ChannelHandlerContext, TransportMessage>() {  
                                        @Override  
                                        public void run() {  
                                            IMessageSender sender = new NettyServerMessageSender(transportMessageEncoder, this.parameter);  
                                             onReceived(sender, this.parameter1);  
                                        }  
                                    },transportMessageDecoder)  
                                    );  
                        }  
                    })  
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);  
            try {

                String host = array[0];
                int port = Integer.parseInt(array[1]);
                channel = bootstrap.bind(host, port).sync();
                logger.debug("Service host started successfully, listening on: " + serverAddress + ".");
            } catch (Exception e) {  
                if (e instanceof InterruptedException) {  
                    logger.info("Rpc server remoting server stop");  
                } else {  
                    logger.error("Rpc server remoting server error", e);  
                }

            }  
        }  
    });  
    thread.start();  
}

@Override  
public ReceivedDelegate getReceived() {  
    return Received;  
}

public void onReceived(IMessageSender sender, TransportMessage message) {  
    if (Received == null)  
        return;  
    Received.notifyX(sender,message);  
}

private class ReadAction<T,T1> implements  Runnable  
{  
    public  T parameter;  
    public T1 parameter1;  
    public void setParameter( T tParameter,T1 tParameter1) {  
        parameter = tParameter;  
        parameter1 = tParameter1;  
    }

    @Override  
    public void run() {

    }  
}

private class ServerHandler extends ChannelInboundHandlerAdapter {

    private final ReadAction<ChannelHandlerContext, TransportMessage> serverRunnable;  
    private final ITransportMessageDecoder transportMessageDecoder;

    public ServerHandler(ReadAction<ChannelHandlerContext, TransportMessage> runnable,  
                         ITransportMessageDecoder transportMessageDecoder) {

        this.serverRunnable = runnable;  
        this.transportMessageDecoder = transportMessageDecoder;  
    }

    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        logger.warn("An error occurred while communicating with server " + ctx.channel().remoteAddress() + ".", cause);
        ctx.close();  
    }

    @Override  
    public void channelReadComplete(ChannelHandlerContext context) {  
        context.flush();  
    }

    @Override  
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object message) throws Exception {  
        ByteBuf buffer = (ByteBuf) message;  
        try {  
            byte[] data = new byte[buffer.readableBytes()];
            buffer.readBytes(data);  
            TransportMessage  transportMessage = transportMessageDecoder.Decode(data);  
            serverRunnable.setParameter(channelHandlerContext, transportMessage);  
            serverRunnable.run();  
        }  
        finally {  
            ReferenceCountUtil.release(message);  
        }  
    }  
}  

}

With the changes above, JMeter stress tests no longer hit timeouts; even the chain of stage gateway -> .NET microservice -> Java microservice produces no timeouts, and stretching JMeter's user threads to 2000 causes no problems either.

Following the same idea, the .NET community edition of surging was modified as well, and the changes have been pushed to GitHub: first the Task.Run inside serverMessageListener.Received in ServiceHost was removed, the hand-off in ServerHandler's ChannelRead was removed, and then the ServerHandler was added via addLast with an EventExecutorGroup. After these changes, stress testing showed it can support a concurrency of 200,000+, with no memory leaks found. Running the client 10,000 times, server-side CPU sits at about 6% and response time at about 1.1 seconds; several surging clients can be run against it at once, in which case CPU usage stacks up while response time is unaffected. Below is the 10,000-invocation stress-test run.

Open-source repository of the surging community edition

Summary

After five years of development, surging has grown from its original Netty-based RPC into a heterogeneous microservice engine that supports multiple protocols and multiple languages. This has brought not only technical growth but also reputation and income. As long as you keep at it, you will eventually see results. I will keep updating it and contributing what I can to enterprise and community users, so that enterprises can better master microservice solutions and meet the wide range of business requirements across today's industries.