SpringBoot 集成 Netty
文章目录
- SpringBoot 集成 Netty
- 背景描述
- Netty与SpringBoot整合关注点
- Netty组件
- Bootstrap、ServerBootstrap
- Channel
- EventLoop、EventLoopGroup
- ChannelHandler
- ChannelPipeline
- ByteBuf
- Pom依赖
- Yml 配置
- 整合Netty步骤
背景描述
- 如果需要在
SpringBoot
开发的app
中,提供Socket
服务,那么Netty
是不错的选择。
Netty与SpringBoot整合关注点
Netty
跟Springboot
生命周期保持一致,同生共死Netty
能用上ioc
中的Bean
Netty
能读取到全局的配置
Netty组件
Bootstrap、ServerBootstrap
- 帮助
Netty
使用者更加方便地组装和配置 Netty
,也可以更方便地启动 Netty
应用程序 Bootstrap
用于启动一个 Netty TCP
客户端,或者 UDP
的一端。ServerBootstrap
往往是用于启动一个 Netty
服务端。
Channel
Channel
是 Netty
网络操作抽象类,它除了包括基本的 I/O
操作,如 bind、connect、read、write
之外,还包括了 Netty
框架相关的一些功能,如获取该 Channel
的 EventLoop
。- 其实就是我们平常网络编程中经常使用的
socket
套接字对象
EventLoop、EventLoopGroup
EventLoop
定义了Netty
的核心对象,用于处理IO
事件,多线程模型、并发- 一个
EventLoopGroup
包含一个或者多个EventLoop
- 一个
EventLoop
在它的生命周期内只和一个Thread
绑定 - 所有有
EventLoop
处理的I/O
事件都将在它专有的Thread
上被处理 - 一个
Channel
在它的生命周期内只注册于一个EventLoop
- 一个
EventLoop
可能会被分配给一个或者多个Channel
ChannelHandler
ChannelHandler
其实就是用于负责处理接收和发送数据的的业务逻辑,Netty
中可以注册多个handler
,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。ChannelHandler
主要用于对出站和入站数据进行处理,它有两个重要的子接口: ChannelInboundHandler
——处理入站数据ChannelOutboundHandler
——处理出站数据
ChannelPipeline
ChannelPipeline
是ChannelHandler
的容器,通过ChannelPipeline
可以将ChannelHandler
组织成一个逻辑链,该逻辑链可以用来拦截流经Channel
的入站和出站事件,当 Channel
被创建时,它会被自动地分配到它的专属的 ChannelPipeline
。
ByteBuf
ByteBuf
就是字节缓冲区,用于高效处理输入输出。
Pom依赖
- 引入
springboot starter web
和 netty
org.springframework.bootspring-boot-starter-web2.3.5.RELEASE
io.nettynetty-all4.1.85.Final
Yml 配置
# Springboot 端口
server:port: 2345netty:websocket:# Websocket服务端口port: 1024# 绑定的网卡ip: 0.0.0.0# 消息帧最大体积max-frame-size: 10240# URI路径path: /channel
整合Netty步骤
服务端
- 使用
SpringBoot Runner
机制启动 Netty
服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {@Resourceprivate SocketServer socketServer;@Overridepublic void run(ApplicationArguments args) {this.socketServer.start();}}
@Component
public class SocketServer {private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);/*** 负责初始化 netty 服务器*/private ServerBootstrap serverBootstrap;@Autowiredprivate SocketInitializer socketInitializer;@Value("${netty.websocket.port}")private int port;/*** 启动 netty 服务器*/public void start() {this.init();this.serverBootstrap.bind(this.port);logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);}/*** 初始化 netty 配置*/private void init() {// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);// 实际工作的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup();this.serverBootstrap = new ServerBootstrap();// 两个线程组加入进来 加入自己的初始化器this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);}
}
public class SocketHandler extends ChannelInboundHandlerAdapter {private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 读取到客户端发来的消息** @param ctx ChannelHandlerContext* @param msg msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组byte[] data = (byte[]) msg;log.info("收到消息: " + new String(data));// 给其他人转发消息for (Channel client : clients) {if (!client.equals(ctx.channel())) {client.writeAndFlush(data);}}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("新的客户端链接:" + ctx.channel().id().asShortText());clients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {clients.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.channel().close();clients.remove(ctx.channel());}
}
@Component
public class SocketInitializer extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择pipeline.addLast(new ByteArrayDecoder());pipeline.addLast(new ByteArrayEncoder());// 添加上自己的处理器pipeline.addLast(new SocketHandler());}
}
客户端
public class ChatClient {public void start(String name) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_READ);// 监听服务端发来得消息new Thread(new ClientThread(selector)).start();// 监听用户输入Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String message = scanner.next();if (StringUtils.hasText(message)) {socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));}}}private class ClientThread implements Runnable {private final Logger logger = LoggerFactory.getLogger(ClientThread.class);private final Selector selector;public ClientThread(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {while (true) {int channels = selector.select();if (channels == 0) {continue;}Set selectionKeySet = selector.selectedKeys();Iterator keyIterator = selectionKeySet.iterator();while (keyIterator.hasNext()) {SelectionKey selectionKey = keyIterator.next();// 移除集合当前得selectionKey,避免重复处理keyIterator.remove();if (selectionKey.isReadable()) {handleRead(selector, selectionKey);}}}} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 处理可读状态private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);StringBuilder message = new StringBuilder();if (channel.read(byteBuffer) > 0) {byteBuffer.flip();message.append(StandardCharsets.UTF_8.decode(byteBuffer));}// 再次注册到选择器上,继续监听可读状态channel.register(selector, SelectionKey.OP_READ);System.out.println(message);}
}
public static void main(String[] args) throws IOException {new ChatClient().start("张三");
}