# SpringBoot 集成 Netty
admin
2024-02-03 06:20:57
0

SpringBoot 集成 Netty

文章目录

  • SpringBoot 集成 Netty
    • 背景描述
    • Netty与SpringBoot整合关注点
    • Netty组件
      • Bootstrap、ServerBootstrap
      • Channel
      • EventLoop、EventLoopGroup
      • ChannelHandler
      • ChannelPipeline
      • ByteBuf
    • Pom依赖
    • Yml 配置
    • 整合Netty步骤
      • 服务端
      • 客户端

背景描述

  • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot整合关注点

  • NettySpringboot生命周期保持一致,同生共死
  • Netty能用上ioc中的Bean
  • Netty能读取到全局的配置

Netty组件

Bootstrap、ServerBootstrap

  • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
  • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
  • ServerBootstrap 往往是用于启动一个 Netty 服务端。

Channel

  • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
  • 其实就是我们平常网络编程中经常使用的socket套接字对象

EventLoop、EventLoopGroup

  • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
  • 一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或者多个Channel

ChannelHandler

  • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
  • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
    • ChannelInboundHandler——处理入站数据
    • ChannelOutboundHandler——处理出站数据

ChannelPipeline

  • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

ByteBuf

  • ByteBuf就是字节缓冲区,用于高效处理输入输出。

Pom依赖

  • 引入springboot starter web netty

org.springframework.bootspring-boot-starter-web2.3.5.RELEASE

io.nettynetty-all4.1.85.Final

Yml 配置

# Springboot 端口
server:port: 2345netty:websocket:# Websocket服务端口port: 1024# 绑定的网卡ip: 0.0.0.0# 消息帧最大体积max-frame-size: 10240# URI路径path: /channel

整合Netty步骤

服务端

  • 使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {@Resourceprivate SocketServer socketServer;@Overridepublic void run(ApplicationArguments args) {this.socketServer.start();}}
  • SocketServe.java
@Component
public class SocketServer {private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);/*** 负责初始化 netty 服务器*/private ServerBootstrap serverBootstrap;@Autowiredprivate SocketInitializer socketInitializer;@Value("${netty.websocket.port}")private int port;/*** 启动 netty 服务器*/public void start() {this.init();this.serverBootstrap.bind(this.port);logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);}/*** 初始化 netty 配置*/private void init() {// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);// 实际工作的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup();this.serverBootstrap = new ServerBootstrap();// 两个线程组加入进来 加入自己的初始化器this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);}
}
  • 编写Netty服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 读取到客户端发来的消息** @param ctx ChannelHandlerContext* @param msg msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组byte[] data = (byte[]) msg;log.info("收到消息: " + new String(data));// 给其他人转发消息for (Channel client : clients) {if (!client.equals(ctx.channel())) {client.writeAndFlush(data);}}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("新的客户端链接:" + ctx.channel().id().asShortText());clients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {clients.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.channel().close();clients.remove(ctx.channel());}
}
  • 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择pipeline.addLast(new ByteArrayDecoder());pipeline.addLast(new ByteArrayEncoder());// 添加上自己的处理器pipeline.addLast(new SocketHandler());}
}

客户端

  • 编写 socket连接
public class ChatClient {public void start(String name) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_READ);// 监听服务端发来得消息new Thread(new ClientThread(selector)).start();// 监听用户输入Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String message = scanner.next();if (StringUtils.hasText(message)) {socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));}}}private class ClientThread implements Runnable {private final Logger logger = LoggerFactory.getLogger(ClientThread.class);private final Selector selector;public ClientThread(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {while (true) {int channels = selector.select();if (channels == 0) {continue;}Set selectionKeySet = selector.selectedKeys();Iterator keyIterator = selectionKeySet.iterator();while (keyIterator.hasNext()) {SelectionKey selectionKey = keyIterator.next();// 移除集合当前得selectionKey,避免重复处理keyIterator.remove();if (selectionKey.isReadable()) {handleRead(selector, selectionKey);}}}} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 处理可读状态private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);StringBuilder message = new StringBuilder();if (channel.read(byteBuffer) > 0) {byteBuffer.flip();message.append(StandardCharsets.UTF_8.decode(byteBuffer));}// 再次注册到选择器上,继续监听可读状态channel.register(selector, SelectionKey.OP_READ);System.out.println(message);}
}
  • start()
public static void main(String[] args) throws IOException {new ChatClient().start("张三");
}

相关内容

热门资讯

货币政策加大宏观调控强度 为贯彻中央政治局会议精神,进一步实施好适度宽松的货币政策,中国人民银行行长潘功胜近日在国新办新闻发布...
国投白银LOF:将于1月22日... 国投瑞银白银期货证券投资基金(LOF)A类基金份额发布二级市场交易价格溢价风险提示及停复牌公告。近期...
2024年度个人所得税综合所得... 一、什么是年度汇算? 年度汇算指的是年度终了后,纳税人汇总一个纳税年度内取得的综合所得收入额,减除费...
原创 三... 2026年的春节脚步渐近,当电影市场摩拳擦掌争夺票房时,休闲零食赛道的“互联网鼻祖”三只松鼠却再次选...
通源环境大宗交易溢价成交3.8... 通源环境01月21日大宗交易平台共发生1笔成交,合计成交量3.88万股,成交金额202.46万元。成...
眼内手术自主机器人:重塑眼科手... 国中科院自动化研究所边桂彬团队研发的眼内手术自主机器人系统,正为眼科手术领域注入革命性力量,有望彻底...
从全球第四、全球第二,看中国汽... 近日,各大车企及行业媒体陆续发布了2024年汽车销量数据。在各类榜单中,比亚迪排名均十分优异:202...
大唐御医的误诊:以为是绝症肿瘤... 声明:本文内容结合公开史料与中医典籍进行艺术创作,旨在人文科普,不传播封建迷信,请读者朋友保持理性阅...
赛英电子治理“黑洞”:IPO前... 本文时代商业研究院 作者:陆烁宜 来源丨时代商业研究院 作者丨陆烁宜 编辑丨郑琳 IPO前夕董秘及...
万亿市值迫近 北交所托举实体创... 【编者按】新质生产力加速成长、产业升级步履铿锵、首都功能不断提升……“十四五”时期,我们见证了北京高...
确认了!她接棒父亲任董事长 公司召开第四届董事会第十九次会议选举公司副董事长石思慧担任公司董事长,石平湘担任副董事长。此外,公司...
促消费!6部门发布19条举措加... 6月24日 为推动大力提振消费 中国人民银行等6部门对外发布 《关于金融支持提振和扩大消费的指导意见...
退税更“丝滑” 多地提供“即买... 近日,商务部等6部门发布通知,进一步优化离境退税政策。文件出台后,一些城市增加了退税商店,提供更加便...
突发!王健林旗下大连万达集团所... 红星资本局3月23日消息,日前,王健林旗下大连万达集团股份有限公司(以下简称大连万达集团)又新增一条...
银行间主要利率债收益率升幅扩大 每经AI快讯,3月14日,银行间主要利率债收益率升幅扩大,10年期国开债“25国开05”收益率上行1...
国内期货夜盘开盘多数上涨 【国内期货夜盘开盘多数上涨】沪银涨近3%,铁矿石、沪镍、沪锡、焦炭等均涨超1%,沪金涨近1%;跌幅方...
6国心脏外科医生到北医三院,首... 近日 首期北京大学“一带一路” 微创冠脉搭桥国际高级学习班 在北医三院顺利举行 来自 葡萄牙、以色列...
去年净利预亏约10亿!“国产G... 摩尔线程(688795.SH)今日公告称,公司预计2025年年度实现营业收入14.5亿元到15.2亿...
华能旗下上市公司资产重组过审! 12月12日,内蒙古蒙电华能热电股份有限公司(以下简称“内蒙华电”)发布公告称,拟通过发行股份及支付...
金价,暴跌! 10月21日,黄金白银再次急跌。 截至16:43发稿时,伦敦现货黄金大跌近2%,交投于4270美元/...