# SpringBoot 集成 Netty
admin
2024-02-03 06:20:57
0

SpringBoot 集成 Netty

文章目录

  • SpringBoot 集成 Netty
    • 背景描述
    • Netty与SpringBoot整合关注点
    • Netty组件
      • Bootstrap、ServerBootstrap
      • Channel
      • EventLoop、EventLoopGroup
      • ChannelHandler
      • ChannelPipeline
      • ByteBuf
    • Pom依赖
    • Yml 配置
    • 整合Netty步骤
      • 服务端
      • 客户端

背景描述

  • 如果需要在SpringBoot开发的app中,提供Socket服务,那么Netty是不错的选择。

Netty与SpringBoot整合关注点

  • NettySpringboot生命周期保持一致,同生共死
  • Netty能用上ioc中的Bean
  • Netty能读取到全局的配置

Netty组件

Bootstrap、ServerBootstrap

  • 帮助 Netty 使用者更加方便地组装和配置 Netty ,也可以更方便地启动 Netty 应用程序
  • Bootstrap 用于启动一个 Netty TCP 客户端,或者 UDP 的一端。
  • ServerBootstrap 往往是用于启动一个 Netty 服务端。

Channel

  • ChannelNetty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 之外,还包括了 Netty 框架相关的一些功能,如获取该 ChannelEventLoop
  • 其实就是我们平常网络编程中经常使用的socket套接字对象

EventLoop、EventLoopGroup

  • EventLoop定义了Netty的核心对象,用于处理IO事件,多线程模型、并发
  • 一个EventLoopGroup包含一个或者多个EventLoop
  • 一个EventLoop在它的生命周期内只和一个Thread绑定
  • 所有有EventLoop处理的I/O事件都将在它专有的Thread上被处理
  • 一个Channel在它的生命周期内只注册于一个EventLoop
  • 一个EventLoop可能会被分配给一个或者多个Channel

ChannelHandler

  • ChannelHandler其实就是用于负责处理接收和发送数据的的业务逻辑,Netty中可以注册多个handler,以链式的方式进行处理,根据继承接口的不同,实现的顺序也不同。
  • ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
    • ChannelInboundHandler——处理入站数据
    • ChannelOutboundHandler——处理出站数据

ChannelPipeline

  • ChannelPipelineChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline

ByteBuf

  • ByteBuf就是字节缓冲区,用于高效处理输入输出。

Pom依赖

  • 引入springboot starter web netty

org.springframework.bootspring-boot-starter-web2.3.5.RELEASE

io.nettynetty-all4.1.85.Final

Yml 配置

# Springboot 端口
server:port: 2345netty:websocket:# Websocket服务端口port: 1024# 绑定的网卡ip: 0.0.0.0# 消息帧最大体积max-frame-size: 10240# URI路径path: /channel

整合Netty步骤

服务端

  • 使用 SpringBoot Runner 机制启动 Netty 服务。
@Component
@Order
public class NettyStartListener implements ApplicationRunner {@Resourceprivate SocketServer socketServer;@Overridepublic void run(ApplicationArguments args) {this.socketServer.start();}}
  • SocketServe.java
@Component
public class SocketServer {private static final Logger logger = LoggerFactory.getLogger(SocketServer.class);/*** 负责初始化 netty 服务器*/private ServerBootstrap serverBootstrap;@Autowiredprivate SocketInitializer socketInitializer;@Value("${netty.websocket.port}")private int port;/*** 启动 netty 服务器*/public void start() {this.init();this.serverBootstrap.bind(this.port);logger.info("Netty started on port: {} (TCP) with boss thread {}", this.port, 2);}/*** 初始化 netty 配置*/private void init() {// 创建两个线程组 bossGroup 为接收请求的线程组 一般1-2个就行NioEventLoopGroup bossGroup = new NioEventLoopGroup(2);// 实际工作的线程组NioEventLoopGroup workerGroup = new NioEventLoopGroup();this.serverBootstrap = new ServerBootstrap();// 两个线程组加入进来 加入自己的初始化器this.serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(this.socketInitializer);}
}
  • 编写Netty服务端监听消息处理器
public class SocketHandler extends ChannelInboundHandlerAdapter {private static final Logger log = LoggerFactory.getLogger(SocketHandler.class);public static final ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 读取到客户端发来的消息** @param ctx ChannelHandlerContext* @param msg msg*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// 由于我们配置的是 字节数组 编解码器,所以这里取到的用户发来的数据是 byte数组byte[] data = (byte[]) msg;log.info("收到消息: " + new String(data));// 给其他人转发消息for (Channel client : clients) {if (!client.equals(ctx.channel())) {client.writeAndFlush(data);}}}@Overridepublic void handlerAdded(ChannelHandlerContext ctx) {log.info("新的客户端链接:" + ctx.channel().id().asShortText());clients.add(ctx.channel());}@Overridepublic void handlerRemoved(ChannelHandlerContext ctx) {clients.remove(ctx.channel());}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {cause.printStackTrace();ctx.channel().close();clients.remove(ctx.channel());}
}
  • 设置出站解码器和入站编码器
@Component
public class SocketInitializer extends ChannelInitializer {@Overrideprotected void initChannel(SocketChannel socketChannel) {ChannelPipeline pipeline = socketChannel.pipeline();// 添加对byte数组的编解码,netty提供了很多编解码器,你们可以根据需要选择pipeline.addLast(new ByteArrayDecoder());pipeline.addLast(new ByteArrayEncoder());// 添加上自己的处理器pipeline.addLast(new SocketHandler());}
}

客户端

  • 编写 socket连接
public class ChatClient {public void start(String name) throws IOException {SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 1024));socketChannel.configureBlocking(false);Selector selector = Selector.open();socketChannel.register(selector, SelectionKey.OP_READ);// 监听服务端发来得消息new Thread(new ClientThread(selector)).start();// 监听用户输入Scanner scanner = new Scanner(System.in);while (scanner.hasNextLine()) {String message = scanner.next();if (StringUtils.hasText(message)) {socketChannel.write(StandardCharsets.UTF_8.encode(name + ": " + message));}}}private class ClientThread implements Runnable {private final Logger logger = LoggerFactory.getLogger(ClientThread.class);private final Selector selector;public ClientThread(Selector selector) {this.selector = selector;}@Overridepublic void run() {try {while (true) {int channels = selector.select();if (channels == 0) {continue;}Set selectionKeySet = selector.selectedKeys();Iterator keyIterator = selectionKeySet.iterator();while (keyIterator.hasNext()) {SelectionKey selectionKey = keyIterator.next();// 移除集合当前得selectionKey,避免重复处理keyIterator.remove();if (selectionKey.isReadable()) {handleRead(selector, selectionKey);}}}} catch (IOException e) {logger.error(e.getMessage(), e);}}}// 处理可读状态private void handleRead(Selector selector, SelectionKey selectionKey) throws IOException {SocketChannel channel = (SocketChannel) selectionKey.channel();ByteBuffer byteBuffer = ByteBuffer.allocate(1024);StringBuilder message = new StringBuilder();if (channel.read(byteBuffer) > 0) {byteBuffer.flip();message.append(StandardCharsets.UTF_8.decode(byteBuffer));}// 再次注册到选择器上,继续监听可读状态channel.register(selector, SelectionKey.OP_READ);System.out.println(message);}
}
  • start()
public static void main(String[] args) throws IOException {new ChatClient().start("张三");
}

相关内容

热门资讯

青海:“五一”假期推出60余场... 中新网西宁5月6日电 (潘雨洁)记者6日从青海省商务厅获悉,“五一”假期,青海省各级商务部门、金融机...
银行业竞争逻辑正在经历一场变革 证券时报记者 马传茂 步入低利率时代,银行业关于“规模情结”的讨论持续发酵。一个耐人寻味的现象是:银...
基本面与新技术共振,锂电新周期... 5月国内电池排产达172.4GWh,环比增8%,同比猛增65%。另一边,钠电逻辑迎来里程碑:宁德时代...
曾是OpenAI前董事会成员!... 快科技5月7日消息,据媒体报道,日前,马斯克起诉了他的两位OpenAI联合创始人——CEO萨姆·奥特...
监护仪警报后,我们在做什么 杨明明 河北医科大学第一医院 在医院的病房、急诊室和ICU,监护仪的“滴滴”警报声是最常见的声音。很...
原创 美... 美股三大指数昨晚(当地时间5月5日)集体收涨,标普500和纳斯达克双双再创历史新高。 最惊人的一幕发...
华泰证券:地产板块估值筑底、配... 华泰证券研报指出,深圳、广州、武汉、苏州、济南等核心城市相继出台楼市优化政策,从公积金放宽、以旧换新...
第四批全国中成药联盟采购开标 ... 4月30日,第四批全国中成药联盟采购(以下简称“全国联采”)在武汉开标产生拟中选结果。本次集采纳入2...
跟踪800自由现金流的ETF有... 随着A股市场从估值修复转向盈利驱动,投资者对企业"真金白银"创造能力的关注度显著提升。自由现金流策略...
金价强势反弹,投资者怎么操作? 经历持续阴跌后,黄金市场迎来一轮强势反弹行情。 5月6日,国际金价短暂震荡后强势拉升。截至记者发稿,...
A股5月“开门红” 两市成交额... 科创50日K线图   张大伟 制图 ◎记者 费天元 5月首个交易日,A股主要股指全线上攻,盘面热点延...
首发|又一个核聚变独角兽:星环... 投资界获悉,星环聚能完成5亿元人民币A+轮融资,投资方阵容依旧豪华:包括达晨财智、金浦投资、上海申能...
为何翻倍提高CPU市场展望?苏... 财联社5月7日讯(编辑 史正丞)随着超威半导体(AMD)的最新财报再度唤起市场对CPU需求周期的关注...
“1页纸”让欧美亚股市大涨,油... 新华社援引美国媒体5月6日报称道,两名美国官员及另外两名知情人士透露,白宫认为,与伊朗接近达成一份一...
拟上市企业股权激励的注意事项 在企业的发展进程中,拟上市企业的股权激励是一个至关重要的环节。它不仅能够吸引和留住核心人才,还能激发...
5月7日每日研选丨基本面与新技... 5月国内电池排产达172.4GWh,环比增8%,同比猛增65%。另一边,钠电逻辑迎来里程碑:宁德时代...
美股收盘:纳指、标普再创新高 ... 财联社5月7日讯(编辑 史正丞)昨夜今晨,随着AI相关资产接力暴涨,标普500指数和纳斯达克指数连续...
美联储古尔斯比就通胀与消费者行... 芝加哥联邦储备银行行长奥斯滕・古尔斯比对通胀形势发出谨慎警示:美国通胀不仅未能持续回落至美联储 2%...
原创 北... 实验室里长出的,不只是论文还有独角兽。 又是一年五四,北大迎来了128岁生日。 未名湖畔的故事讲了...