Netty快速入门

Java NIO 核心概念


Linux五种I/O模型比较

Socket通信模型

Java NIO Buffer

一个Buffer本质上是内存中的一块, 可以将数据写入这块内存, 从这块内存获取数据

java.nio 定义了以下几个Buffer的实现:

Java NIO Buffer三大核心概念:positionlimitcapacity

  • 最好理解的当然是 capacity,它代表这个缓冲区的容量,一旦设定就不可以更改。比如 capacity1024IntBuffer,代表其一次可以存放 1024int 类型的值。

  • 一旦 Buffer 的容量达到 capacity,需要清空 Buffer,才能重新写入值

  • 从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了。

  • 写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity

  • 写结束后,切换到读模式,此时的 limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了

DirectByteBuffer & HeapByteBuffer

DirectByteBuffer HeapByteBuffer
描述 底层存储在非JVM堆上,通过native代码操作 -神器:MaxDirectMemorySize= 标准java类,维护一份byte[]在JVM堆上
创建开销
存储位置 Native Heap Java Heap
数据拷贝 无需临时缓冲区做拷贝 拷贝到临时DirectByteBuffer,但临时缓冲区使用缓存, 聚集写/发散读时没有缓存临时缓冲区
GC影响 每次创建或者释放的时候都调用一次System.gc() java垃圾回收机制自动回收

JAVA NIO Channel

所有的NIO操作始于通道,通道是数据来源或数据写入的目的地java.nio 包中主要实现的以下几个 Channel:

  • FileChannel:文件通道,用于文件的读和写
  • DatagramChannel:用于 UDP 连接的接收和发送
  • SocketChannel:把它理解为 TCP 连接通道,简单理解就是 TCP 客户端
  • ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求

Java NIO Selector

  • java.nio.channels.Selector

  • 支持IO多路复用的抽象实体

  • 注册Selectable Channel

  • SelectionKey —— 表示Selector和被注册的channel之间关系的一份凭证

    • SelectionKey保存channel感兴趣的事件
  • Selector.select 更新所有就绪的 SelectionKey 的状态, 并返回就绪的channel个数

    • 迭代Selected Key集合并处理就绪channel

Selector基本操作

  • 创建Selector
1
Selector selector = Selector.open();
  • 注册Channel到Selector
1
2
3
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
  • register的第二个参数是一个“关注集合”,代表关注的channel状态,有四种基础类型可供监听, 用SelectionKey中的常量表示如下:
1
2
3
4
SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE
  • 从Selector中选择channel
    一旦向Selector注册了一个或多个channel后,就可以调用select来获取channel, select()方法会返回所有处于就绪状态的channel, select方法具体如下:
1
2
3
int select()
int select(long timeout)
int selectNow()

select()方法的返回值是一个int,代表有多少channel处于就绪了。也就是自上一次select后有多少channel进入就绪。

  • selectedKeys()
    在调用select并返回了有channel就绪之后,可以通过选中的key集合来获取channel,这个操作通过调用selectedKeys()方法:
1
Set<SelectionKey> selectedKeys = selector.selectedKeys();
  • Selector编程模板
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {

SelectionKey key = keyIterator.next();

if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.

} else if (key.isConnectable()) {
// a connection was established with a remote server.

} else if (key.isReadable()) {
// a channel is ready for reading

} else if (key.isWritable()) {
// a channel is ready for writing
}

keyIterator.remove();
}

Netty核心概念

Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。


Netty主要组件

  • Netty Server启动主要流程

    • 设置服务端ServerBootStrap启动参数

      1
      2
      3
      4
      group(parentGroup, childGroup):
      channel(NioServerSocketChannel): 设置通道类型
      handler():设置NioServerSocketChannel的ChannelHandlerPipeline
      childHandler(): 设置NioSocketChannel的ChannelHandlerPipeline
    • 通过ServerBootStrap的bind方法启动服务端,bind方法会在parentGroup中注册NioServerScoketChannel,监听客户端的连接请求

      • 会创建一个NioServerSocketChannel实例,并将其在parentGroup中进行注册
  • Netty Server执行主要流程

    • Client发起连接CONNECT请求,parentGroup中的NioEventLoop不断轮循是否有新的客户端请求,如果有,ACCEPT事件触发

    • ACCEPT事件触发后,parentGroup中NioEventLoop会通过NioServerSocketChannel获取到对应的代表客户端的NioSocketChannel,并将其注册到childGroup中

    • childGroup中的NioEventLoop不断检测自己管理的NioSocketChannel是否有读写事件准备好,如果有的话,调用对应的ChannelHandler进行处理

Netty EventLoop

  • EventLoopGroup

    • 包括多个EventLoop
    • 多个EventLoop之间不交互
  • EventLoop

    • 每个EventLoop对应一个线程
    • 所有连接(channel)都将注册到一个EventLoop,并且只注册到一个,整个生命周期中都不会变化
    • 每个EventLoop管理着多个连接(channel)
    • EventLoop来处理连接(Channel)上的读写事件
  • ServerBootstrap

    • 包括2个不同类型的EventLoopGroup:
      • Parent EventLoop: 负责处理Accept事件,接收请求
      • Child EventLoop:负责处理读写事件

EventExecutor视图

  1. EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;
    1. EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;
    2. next(): EventExecutorChooser.next()定义选择EventExecutor的策略;

ByteBuf类型

  • 根据内存的位置

    • HeapByteBuf

      • 基于数组- 内部为一个字节数组 (byte array)
      • hasArray()返回True
      • array()返回其内部的数组,可以对数组进行直接操作
    • DirectByteBuf

      • 堆外内存
      • 具有更好的性能
      • 创建和释放开销更大
  • 根据是否使用内存池

    • Pooled vs Unpooled
  • 根据是否使用Unsafe操作(Unsafe)

    • Safe vs Unsafe

复合缓冲区(CompositeByteBuf)

  • 多个ByteBuf组合的视图
  • 一个ByteBuf列表,可动态的添加和删除其中的 ByteBuf
  • 可能既包含堆缓冲区,也包含直接缓冲区

ByteBuf分配

不直接通过new来创建,而是通过ByteBufAllocator来创建

  • UnpooledByteBufAllocator
  • PooledByteBufAllocator

ChannelHandler

业务处理核心逻辑,用户自定义, Netty 提供2个重要的 ChannelHandler 子接口:

  • ChannelInboundHandler - 处理进站数据和所有状态更改事件
  • ChannelOutboundHandler - 处理出站数据,允许拦截各种操作

ChannelPipline

ChannelPipeline是ChannelHandler容器

  • 包括一系列的ChannelHandler实例,用于拦截流经一个 Channel 的入站和出站事件

  • 每个Channel都有一个其ChannelPipeline

  • 可以修改 ChannelPipeline 通过动态添加和删除 ChannelHandler

  • 定义了丰富的API调用来回应入站和出站事件

ChannelHandlerContext

ChannelHandlerContext表示 ChannelHandler 和 ChannelPipeline 之间的关联,在 ChannelHandler 添加到 ChannelPipeline 时创建

Netty线程模型


Reactor模式 - Doug Lea

  • 单线程Reactor
    -w822
  • 多线程Reactor

所有逻辑都在I/O线程中完成,不开启单独线程。图中对应的TheadPool是在io处理handler中额外开启的业务线程池。

  • Multiple Reactor
    -w735

Netty与Reactor模式

  • 单线程Reactor
1
2
3
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootStrap bootStrap = new ServerBootStrap();
bootStrap.group(bossGroup, bossGroup); // 监听和处理都由一个线程完成
  • 多线程Reactor
1
2
3
4
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootStrap bootStrap = new ServerBootStrap();
bootStrap.group(bossGroup, bossGroup); // 监听和处理都由一个线程完成
// 在handler中额外使用线程池处理业务
  • Multiple Reactor
1
2
3
4
5
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootStrap bootStrap = new ServerBootStrap();
bootStrap.group(bossGroup, workerGroup);
// 在handler中额外使用线程池处理业务

Boss EventLoopGroup

Worker EventLoopGroup

Netty Start Process

ServerBootStrap

Netty编码解码


半包粘包问题

  • TCP/IP协议
    • 面向“流”协议
    • MSS: Maxitum Segment Size 最大分段大小,表示TCP数据包每次能够传输的最大数据分段
    • 发送方/接收方缓冲区 (Nagle算法)

解决思路

  • 基本思路就是不断从TCP缓冲区中读取数据,每次读取完都需要判断是否是一个完整的数据包

    • 若当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包

      • 定长
      • 分隔符
      • 基于长度的变长包
    • 若当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接

常用编码解码器

编码解码器的作用就是将原始的字节数据与自定义的消息对象进行互相转换,目前业界主流的序列化框架有:

  • ProtoBuf
  • Jboss Marshalling
  • Java Serialization

Netty常用的自带编解码器有:

  • LineBasedFrameDecoder(\n, \r\n)

    • 回车换行解码器
    • 配合StringDecoder
  • DelimiterBasedFrameDecoder

    • 分隔符解码器
  • FixedLengthFrameDecoder

    • 固定长度解码器
  • LengthFieldBasedFrameDecoder

    • 基于包头’不固定长度‘解码器(私有协议最常用)
    • 参数说明
      • maxFrameLength:包的最大长度
      • lengthFieldOffset:长度属性的起始位(偏移位),包中存放长度属性字段的起始位置
      • lengthFieldLength:长度属性的长度
      • lengthAdjustment:长度调节值,在总长被定义为包含包头长度时,修正信息长度
      • initialBytesToStrip:跳过的字节数,根据需要跳过lengthFieldLength个字节,以便接收端直接接受到不含“长度属性”的内容

Netty拆包的基类 - ByteToMessageDecoder

  • 内部维护了一个数据累积器cumulation,每次读取到数据都会不断累加,然后尝试对累加到的数据进行拆包,拆成一个完整的业务数据包

  • 每次都将读取到的数据通过内存拷贝的方式, 累积到cumulation

  • 调用子类的decode方法对累积的数据尝试进行拆包