netty websocket
阅读原文时间:2023年07月09日阅读:1

1 全局保存websocket的通道  NettyConfig.java

public class NettyConfig {
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}

2  WebsocketHandler.java  接收处理响应  客户端发来的消息

/**
* 接收处理响应客户端处理 *
*/
public class WebsocketHandler extends SimpleChannelInboundHandler{
private WebSocketServerHandshaker handshaker;
private static final String WEB_SOCKET_URL = "ws://192.168.3.167:8888/websocket";
//客户端与服务端创建连接的时候调用
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.add(ctx.channel());
System.out.println("客户端与服务端连接开启…");
}

//客户端与服务端断开连接的时候调用  
@Override  
public void channelInactive(ChannelHandlerContext ctx) throws Exception {  
    NettyConfig.group.remove(ctx.channel());  
    System.out.println("客户端与服务端连接关闭...");  
}

//服务端接收客户端发送过来的数据结束之后调用  
@Override  
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
}

//工程出现异常的时候调用  
@Override  
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  
    cause.printStackTrace();  
    ctx.close();  
}

//服务端处理客户端websocket请求的核心方法  
@Override  
protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {  
    //处理客户端向服务端发起http握手请求的业务  
    if (msg instanceof FullHttpRequest) {  
        handHttpRequest(context,  (FullHttpRequest)msg);  
    }else if (msg instanceof WebSocketFrame) { //处理websocket连接业务  
        handWebsocketFrame(context, (WebSocketFrame)msg);  
    }  
}

/\*\*  
 \* 处理客户端与服务端之前的websocket业务  
 \* @param ctx  
 \* @param frame  
 \*/  
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){  
    //判断是否是关闭websocket的指令  
    if (frame instanceof CloseWebSocketFrame) {  
        handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());  
    }  
    //判断是否是ping消息  
    if (frame instanceof PingWebSocketFrame) {  
        ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));  
        return;  
    }

    //判断是否是二进制消息,如果是二进制消息,抛出异常  
    if( ! (frame instanceof TextWebSocketFrame) ){  
        System.out.println("目前我们不支持二进制消息");  
        throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");  
    }  
    //返回应答消息  
    //获取客户端向服务端发送的消息  
    String request = ((TextWebSocketFrame) frame).text();  
    System.out.println("服务端收到客户端的消息====>>>" + request);  
    TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()  
                                                                                    + request);  
    //群发,服务端向每个连接上来的客户端群发消息  
    //NettyConfig.group.writeAndFlush(tws);

    //单发  发给莫个人  
    NettyConfig.group.find(ctx.channel().id()).writeAndFlush(tws);

}  
/\*\*  
 \* 处理客户端向服务端发起http握手请求的业务  
 \* @param ctx  
 \* @param req  
 \*/  
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){  
    if (!req.getDecoderResult().isSuccess()  
            || ! ("websocket".equals(req.headers().get("Upgrade")))) {  
        sendHttpResponse(ctx, req,  
                new DefaultFullHttpResponse(HttpVersion.HTTP\_1\_1, HttpResponseStatus.BAD\_REQUEST));  
        return;  
    }  
    WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(  
            WEB\_SOCKET\_URL, null, false);  
    handshaker = wsFactory.newHandshaker(req);  
    if (handshaker == null) {  
        WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());  
    }else{  
        handshaker.handshake(ctx.channel(), req);  
    }  
}

/\*\*  
 \* 服务端向客户端响应消息  
 \* @param ctx  
 \* @param req  
 \* @param res  
 \*/  
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,  
        DefaultFullHttpResponse res){  
    if (res.getStatus().code() != 200) {  
        ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF\_8);  
        res.content().writeBytes(buf);  
        buf.release();  
    }  
    //服务端向客户端发送数据  
    ChannelFuture f = ctx.channel().writeAndFlush(res);  
    if (res.getStatus().code() != 200) {  
        f.addListener(ChannelFutureListener.CLOSE);  
    }  
}

}

3  初始化连接时候的各个组件

**
* 初始化连接时候的各个组件 *
*/
public class MyWebSocketChannelHandler extends ChannelInitializer {

@Override  
protected void initChannel(SocketChannel e) throws Exception {  
    e.pipeline().addLast("http-codec", new HttpServerCodec());  
    e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));  
    e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());  
    e.pipeline().addLast("handler", new WebsocketHandler());  
}

}

4 启动服务

/**
* 程序的入口,负责启动应用
*
*/
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new MyWebSocketChannelHandler());
System.err.println("服务端开启等待客户端连接….");
Channel ch = b.bind(8888).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
//优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}

5 客户端连接服务




WebSocket客户端