Netty学习example示例

在这里插入图片描述

文章目录

  • simple
    • Server端
      • NettyServer
      • NettyServerHandler
    • Client端
      • NettyClient
      • NettyClientHandler
  • tcp(粘包和拆包)
    • Server端
      • NettyTcpServer
      • NettyTcpServerHandler
    • Client端
      • NettyTcpClient
      • NettyTcpClientHandler
  • protocol
    • codec
      • CustomMessageDecoder
      • CustomMessageEncoder
    • server端
      • ProtocolServer
      • ProtocolServerHandler
    • client端
      • ProtocolClient
      • ProtocolClientHandler
  • http
    • Server端
      • HttpServer
      • HttpServerHandler
    • Client端
      • HttpClient
      • HttpClientHandler
  • ws
    • Server端
      • WsServer
      • WsServerHandler
    • Client端
      • WsClient
      • WebSocketClientHandler
  • protobuf
    • Server端
      • NettyServer
      • NettyServerHandler
      • Student.proto
    • Client端
      • NettyClient
      • NettyClientHandler

simple

Server端

NettyServer

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;import java.net.InetSocketAddress;@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(io.netty.channel.ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);ctx.writeAndFlush("服务端收到客户端的数据: " + msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(io.netty.channel.ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}", ctx.channel().remoteAddress());}public void channelInactive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {log.info("客户端断开连接:{}", ctx.channel().remoteAddress());}}

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new StringDecoder());pipeline.addLast(new StringEncoder());pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();System.out.println("客户端连接成功");Scanner sc = new Scanner(System.in);while (true) {System.out.println("请输入内容: ");String line = sc.nextLine();if (line == null || line.isEmpty()) {continue;} else if ("exit".equals(line)) {channel.close();break;}channel.writeAndFlush(line);}channel.closeFuture().sync();System.out.println("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

tcp(粘包和拆包)

Server端

NettyTcpServer

@Slf4j
public class NettyTcpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup(1);try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind("127.0.0.1", 9090);Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

NettyTcpServerHandler

@Slf4j
public class NettyTcpServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;byte[] bytes = new byte[byteBuf.readableBytes()];byteBuf.readBytes(bytes);String content = new String(bytes, StandardCharsets.UTF_8);log.info("服务端接收到的数据字节长度为:{}, 内容为: {}", bytes.length, content);ByteBuf buf = Unpooled.copiedBuffer(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(buf);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("异常: {}", cause.getMessage());ctx.close();}
}

Client端

NettyTcpClient

@Slf4j
public class NettyTcpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new NettyTcpClientHandler());}});ChannelFuture connectFuture = bootstrap.connect(new InetSocketAddress("127.0.0.1", 9090));Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.error("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

NettyTcpClientHandler

@Slf4j
public class NettyTcpClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf byteBuf = (ByteBuf) msg;log.info("客户端接收到数据:{}", byteBuf.toString(StandardCharsets.UTF_8));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {/*粘包:1. 这里连续发送10次byteBuf,发现服务端有可能1次就全部接收了,也有可能3次接受了,也有可能4次接收了,这是不确定的,这也就意味着基于底层NIO的tcp的数据传输 是基于流式传输的,会出现粘包的问题。2. 因此服务端必须 自行处理粘包问题,区分消息边界3. 这里测试的时候,可以多启动几个客户端来观察4. 这里示例的粘包示例与上面simple的区别在于:这里是在短时间内连续发送*//*for (int i = 0; i < 10; i++) {ByteBuf byteBuf = Unpooled.copiedBuffer(("hello, server " + i).getBytes(StandardCharsets.UTF_8));ctx.writeAndFlush(byteBuf);}*//*拆包:1. 这里1次发送了1个10000字节长的数据,而服务端分多次收到,有可能是2次,有可能是1次, 这是不确定的,2. 假设真实数据包就有这么长,那么服务端可能需要分多次才能接收到完整的数据包,3. 同时,我们发现总的数据长度服务端都接收到了,这说明底层NIO的tcp的数据传输 是可靠的4. 1条比较长的消息,服务端分多次才能收到,所以服务端需要解决拆包的问题,将多次接收到的消息转为1条完整的消息5. 这里示例的拆包示例与上面simple的区别在于:这里1次发送的消息数据很长*/StringBuilder sb = new StringBuilder();for (int i = 0; i < 1000; i++) {sb.append("Netty拆包示例|");}ctx.writeAndFlush(Unpooled.copiedBuffer(sb.toString().getBytes(StandardCharsets.UTF_8)));log.info("客户端发送数据长度:{}", sb.toString().length());/* 拆包 与 粘包 的核心问题就是 tcp是流式传输的,tcp可以保证数据可靠传输,但需要对方在接收时需要能区分出消息边界,从而获取1条完整的消息 */}}

protocol

codec

使用自定义协议,编解码器,识别消息边界,处理粘包和拆包问题

CustomMessageDecoder

public class CustomMessageDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < 4) {return;}in.markReaderIndex();int len = in.readInt();if (in.readableBytes() < len) {in.resetReaderIndex();return;}byte[] bytes = new byte[len];in.readBytes(bytes);out.add(CustomMessage.builder().len(len).content(bytes).build());}
}

CustomMessageEncoder

public class CustomMessageEncoder extends MessageToByteEncoder<CustomMessage> {@Overrideprotected void encode(ChannelHandlerContext ctx, CustomMessage msg, ByteBuf out) {out.writeInt(msg.getLen());out.writeBytes(msg.getContent());}
}

server端

ProtocolServer

@Slf4j
public class ProtocolServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new ProtocolServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress("127.0.0.1", 9090));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("server stop");}}

ProtocolServerHandler

@Slf4j
public class ProtocolServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("服务端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));// 将消息回过去(需要加上对应的编码器)ctx.writeAndFlush(customMessage);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info("ProtocolServerHandler异常: {}", cause.getMessage());ctx.close();}
}

client端

ProtocolClient

@Slf4j
public class ProtocolClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new CustomMessageEncoder());pipeline.addLast(new CustomMessageDecoder());pipeline.addLast(new ProtocolClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("localhost", 9090);Channel channel = connectFuture.sync().channel();channel.closeFuture().sync();} catch (Exception e) {log.info("client error", e);} finally {group.shutdownGracefully();}}}

ProtocolClientHandler

@Slf4j
public class ProtocolClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 这里直接转, 如果不能转的话, 就说明前面的解码有问题CustomMessage customMessage = (CustomMessage) msg;log.info("客户端收到消息: {}, {}", customMessage.getLen(), new String(customMessage.getContent()));}@Overridepublic void channelActive(ChannelHandlerContext ctx) {for (int i = 1; i <= 20; i++) {byte[] bytes = ("hello, server " + i).getBytes(StandardCharsets.UTF_8);CustomMessage message = CustomMessage.builder().content(bytes).len(bytes.length).build();ctx.writeAndFlush(message);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.error("【ProtocolClientHandler->exceptionCaught】: {}", cause.getMessage());}
}

http

Server端

HttpServer

@Slf4j
public class HttpServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler("【服务端主】")).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler("【服务端从】"));pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));pipeline.addLast("httpServerHandler", new HttpServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8080));channelFuture.sync();log.info("http服务器启动成功, 您可以访问: http://localhost:8080/test");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}}

HttpServerHandler

@Slf4j
public class HttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) {log.info("【HttpServerHandler->处理】:{}", msg);if (msg instanceof FullHttpRequest) {FullHttpRequest fullHttpRequest = (FullHttpRequest) msg;String uri = fullHttpRequest.uri();log.info("【uri】:{}", uri);HttpMethod method = fullHttpRequest.method();log.info("【method】:{}", method);// 响应回去byte[] bytes = ("服务器收到时间" + LocalDateTime.now()).getBytes(StandardCharsets.UTF_8);DefaultFullHttpResponse fullHttpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,Unpooled.copiedBuffer(bytes));fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, bytes.length);fullHttpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");ChannelPromise promise = ctx.newPromise();promise.addListener(new GenericFutureListener<Future<? super Void>>() {@Overridepublic void operationComplete(Future<? super Void> future) throws Exception {log.info("操作完成");log.info("isDone: {}", future.isDone());log.info("isSuccess: {}", future.isSuccess());log.info("isCancelled: {}", future.isCancelled());log.info("hasException: {}", future.cause() != null, future.cause());}});ctx.writeAndFlush(fullHttpResponse, promise);log.info("刚刚写完");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.error("【HttpServerHandler->userEventTriggered】:{}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【HttpServerHandler->exceptionCaught】", cause);}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelUnregistered】");}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerAdded】");}public void 
handlerRemoved(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->handlerRemoved】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【HttpServerHandler->channelInactive】");}}

Client端

HttpClient

@Slf4j
public class HttpClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("loggingHandler", new LoggingHandler(LogLevel.DEBUG));pipeline.addLast("httpClientCodec", new HttpClientCodec());pipeline.addLast("", new HttpObjectAggregator(10 * 1024));pipeline.addLast("httpClientHandler", new HttpClientHandler());}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080);channelFuture.sync();Channel channel = channelFuture.channel();sendGetRequest(channel);// 等待通道关闭channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {// 遇到问题, 调用此方法后客户端没有正常关闭, 将netty版本4.1.20.FINAL切换到4.1.76.FINAL即可group.shutdownGracefully();log.info("关闭group-finally");}log.info("客户端执行完毕");}private static void sendGetRequest(Channel channel) throws URISyntaxException {String url = "http://localhost:8080/test"; // 测试URLURI uri = new URI(url);String host = uri.getHost();String path = uri.getRawPath() + (uri.getRawQuery() == null ? "" : "?" + uri.getRawQuery());// 构建HTTP请求FullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1,HttpMethod.GET,path,Unpooled.EMPTY_BUFFER);request.headers().set(HttpHeaderNames.HOST, host).set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set(HttpHeaderNames.ACCEPT_ENCODING, HttpHeaderValues.GZIP);// 发送请求ChannelFuture channelFuture = channel.writeAndFlush(request);log.info("Request sent: " + request);}}

HttpClientHandler

@Slf4j
public class HttpClientHandler extends SimpleChannelInboundHandler<FullHttpResponse> {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) {// 处理响应log.info("处理响应, 响应头: {}", response.headers().toString());log.info("处理响应, 响应体: {}", response.content().toString(CharsetUtil.UTF_8));// 关闭连接ctx.channel().close();log.info("关闭连接");}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {log.info( "异常: {}", cause.getMessage());ctx.close();}
}

ws

Server端

WsServer

@Slf4j
public class WsServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("httpServerCodec", new HttpServerCodec());pipeline.addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));WebSocketServerProtocolConfig config = WebSocketServerProtocolConfig.newBuilder().websocketPath("/ws").checkStartsWith(true).build();pipeline.addLast("wsProtocolHandler", new WebSocketServerProtocolHandler(config));pipeline.addLast("wsServerHandler", new WsServerHandler());}});ChannelFuture channelFuture = serverBootstrap.bind("127.0.0.1", 9090);channelFuture.sync();log.info("ws服务启动成功");channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("服务端发生异常: ", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}log.info("ws服务关闭");}}

WsServerHandler

@Slf4j
public class WsServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private static Map<String, Channel> CHANNELS = new ConcurrentHashMap<>();private static AttributeKey<String> ATTRIBUTE_KEY_TOKEN = AttributeKey.valueOf("token");private static AttributeKey<Boolean> ATTRIBUTE_KEY_REPEAT = AttributeKey.valueOf("repeat");@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame webSocketFrame) throws Exception {log.info("【WsServerHandler->处理】:{}", webSocketFrame);if (webSocketFrame instanceof TextWebSocketFrame) {TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;log.info("【textWebSocketFrame.text()】:{}", textWebSocketFrame.text());sendAll(ctx.channel(), textWebSocketFrame.text());}}private void sendAll(Channel channel, String text) {CHANNELS.forEach((token, ch) -> {if (channel != ch) {ch.writeAndFlush(new TextWebSocketFrame(text));}});}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【WsServerHandler->userEventTriggered】: {}", evt);if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {WebSocketServerProtocolHandler.HandshakeComplete handshakeComplete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;String requestUri = handshakeComplete.requestUri();String subprotocol = handshakeComplete.selectedSubprotocol();log.info("【requestUri】:{}", requestUri);log.info("【subprotocol】:{}", subprotocol);handleAuth(requestUri, ctx);}}private void handleAuth(String requestUri, ChannelHandlerContext ctx) {try {Map<String, String> queryParams = getQueryParams(requestUri);String token = queryParams.get("token");log.info("【token】:{}", token);if (token == null) {ctx.close();log.info("token为空, 关闭channel");} else {ctx.channel().attr(ATTRIBUTE_KEY_TOKEN).set(token);Channel oldChannel = CHANNELS.put(token, ctx.channel());if (oldChannel != null) {oldChannel.attr(ATTRIBUTE_KEY_REPEAT).set(true);oldChannel.close();} else {sendAll(ctx.channel(), "欢迎" + 
token + "进入聊天室");}}} catch (Exception e) {ctx.close();}}private static Map<String, String> getQueryParams(String requestUri) throws URISyntaxException {URI uri = new URI(requestUri);String query = uri.getQuery();Map<String, String> queryParams = new HashMap<>();if (query != null) {String[] params = query.split("&");for (String param : params) {String[] keyValue = param.split("=");String key = keyValue[0];String value = keyValue.length > 1 ? keyValue[1] : "";queryParams.put(key, value);}}return queryParams;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("【WsServerHandler->exceptionCaught】", cause);}public void handlerAdded(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerAdded】");}public void handlerRemoved(ChannelHandlerContext ctx) {log.info("【WsServerHandler->handlerRemoved】");}public void channelRegistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelRegistered】");}public void channelUnregistered(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelUnregistered】");}public void channelActive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelActive】");}public void channelInactive(ChannelHandlerContext ctx) {log.info("【WsServerHandler->channelInactive】");Channel channel = ctx.channel();Boolean isRepeat = channel.attr(ATTRIBUTE_KEY_REPEAT).get() != null&& channel.attr(ATTRIBUTE_KEY_REPEAT).get();if (!isRepeat) {CHANNELS.computeIfPresent(ctx.attr(ATTRIBUTE_KEY_TOKEN).get(), (key, ch) -> {CHANNELS.remove(channel.attr(ATTRIBUTE_KEY_TOKEN));sendAll(channel, channel.attr(ATTRIBUTE_KEY_TOKEN).get() + "离开聊天室");return null;});}}}

Client端

WsClient

@Slf4j
public class WsClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {CountDownLatch connectLatch = new CountDownLatch(1);Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new HttpClientCodec());pipeline.addLast(new HttpObjectAggregator(10 * 1024));WebSocketClientProtocolConfig config = WebSocketClientProtocolConfig.newBuilder().handleCloseFrames(false).build();WebSocketClientHandshaker webSocketClientHandshaker = WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:9090/ws/1?token=abc"),WebSocketVersion.V13,null,true,new DefaultHttpHeaders());pipeline.addLast(new WebSocketClientProtocolHandler(webSocketClientHandshaker, config));pipeline.addLast(new WebSocketClientHandler(connectLatch));}});ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9090);Channel channel = channelFuture.channel();channelFuture.addListener((ChannelFutureListener) future -> {if (!future.isSuccess()) {System.err.println("Connection failed: " + future.cause());connectLatch.countDown(); // 确保不会死等}});// 等待连接完成(带超时)if (!connectLatch.await(10, TimeUnit.SECONDS)) {throw new RuntimeException("Connection timed out");}Scanner sc = new Scanner(System.in);while (true) {System.out.print("请输入:");String line = sc.nextLine();if (StringUtil.isNullOrEmpty(line)) {continue;}if ("exit".equals(line)) {channel.close();break;} else {// 发送消息WebSocketFrame frame = new TextWebSocketFrame(line);channelFuture.channel().writeAndFlush(frame);}}channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.info("客户端发生异常: ", e);} finally {group.shutdownGracefully();}}}

WebSocketClientHandler

@Slf4j
public class WebSocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {private CountDownLatch connectLatch;public WebSocketClientHandler(CountDownLatch connectLatch) {this.connectLatch = connectLatch;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {// 处理接收到的WebSocket帧if (frame instanceof TextWebSocketFrame) {String text = ((TextWebSocketFrame) frame).text();System.out.println("Received: " + text);} else if (frame instanceof PingWebSocketFrame) {// 响应Ping帧ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));System.out.println("Responded to ping");} else if (frame instanceof CloseWebSocketFrame) {System.out.println("Received close frame");ctx.close();} else if (frame instanceof BinaryWebSocketFrame) {System.out.println("Received binary data: " + frame.content().readableBytes() + " bytes");}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {// 处理握手完成事件if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {System.out.println("WebSocket handshake complete event");// 握手完成后可以发送初始消息connectLatch.countDown();}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("WebSocket error: ");cause.printStackTrace();ctx.close();}}

protobuf

Server端

NettyServer

@Slf4j
public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);NioEventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).handler(new LoggingHandler()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyServerHandler());}});ChannelFuture bindFuture = serverBootstrap.bind(new InetSocketAddress(8888));Channel channel = bindFuture.sync().channel();log.info("server start");channel.closeFuture().sync();log.info("server stop");} catch (Exception e) {log.info("server error", e);} finally {bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

NettyServerHandler

@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("服务端接收到客户端数据:{}", msg);if (msg instanceof StudentPOJO.Student) {StudentPOJO.Student student = (StudentPOJO.Student) msg;log.info( "客户端发送的数据:{}, {}, {}", student, student.getId(), student.getName());}}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info("【NettyServerHandler->userEventTriggered】: {}", evt);}public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.error("exceptionCaught异常:", cause);ctx.close();}public void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info("handlerAdded:{}", ctx.channel().remoteAddress());}public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info("handlerRemoved:{}", ctx.channel().remoteAddress());}public void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info("channelRegistered:{}", ctx.channel().remoteAddress());}public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info("channelUnregistered:{}", ctx.channel().remoteAddress());}public void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("channelActive:{}", ctx.channel().remoteAddress());}public void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("channelInactive:{}", ctx.channel().remoteAddress());}}

Student.proto

syntax = "proto3"; // protobuf language version
option java_outer_classname = "StudentPOJO"; // name of the generated outer class, which is also the generated file name

// protobuf describes data with `message` definitions.
// Student is generated as the nested class StudentPOJO.Student; that nested class is the
// object actually sent over the wire.
message Student {
  int32 id = 1;    // field `id` of protobuf type int32; 1 is the field number, not a value
  string name = 2; // field `name` of protobuf type string; 2 is the field number
}
// Generate the Java source with: protoc.exe --java_out=<output directory> <path to Student.proto>

Client端

NettyClient

@Slf4j
public class NettyClient {public static void main(String[] args) {NioEventLoopGroup group = new NioEventLoopGroup(1);try {Bootstrap bootstrap = new Bootstrap().group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new ProtobufEncoder());pipeline.addLast(new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));pipeline.addLast(new NettyClientHandler());}});ChannelFuture connectFuture = bootstrap.connect("127.0.0.1", 8888);Channel channel = connectFuture.sync().channel();log.info("客户端连接成功");channel.closeFuture().sync();log.info("客户端关闭");} catch (Exception e) {log.error("客户端发生异常: ", e);}}}

NettyClientHandler

@Slf4j
public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info( "【NettyClientHandler->channelRead】: {}", msg);}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {log.info( "【NettyClientHandler->userEventTriggered】: {}", evt);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {log.info( "异常: {}", cause.getMessage());ctx.close();}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerAdded】: {}", ctx);}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->handlerRemoved】: {}", ctx);}@Overridepublic void channelRegistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelRegistered】: {}", ctx);}@Overridepublic void channelUnregistered(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelUnregistered】: {}", ctx);}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelActive】: {}", ctx);StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(1).setName("张三san").build();ctx.writeAndFlush(student);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info( "【NettyClientHandler->channelInactive】: {}", ctx);}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.tpcf.cn/bicheng/83379.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ThreadLocal ,底层原理,强引用,弱引用,内存泄漏

目录 ThreadLocal的基本概念 底层实现原理 强引用与弱引用 内存泄漏问题 内存泄漏的解决方案 示例代码 ThreadLocal的基本概念 ThreadLocal是Java中的一个类,位于java.lang包下,它提供了线程局部变量的功能。每个使用该变量的线程都有自己独立的初…

TomSolver 库 | config详解及其测试

一、C++ 关键特性解析 1. enum class 强类型枚举 enum class LogLevel { OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL }; enum class NonlinearMethod { NEWTON_RAPHSON, LM }; 核心特性: 类型安全:禁止隐式转换为整数 作用域限定:必须…

【DB2】ERRORCODE=-4499, SQLSTATE=08001

客户在连接DB2压测时报错ERRORCODE=-4499, SQLSTATE=08001,连接失败,主要是因为通信失败。在本地进行复现,用DBeaver代替java程序,将DB2COMM从TCPIP置为空,重启后重新连接,报一样的错误。而将防火墙开启…

MicroPython+L298N+ESP32控制电机转速

要使用MicroPython控制L298N电机驱动板来控制电机的转速,你可以通过PWM(脉冲宽度调制)信号来调节电机速度。L298N是一个双H桥驱动器,可以同时控制两个电机的正反转和速度。 硬件准备: 1. L298N 电机控制板 2. ESP32…

WPF 全局加载界面、多界面实现渐变过渡效果

WPF 全局加载界面与渐变过渡效果 完整实现方案 MainWindow.xaml <Window x:Class"LoadingScreenDemo.MainWindow"xmlns"http://schemas.microsoft.com/winfx/2006/xaml/presentation"xmlns:x"http://schemas.microsoft.com/winfx/2006/xaml&quo…

RabbitMQ深度解析:从基础实践到高阶架构设计

引言 在分布式系统与微服务架构主导的现代软件开发中,服务间通信的可靠性、异步处理能力及流量管控成为核心挑战。RabbitMQ作为基于AMQP协议的企业级消息中间件,凭借其灵活的路由机制、高可用架构与丰富的扩展能力,成为异步通信…

华为OD机试真题——矩形相交的面积(2025A卷:100分)Java/python/JavaScript/C/C++/GO最佳实现

2025 A卷 100分 题型 本专栏内全部题目均提供Java、python、JavaScript、C、C++、GO六种语言的最佳实现方式; 并且每种语言均涵盖详细的问题分析、解题思路、代码实现、代码详解、3个测试用例以及综合分析; 本文收录于专栏:《2025华为OD真题目录+全流程解析+备考攻略+经验分…

基于随机函数链接神经网络(RVFL)的锂电池健康状态(SOH)预测

基于随机函数链接神经网络(RVFL)的锂电池健康状态(SOH)预测 一、RVFL网络的基本原理与结构 随机向量功能链接(Random Vector Functional Link, RVFL)网络是一种单隐藏层前馈神经网络的随机化版本,其核心特征在于输入层到隐藏层的权重随机生成且固定,输出层权重通过最…

阿里云国际站,如何通过代理商邀请的链接注册账号

阿里云国际站:如何通过代理商邀请链接注册,解锁“云端超能力”与专属福利? 渴望在全球化浪潮中抢占先机?想获得阿里云国际站的海量云资源、遍布全球的加速节点与前沿AI服务,同时又能享受专属折扣、VIP级增值服务支持或…

PMOS以及电源转换电路设计

PMOS的使用 5V_EN=5V时,PMOS截止; 5V_EN=0V时,PMOS导通; 电源转换电路 当Vout=0V时,Vg=0V, Vgs>Vth, PMOS导通,只有电池供电; 当Vout=5V时,Vg=4.9V, Vs=4.8V?, Vgs<Vth, PMOS截止…

云时代:DMZ安全架构的演进与实践

随着云计算的普及,传统的DMZ安全边界正在经历根本性变革。本文探讨如何在云环境中重新设计和实现DMZ架构,以应对现代安全挑战。 1. 传统DMZ与云DMZ的对比 传统DMZ(隔离区)是网络安全的经典架构,但云环境带来了新的挑战: 特性传统DMZ云DMZ物理边界明确的物理网络分区虚拟网…

mqtt协议连接阿里云平台

首先现在的阿里云物联网平台已经不再新购了,如下图所示: 解决办法:在咸鱼上租用一个账号,先用起来。 搭建阿里云平台,参考博客: (一)MQTT连接阿里云物联网平台(小白向…

职业本科院校无人机专业人才培养解决方案

2023年的中央经济工作会议强调了以科技创新推动现代化产业体系构建的重要性,并提出发展生物制造、商业航天、低空经济等战略性新兴产业。低空经济,依托民用无人机等低空飞行器,在多场景低空飞行活动的牵引下,正逐步形成一个辐射广…

Go语言字符串类型详解

1. 定义字符串类型 package mainimport ("fmt");func main() {var str1 string "你好 GoLang 1"var str2 "你好 GoLang 2"str3 : "你好 GoLang 3"fmt.Printf("%v--%T\n", str1, str1)// 你好 GoLang 1--stringfmt.Printf…

设计模式——中介者设计模式(行为型)

摘要 文章详细介绍了中介者设计模式,这是一种行为型设计模式,通过中介者对象封装多个对象间的交互,降低系统耦合度。文中阐述了其核心角色、优缺点、适用场景,并通过类图、时序图、实现方式、实战示例等多方面进行讲解…

也说字母L:柔软的长舌

英语单词 tongue,意为“舌头” tongue n.舌,舌头;语言 很显然,“语言”是引申义,因为语言是抽象的,但舌头是具象的,根据由简入繁的原则,tongue显然首先是象形起义,表达…

性能测试实例(http和ldap协议压测)

一、某授权服务器生成授权码效率验证(http协议) 测试背景 在存量数据23万条的情况下,生成一条授权数据,需要10秒左右,用户反映数据生成效率太差,需要优化。初步判断是由于在授权数据生成时,有查…

Spring Boot中的事件与JMS消息集成

Spring Boot事件机制 Spring框架的事件处理是其核心特性之一,通过ApplicationEvent类和ApplicationListener接口实现。在Spring Boot应用中,事件机制是实现模块间消息传递的重要方式,通常用于业务逻辑内部通信。 内置事件类型 Spring应用上下文在启动时会触发多种内置事件…

第12次12: 修改和删除收货地址

第1步:在users应用下views.py中新增实现修改收货地址的视图类 class UpdateDestroyAddressView(LoginRequiredJSONMixin, View):def put(self, request, address_id):"""修改收货地址"""json_dict = json.loads(request.body.decode(…

python常用库-pandas、Hugging Face的datasets库(大模型之JSONL(JSON Lines))

文章目录 python常用库pandas、Hugging Face的datasets库(大模型之JSONL(JSON Lines))背景什么是JSONL(JSON Lines)通过pandas读取和保存JSONL文件pandas读取和保存JSONL文件 Hugging Face的datasets库Hugg…