1 全局保存websocket的通道 NettyConfig.java
public class NettyConfig {
public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
}
2 WebsocketHandler.java 接收处理响应 客户端发来的消息
/**
* 接收处理响应客户端处理 *
*/
public class WebsocketHandler extends SimpleChannelInboundHandler
//客户端与服务端断开连接的时候调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
NettyConfig.group.remove(ctx.channel());
System.out.println("客户端与服务端连接关闭...");
}
//服务端接收客户端发送过来的数据结束之后调用
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
//工程出现异常的时候调用
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
//服务端处理客户端websocket请求的核心方法
@Override
protected void messageReceived(ChannelHandlerContext context, Object msg) throws Exception {
//处理客户端向服务端发起http握手请求的业务
if (msg instanceof FullHttpRequest) {
handHttpRequest(context, (FullHttpRequest)msg);
}else if (msg instanceof WebSocketFrame) { //处理websocket连接业务
handWebsocketFrame(context, (WebSocketFrame)msg);
}
}
/\*\*
\* 处理客户端与服务端之前的websocket业务
\* @param ctx
\* @param frame
\*/
private void handWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame){
//判断是否是关闭websocket的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
}
//判断是否是ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//判断是否是二进制消息,如果是二进制消息,抛出异常
if( ! (frame instanceof TextWebSocketFrame) ){
System.out.println("目前我们不支持二进制消息");
throw new RuntimeException("【"+this.getClass().getName()+"】不支持消息");
}
//返回应答消息
//获取客户端向服务端发送的消息
String request = ((TextWebSocketFrame) frame).text();
System.out.println("服务端收到客户端的消息====>>>" + request);
TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
+ request);
//群发,服务端向每个连接上来的客户端群发消息
//NettyConfig.group.writeAndFlush(tws);
//单发 发给莫个人
NettyConfig.group.find(ctx.channel().id()).writeAndFlush(tws);
}
/\*\*
\* 处理客户端向服务端发起http握手请求的业务
\* @param ctx
\* @param req
\*/
private void handHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req){
if (!req.getDecoderResult().isSuccess()
|| ! ("websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req,
new DefaultFullHttpResponse(HttpVersion.HTTP\_1\_1, HttpResponseStatus.BAD\_REQUEST));
return;
}
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
WEB\_SOCKET\_URL, null, false);
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
}else{
handshaker.handshake(ctx.channel(), req);
}
}
/\*\*
\* 服务端向客户端响应消息
\* @param ctx
\* @param req
\* @param res
\*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req,
DefaultFullHttpResponse res){
if (res.getStatus().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF\_8);
res.content().writeBytes(buf);
buf.release();
}
//服务端向客户端发送数据
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (res.getStatus().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
}
3 初始化连接时候的各个组件
**
* 初始化连接时候的各个组件 *
*/
public class MyWebSocketChannelHandler extends ChannelInitializer
@Override
protected void initChannel(SocketChannel e) throws Exception {
e.pipeline().addLast("http-codec", new HttpServerCodec());
e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
e.pipeline().addLast("handler", new WebsocketHandler());
}
}
4 启动服务
/**
* 程序的入口,负责启动应用
*
*/
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup);
b.channel(NioServerSocketChannel.class);
b.childHandler(new MyWebSocketChannelHandler());
System.err.println("服务端开启等待客户端连接….");
Channel ch = b.bind(8888).sync().channel();
ch.closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}finally{
//优雅的退出程序
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
}
5 客户端连接服务