Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。
官网:https://netty.io/wiki/user-guide-for-4.x.html
更多样例
https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example
- API使用简单,学习成本低。
- 功能强大,内置了多种解码编码器,支持多种协议。
- 性能高,对比其他主流的NIO框架,Netty的性能最优。
- 社区活跃,发现BUG会及时修复,迭代版本周期短,不断加入新的功能。
- Dubbo、Elasticsearch都采用了Netty,质量得到验证。
BossGroup 和 WorkerGroup:
bossGroup 和 workerGroup 是两个线程池, 它们默认线程数为 CPU 核心数乘以 2 bossGroup
用于接收客户端传过来的请求,接收到请求后将后续操作交由 workerGroup 处理Selector(选择器):
检测多个通道上是否有事件的发生
TaskQueue
(任务队列):上面的任务都是在当前的 NioEventLoop ( 反应器 Reactor 线程 ) 中的任务队列中排队执行 ,
在其它线程中也可以调度本线程的 Channel 通道与该线程对应的客户端进行数据读写
Channel:Channel 是框架自己定义的一个通道接口, Netty 实现的客户端 NIO 套接字通道是 NioSocketChannel
提供的服务器端 NIO 套接字通道是 NioServerSocketChannel
当服务端和客户端建立一个新的连接时, 一个新的 Channel 将被创建,同时它会被自动地分配到它专属的 ChannelPipeline :
ChannelPipeline
是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链,并定义了用于在该链上传播入站和出站事件流的 API
ChannelHandler:分为 ChannelInBoundHandler 和 ChannelOutboundHandler 两种 如果一个入站 IO
事件被触发,这个事件会从第一个开始依次通过 ChannelPipeline中的 ChannelInBoundHandler,先添加的先执行。
若是一个出站 I/O 事件,则会从最后一个开始依次通过 ChannelPipeline 中的
ChannelOutboundHandler,后添加的先执行,然后通过调用在 ChannelHandlerContext
中定义的事件传播方法传递给最近的 ChannelHandler。 在 ChannelPipeline 传播事件时,它会测试
ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向相匹配。
如果某个ChannelHandler不能处理则会跳过,并将事件传递到下一个ChannelHandler,直到它找到和该事件所期望的方向相匹配的为止。
io.netty netty-all
/*** @Author: GZ* @CreateTime: 2023-03-20 11:51* @Description: * @Version: 1.0*/
@Component
public class WebSocketChannelConfig extends ChannelInitializer {@Resourceprivate WebSocketFrameHandler webSocketFrameHandler;@Overrideprotected void initChannel(Channel channel) throws Exception {ChannelPipeline pipeline = channel.pipeline();// netty中http协议的编解码pipeline.addLast(new HttpServerCodec());//最大内容长度pipeline.addLast(new HttpObjectAggregator(65536));//压缩pipeline.addLast(new WebSocketServerCompressionHandler());//协议pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true));//主要逻辑pipeline.addLast(webSocketFrameHandler);}
}
package com.insound.commontest.component;import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.Resource;/*** @Author: GZ* @CreateTime: 2023-03-20 10:45* @Description: WebSocketServer* @Version: 1.0*/
@Component
@Slf4j
public class WebSocketServer {@Autowiredprivate static WebSocketChannelConfig webSocketChannelConfig;@PostConstructprivate void init() {bind(8084);}public static void bind(int port) {//老板线程,用于接受线程EventLoopGroup bossGroup = new NioEventLoopGroup(1);//工人线程,用于处理任务EventLoopGroup workerGroup = new NioEventLoopGroup(1);try {//创建netty启动类ServerBootstrap serverBootstrap = new ServerBootstrap();//设置线程组serverBootstrap.group(bossGroup, workerGroup)//设置通道非阻塞IO.channel(NioServerSocketChannel.class)//设置日志.handler(new LoggingHandler(LogLevel.INFO)).childHandler(webSocketChannelConfig);//服务器异步创建绑定通道ChannelFuture channelFuture = serverBootstrap.bind(port).sync();log.info("-----netty服务器端启动成功-----");//关闭服务器通道channelFuture.channel().closeFuture().sync();} catch (Exception e) {log.error("WebSocketServer is error",e);} finally {//释放线程池资源bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}
@Component
@Slf4j
public class WebSocketFrameHandler extends SimpleChannelInboundHandler {@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception {log.info("接受消息");}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("建立连接");}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("断开连接");}
}