# SpringBoot 集成 Netty
admin
2024-02-03 06:20:57
0

SpringBoot 集成 Netty

文章目录

  • SpringBoot 集成 Netty
    • 背景描述
    • Netty与SpringBoot整合关注点
    • Netty组件
      • Bootstrap、ServerBootstrap
      • Channel
      • EventLoop、EventLoopGroup
      • ChannelHandler
      • ChannelPipeline
      • ByteBuf
    • Pom依赖
    • Yml 配置
    • 整合Netty步骤
      • 服务端
      • 客户端

背景描述

  • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot整合关注点

  • NettySpringboot生命周期保持一致,同生共死
  • Netty能用上ioc中的Bean
  • Netty能读取到全局的配置

Netty组件

Bootstrap、ServerBootstrap

  • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
  • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
  • ServerBootstrap 往往是用于启动一个 Netty 服务端。

Channel

  • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
  • 其实就是我们平常网络编程中经常使用的socket套接字对象

EventLoop、EventLoopGroup

  • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
  • 一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或者多个Channel

ChannelHandler

  • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
  • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
    • ChannelInboundHandler——处理入站数据
    • ChannelOutboundHandler——处理出站数据

ChannelPipeline

  • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

ByteBuf

  • ByteBuf就是字节缓冲区,用于高效处理输入输出。

Pom依赖

  • 引入springboot starter web netty

org.springframework.bootspring-boot-starter-web2.3.5.RELEASE

io.nettynetty-all4.1.85.Final

Yml 配置

# Springboot 端口
server:port: 2345netty:websocket:# Websocket服务端口port: 1024# 绑定的网卡ip: 0.0.0.0# 消息帧最大体积max-frame-size: 10240# URI路径path: /channel

整合Netty步骤

服务端

  • 使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {@Resourceprivate SocketServer socketServer;@Overridepublic void run(ApplicationArguments args) {this.socketServer.start();}}
  • SocketServe.java
@Component
public class SocketServer {private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);/*** 负责初始化 netty 服务器*/private ServerBootstrap serverBootstrap;@Autowiredprivate SocketInitializer socketInitializer;@Value("${netty.websocket.port}")private int port;/*** 启动 netty 服务器*/public void start() {this.init();this.serverBootstrap.bind(this.port);logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);}/*** 初始化 netty 配置*/private void init() {// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);// 实际工作的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup();this.serverBootstrap = new ServerBootstrap();// 两个线程组加入进来 加入自己的初始化器this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);}
}
  • 编写Netty服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 读取到客户端发来的消息** @param ctx ChannelHandlerContext* @param msg msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组byte[] data = (byte[]) msg;log.info("收到消息: " + new String(data));// 给其他人转发消息for (Channel client : clients) {if (!client.equals(ctx.channel())) {client.writeAndFlush(data);}}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("新的客户端链接:" + ctx.channel().id().asShortText());clients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {clients.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.channel().close();clients.remove(ctx.channel());}
}
  • 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择pipeline.addLast(new ByteArrayDecoder());pipeline.addLast(new ByteArrayEncoder());// 添加上自己的处理器pipeline.addLast(new SocketHandler());}
}

客户端

  • 编写 socket连接
public class ChatClient {public void start(String name) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_READ);// 监听服务端发来得消息new Thread(new ClientThread(selector)).start();// 监听用户输入Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String message = scanner.next();if (StringUtils.hasText(message)) {socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));}}}private class ClientThread implements Runnable {private final Logger logger = LoggerFactory.getLogger(ClientThread.class);private final Selector selector;public ClientThread(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {while (true) {int channels = selector.select();if (channels == 0) {continue;}Set selectionKeySet = selector.selectedKeys();Iterator keyIterator = selectionKeySet.iterator();while (keyIterator.hasNext()) {SelectionKey selectionKey = keyIterator.next();// 移除集合当前得selectionKey,避免重复处理keyIterator.remove();if (selectionKey.isReadable()) {handleRead(selector, selectionKey);}}}} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 处理可读状态private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);StringBuilder message = new StringBuilder();if (channel.read(byteBuffer) > 0) {byteBuffer.flip();message.append(StandardCharsets.UTF_8.decode(byteBuffer));}// 再次注册到选择器上,继续监听可读状态channel.register(selector, SelectionKey.OP_READ);System.out.println(message);}
}
  • start()
public static void main(String[] args) throws IOException {new ChatClient().start("张三");
}

相关内容

热门资讯

股价腰斩,“章建平”割肉了 短短4个多月,江淮汽车股价腰斩,知名游资章建平也扛不住了。最新公告显示,6月10日至6月23日,章建...
扩张提速、店铺“加密”,“硬折... 北京“硬折扣”超市“迎新”。6月26日,盒马旗下平价社区超市超盒算NB首批6家门店同步开业,网点覆盖...
外媒:黄金白银遭遇“完美风暴” 参考消息网6月26日报道据西班牙《经济学家报》网站6月23日报道,贵金属在金融市场正经历一场名副其实...
原创 法... 巴黎《费加罗报》给中国扣上"拯救者"的帽子,纽约《华尔街日报》隔着大西洋默默点头。 两家立场南辕北辙...
金价大跌!有商家囤货资产缩水百... 近期,国际金价持续大幅下行。6月26日19时30分左右,伦敦金现货价格报4050美元/盎司,较年内高...
原创 人... 大家好,这里是史记文谭,闲中着色,笑里有情,不废观星问月,亦赏市井浮生。 前言 咱们每天兜里揣着的钱...
交运股份告别六年扣非亏损,更名... 本报记者 张蓓 陈炳衡 北京报道 日前,上海交运集团股份有限公司(600676.SH)召开2026年...
原创 星... 马斯克的手又伸长了。这次不是火箭回收,也不是把"星链"塞进乌克兰战壕,而是直接杀进美国消费者的手机号...
视频丨一部剧带火一座城 “追剧... 第31届上海电视节各奖项昨晚(26日)揭晓,谍战题材电视剧《沉默的荣耀》在5项重磅提名中,最终斩获评...
东京经济论坛现场观察:日本华商... 作者 | 东京谢社长 6月26日,我去东京丽嘉皇家酒店参加了东京国际商学院EMBA二期开学典礼暨...
苏州投资人问:土耳其20年免税... 苏州投资人问:土耳其20年免税到底怎么理解? 最近一段时间,苏州工业园区和外企圈子里,关于土耳其20...
刘强东为70万京东物流人员规划... 职业被智能化设备迭代替代,已经成为当下众多从业者共同的内心顾虑。近期刘强东在行业论坛的发言,再度引发...
富国基金换帅:裴长江退休卸任,... 6月26日,富国基金发布高级管理人员变更的公告,董事长裴长江因退休离任,申万宏源证券执行委员会成员王...
两部门最新发布!事关黄金及黄金... 6月26日,中国人民银行、海关总署联合发布通知,就《黄金及黄金制品进出口管理办法(征求意见稿)》向社...
中信重工重构全球矿山装备供应链... 文丨承承 编辑丨李壮 2026年盛夏,第四届“中国国际供应链促进博览会”在北京顺义拉开帷幕。在中信集...
全球爆火的ETF,纳入中国存储... 史上增长最快的新发ETF,刚刚把"中国存储龙头"买成了前十大重仓! 6月,Roundhill Mem...
金价暴跌!重回“3字头”时代 继6月24日、25日伦敦金现连续两日盘中跌破4000美元/盎司后,6月26日国际金价延续跌势。 截至...
一批站在“光”里的基金经理们,... 【导读】一批绩优“追光者”密集出手限购,年内业绩前十均已“闭门谢客” 中国基金报记者 曹雯璟 仅过了...
赛场出圈,多品类业务破局,蒙牛... 2026世界杯加持,股价逆势走高! 文/每日财报 南黎 夏日的墨西哥城阿兹特克体育场,伴随着202...
IPO抢着给科技输血,钱却在选... 2026年上半年,A股IPO市场交出81家上市、1057亿元募资的成绩单,新股首日回报率233%创近...