# SpringBoot 集成 Netty
admin
2024-02-03 06:20:57
0

SpringBoot 集成 Netty

文章目录

  • SpringBoot 集成 Netty
    • 背景描述
    • Netty与SpringBoot整合关注点
    • Netty组件
      • Bootstrap、ServerBootstrap
      • Channel
      • EventLoop、EventLoopGroup
      • ChannelHandler
      • ChannelPipeline
      • ByteBuf
    • Pom依赖
    • Yml 配置
    • 整合Netty步骤
      • 服务端
      • 客户端

背景描述

  • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot整合关注点

  • NettySpringboot生命周期保持一致,同生共死
  • Netty能用上ioc中的Bean
  • Netty能读取到全局的配置

Netty组件

Bootstrap、ServerBootstrap

  • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
  • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
  • ServerBootstrap 往往是用于启动一个 Netty 服务端。

Channel

  • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
  • 其实就是我们平常网络编程中经常使用的socket套接字对象

EventLoop、EventLoopGroup

  • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
  • 一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或者多个Channel

ChannelHandler

  • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
  • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
    • ChannelInboundHandler——处理入站数据
    • ChannelOutboundHandler——处理出站数据

ChannelPipeline

  • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

ByteBuf

  • ByteBuf就是字节缓冲区,用于高效处理输入输出。

Pom依赖

  • 引入springboot starter web netty

org.springframework.bootspring-boot-starter-web2.3.5.RELEASE

io.nettynetty-all4.1.85.Final

Yml 配置

# Springboot 端口
server:port: 2345netty:websocket:# Websocket服务端口port: 1024# 绑定的网卡ip: 0.0.0.0# 消息帧最大体积max-frame-size: 10240# URI路径path: /channel

整合Netty步骤

服务端

  • 使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {@Resourceprivate SocketServer socketServer;@Overridepublic void run(ApplicationArguments args) {this.socketServer.start();}}
  • SocketServe.java
@Component
public class SocketServer {private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);/*** 负责初始化 netty 服务器*/private ServerBootstrap serverBootstrap;@Autowiredprivate SocketInitializer socketInitializer;@Value("${netty.websocket.port}")private int port;/*** 启动 netty 服务器*/public void start() {this.init();this.serverBootstrap.bind(this.port);logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);}/*** 初始化 netty 配置*/private void init() {// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);// 实际工作的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup();this.serverBootstrap = new ServerBootstrap();// 两个线程组加入进来 加入自己的初始化器this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);}
}
  • 编写Netty服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 读取到客户端发来的消息** @param ctx ChannelHandlerContext* @param msg msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组byte[] data = (byte[]) msg;log.info("收到消息: " + new String(data));// 给其他人转发消息for (Channel client : clients) {if (!client.equals(ctx.channel())) {client.writeAndFlush(data);}}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("新的客户端链接:" + ctx.channel().id().asShortText());clients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {clients.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.channel().close();clients.remove(ctx.channel());}
}
  • 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择pipeline.addLast(new ByteArrayDecoder());pipeline.addLast(new ByteArrayEncoder());// 添加上自己的处理器pipeline.addLast(new SocketHandler());}
}

客户端

  • 编写 socket连接
public class ChatClient {public void start(String name) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_READ);// 监听服务端发来得消息new Thread(new ClientThread(selector)).start();// 监听用户输入Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String message = scanner.next();if (StringUtils.hasText(message)) {socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));}}}private class ClientThread implements Runnable {private final Logger logger = LoggerFactory.getLogger(ClientThread.class);private final Selector selector;public ClientThread(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {while (true) {int channels = selector.select();if (channels == 0) {continue;}Set selectionKeySet = selector.selectedKeys();Iterator keyIterator = selectionKeySet.iterator();while (keyIterator.hasNext()) {SelectionKey selectionKey = keyIterator.next();// 移除集合当前得selectionKey,避免重复处理keyIterator.remove();if (selectionKey.isReadable()) {handleRead(selector, selectionKey);}}}} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 处理可读状态private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);StringBuilder message = new StringBuilder();if (channel.read(byteBuffer) > 0) {byteBuffer.flip();message.append(StandardCharsets.UTF_8.decode(byteBuffer));}// 再次注册到选择器上,继续监听可读状态channel.register(selector, SelectionKey.OP_READ);System.out.println(message);}
}
  • start()
public static void main(String[] args) throws IOException {new ChatClient().start("张三");
}

相关内容

热门资讯

苏见信张含韵林峯等众星祝贺李纯... 搜狐娱乐讯 10月2日,李纯、马頔在社交平台上晒照官宣结婚!当天,王菊、张含韵、徐僧、张瑶、隔壁老樊...
江都农商银行:以金融科普筑牢网... 为切实增强社会公众金融风险防范意识与技能,筑牢县域金融安全屏障,在扬州市江都区委宣传部、区委网信办、...
交易商协会:加强对科技创新债券... 【大河财立方消息】9月29日消息,中国银行间市场交易商协会发布《关于非金融企业债务融资工具一般主承销...
港股江西银行、青岛银行跌超2% 新京报贝壳财经讯 10月2日,港股内银股普跌,其中,江西银行、青岛银行、重庆农村商业银行跌超2%,九...
原创 北... 近年来,北京豪宅市场的价格波动,引发了广泛的关注。根据“中指研究院”最新监测的数据显示,位于朝阳区的...
交付超49.7万辆,特斯拉最新... 每经编辑|杜宇 据特斯拉官方微博10月2日晚间消息,2025年第三季度,特斯拉全球生产了超过44....
海控免税创新场景激活消费,离岛... 超级月饼免费吃、爆品奶茶免费送、购物赢大奖、零食铲铲乐……国庆黄金周首日,海控全球精品免税城“魅力国...
爆肝!华闻期货——跟着炒股群炒... 爆肝!华闻期货——跟着炒股群炒股是否违法 在当今社会,随着互联网金融的快速发展,各类商品交易平...
7天!国联期货——亏损了怎么办... 7天!国联期货——亏损了怎么办~炒期货被骗能要回来吗 在当今社会,随着互联网金融的快速发展,各...
破防!国联期货——跟着老师网上... 破防!国联期货——跟着老师网上投资炒期货被骗 在当今社会,随着互联网金融的快速发展,各类商品交...
抽象!天富期货——亏了好多钱怎... 抽象!天富期货——亏了好多钱怎么办~炒期货被骗能要回来吗 在当今社会,随着互联网金融的快速发展...
碾压!国联期货——老师带单炒期... 碾压!国联期货——老师带单炒期货黄金 在当今社会,随着互联网金融的快速发展,各类商品交易平台如...
告发!‌华闻期货——被老师诱导... 告发!‌华闻期货——被老师诱导炒期货亏损严重 在当今社会,随着互联网金融的快速发展,各类商品交...
追责!国联期货——正规合法吗~... 追责!国联期货——正规合法吗~炒期货被骗能要回来吗 在当今社会,随着互联网金融的快速发展,各类...
‌标杆!天富期货——亏了好多钱... ‌标杆!天富期货——亏了好多钱怎么办~平台投资被骗报警找得回吗 在当今社会,随着互联网金融的快...
可信!天富期货——亏损了怎么办... 可信!天富期货——亏损了怎么办~平台投资被骗报警找得回吗 在当今社会,随着互联网金融的快速发展...
确凿!国联期货——亏损了怎么办... 确凿!国联期货——亏损了怎么办~投资被骗报警钱能追回吗 在当今社会,随着互联网金融的快速发展,...
披露!国联期货——亏了好多钱怎... 披露!国联期货——亏了好多钱怎么办~在平台操作亏了正常吗? 在当今社会,随着互联网金融的快速发...
开挂!国联期货——期货平台投资... 开挂!国联期货——期货平台投资被骗报警找得回吗 在当今社会,随着互联网金融的快速发展,各类商品...
客观!天富期货——亏了好多钱怎... 客观!天富期货——亏了好多钱怎么办~跟着老师炒期货可信吗 在当今社会,随着互联网金融的快速发展...