netty学习笔记

Channel

底层网络传输 API 必须提供给应用 I/O操作的接口,如读,写,连接,绑定等等。

ChannelHandler

业务逻辑主要存活于此,主要子接口分别有ChannelInboundHandler,ChannelOutboundHandler。ChannelInboundHandler入站时回调主要处理入站逻辑,主要应用于解码器,获取客户端发送的数据等逻辑。相反ChannelOutboundHandler出站时回调处理出战逻辑,主要应用于编码逻辑,发送数据给客户端等逻辑。

ChannelInboundHandler

方法 描述
channelUnregistered channel从EventLoop注销时回调方法
channelRegistered 注册到EventLoop时回调方法
channelActive channel连接到远程端口可以接受和发送数据了时的回调方法
channelInactive channel 未连接到远程端口,或者连接中断回调
channelReadComplete channel读操作完成时回调
channelRead 数据准备好刻时回调
channelWritabilityChanged 当channel的可写状态发生改变时回调。用户可以确保写入速度不会太快(存在OutOfMemoryError的风险),或者可以在Channel可再次写入时恢复写入。Channel.isWritable()可用于检测通道的实际可写性。可写性的阈值可以通过Channel.config()。setWriteHighWaterMark()和Channel.config()。setWriteLowWaterMark()来设置。
userEventTriggered 当用户调用了ChannelHandlerContext.fireUserEventTriggered回调(不会在当前的handler生效,后面的handler才会生效)

注意⚠️

1.在 inBoundHandler 中调用了write ()方法,会直接把任务,交给该inBoundHandler 在ChannelPipeline中上面的outBoundHandler处理。

2.使用ReferenceCountUtil.release() 来丢弃收到的信息。

总结:如果消息是被 消耗/丢弃 并不会被传入下个 ChannelPipeline 的 ChannelOutboundHandler 或者ChannelInboundHandler,调用 ReferenceCountUtil.release(message) 。一旦消息经过实际的传输,在消息被写或者 Channel 关闭时,它将会自动释放。

ChannelOutboundHandler

方法 描述
bind 请求绑定channel到本地地址回调
connect channel连接到远程地址回调
disconnect channel从远程服务器上断开回调
close 关闭channel回调
deregister 取消channel在eventloop上的注册回调
read 在channel中读数据回调
flush flush数据到远程服务器
write 写数据到远程服务器

ChannelPipeline

ChannelHandler的容器,当Channel 创建时自动创建。ChannelHandler执行的顺序与添加到ChannelPipeline的顺序有关。具体就是ChannelInboundHandler顺序的,ChannelOutboundHandler时逆序的。

ChannelHandlerContext

接口 ChannelHandlerContext 代表 ChannelHandler 和ChannelPipeline 之间的关联,并在 ChannelHandler 添加到 ChannelPipeline 时创建一个实例。ChannelHandlerContext 的主要功能是管理通过同一个 ChannelPipeline 关联的 ChannelHandler 之间的交互。

EventLoop&EventLoopGroup

用于处理channel i/o 操作的的线程

1.所有 EventLoop 从 EventLoopGroup 分配。每个新的 channel 将会获得新的 EventLoop

2.EventLoop 分配给 channel 用于执行所有事件和任务

3.Channel 绑定到 EventLoop。一个 channel 属于一个连接

ChannelFuture

类似jdk中的future,但比jdk中的future 强大,可以调用addListener 方法添加channel完成io操作时的回调。

类似jdk中的future,但比jdk中的future 强大,可以调用addListener 方法添加channel完成io操作时的回调。

解码器

解码器是一种 ChannelInboundHandler 的抽象实现。

主要分两类:

  • 解码字节到消息(ByteToMessageDecoder 和 ReplayingDecoder)
  • 解码消息到消息(MessageToMessageDecoder)

只需要实现decode ,decodeLast方法

编码器

2种类型:

message到message的编码(MessageToMessageEncoder)

message到byte的编码(MessageToByteEncoder)

实现encode方法

解编码器

2种类型:

ByteToMessageCodec:message到byte的编解码

MessageToMessageCodec:message到message的编解码

实现encode,decode ,decodeLast方法

CombinedChannelDuplexHandler

结合解码器和编码器在一起可能会牺牲可重用性。为了避免这种方式,并且部署一个解码器和编码器到 ChannelPipeline 作为逻辑单元而不失便利性。关键是下面的类:

1
public class CombinedChannelDuplexHandler<I extends ChannelInboundHandler,O extends ChannelOutboundHandler>

空闲连接以及超时

IdleStateHandler:如果连接闲置时间过长,则会触发 IdleStateEvent 事件。在 ChannelInboundHandler 中可以覆盖 userEventTriggered(…) 方法来处理 IdleStateEvent。

ReadTimeoutHandler:在指定的时间间隔内没有接收到入站数据则会抛出 ReadTimeoutException 并关闭 Channel。ReadTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。

WriteTimeoutHandler:WriteTimeoutException 可以通过覆盖 ChannelHandler 的 exceptionCaught(…) 方法检测到。

解码分隔符和基于长度的协议

分隔符协议

DelimiterBasedFrameDecoder:接收ByteBuf由一个或多个分隔符拆分,如NUL或换行符

LineBasedFrameDecoder:接收ByteBuf以分割线结束,如”\n”和”\r\n”

基于长度的协议

FixedLengthFrameDecoder:提取固定长度

LengthFieldBasedFrameDecoder:读取头部长度并提取帧的长度

Bootstrap

用于引导客户端,以及无连接协议(udp)

下表是 Bootstrap 的常用方法,其中很多是继承自 AbstractBootstrap。

方法 描述
group 设置 EventLoopGroup 用于处理所有的 Channel 的事件
channel channelFactory channel() 指定 Channel 的实现类。如果类没有提供一个默认的构造函数,你可以调用 channelFactory() 来指定一个工厂类被 bind() 调用。
localAddress 指定应该绑定到本地地址 Channel。如果不提供,将由操作系统创建一个随机的。或者,您可以使用 bind() 或 connect()指定localAddress
option 设置 ChannelOption 应用于 新创建 Channel 的 ChannelConfig。这些选项将被 bind 或 connect 设置在通道,这取决于哪个被首先调用。这个方法在创建管道后没有影响。所支持 ChannelOption 取决于使用的管道类型。
attr 这些选项将被 bind 或 connect 设置在通道,这取决于哪个被首先调用。这个方法在创建管道后没有影响。
handler 设置添加到 ChannelPipeline 中的 ChannelHandler 接收事件通知。
clone 创建一个当前 Bootstrap的克隆拥有原来相同的设置。
remoteAddress 设置远程地址。此外,您可以通过 connect() 指定
connect 连接到远端,返回一个 ChannelFuture, 用于通知连接操作完成
bind 将通道绑定并返回一个 ChannelFuture,用于通知绑定操作完成后,必须调用 Channel.connect() 来建立连接。

如何引导客户端

Bootstrap 类负责创建管道给客户或应用程序,利用无连接协议和在调用 bind() 或 connect() 之后。下面演示了引导客户端,使用的是 NIO TCP 传输

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap(); //1
bootstrap.group(group) //2
.channel(NioSocketChannel.class) //3
.handler(new SimpleChannelInboundHandler<ByteBuf>() { //4
@Override
protected void channeRead0(
ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) throws Exception {
System.out.println("Received data");
byteBuf.clear();
}
});
ChannelFuture future = bootstrap.connect(
new InetSocketAddress("www.manning.com", 80)); //5
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture)
throws Exception {
if (channelFuture.isSuccess()) {
System.out.println("Connection established");
} else {
System.err.println("Connection attempt failed");
channelFuture.cause().printStackTrace();
}
}
});

ServerBootstrap

下表显示了 ServerBootstrap 的方法

方法 描述
group 设置 EventLoopGroup 用于 ServerBootstrap。这个 EventLoopGroup 提供 ServerChannel 的 I/O 并且接收 Channel
channel
ChannelFactory
channel() 指定 Channel 的实现类。如果管道没有提供一个默认的构造函数,你可以提供一个 ChannelFactory。
localAddress 指定 ServerChannel 实例化的类。如果不提供,将由操作系统创建一个随机的。或者,您可以使用 bind() 或 connect()指定localAddress
option 指定一个 ChannelOption 来用于新创建的 ServerChannel 的 ChannelConfig 。这些选项将被设置在管道的 bind() 或 connect(),这取决于谁首先被调用。在此调用这些方法之后设置或更改 ChannelOption 是无效的。所支持 ChannelOption 取决于使用的管道类型。
childOption 当管道已被接受,指定一个 ChannelOption 应用于 Channel 的 ChannelConfig。
attr 指定 ServerChannel 的属性。这些属性可以被 管道的 bind() 设置。当调用 bind() 之后,修改它们不会生效。
childAttr 应用属性到接收到的管道上。后续调用没有效果。
handler 设置添加到 ServerChannel 的 ChannelPipeline 中的 ChannelHandler。 具体详见 childHandler() 描述
childHandler 设置添加到接收到的 Channel 的 ChannelPipeline 中的 ChannelHandler。handler() 和 childHandler()之间的区别是前者是接收和处理ServerChannel,同时 childHandler() 添加处理器用于处理和接收 Channel。后者代表一个套接字绑定到一个远端。
clone 克隆 ServerBootstrap 用于连接到不同的远端,通过设置相同的原始 ServerBoostrap。
bind 绑定 ServerChannel 并且返回一个 ChannelFuture,用于 通知连接操作完成了(结果可以是成功或者失败)

如何引导一个服务器

ServerBootstrap 中的 childHandler(), childAttr() 和 childOption() 是常用的服务器应用的操作。具体来说,ServerChannel实现负责创建子 Channel,它代表接受连接。因此 引导 ServerChannel 的 ServerBootstrap ,提供这些方法来简化接收的 Channel 对 ChannelConfig 应用设置的任务。

ServerChannel 创建 ServerBootstrap 在 bind(),后者管理大量的子 Channel。

1.当调用 bind() 后 ServerBootstrap 将创建一个新的管道,这个管道将会在绑定成功后接收子管道

2.接收新连接给每个子管道

3.接收连接的 Channel

child* 的方法都是操作在子的 Channel,被 ServerChannel 管理。

ServerBootstrap 时会创建一个 NioServerSocketChannel实例 bind() 。这个 NioServerChannel 负责接受新连接和创建NioSocketChannel 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public class HttpServer {
public static void startServer(int port) {

// 创建两个线程组 用于处理网路事件
// 一个用来接收客户端的连接
// 一个用来进行SocketChannel 的网络读写
@Cleanup(value = "shutdownGracefully") EventLoopGroup bossGroup = new NioEventLoopGroup();
@Cleanup(value = "shutdownGracefully") EventLoopGroup workGroup = new NioEventLoopGroup();
//辅助启动类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.channel(NioServerSocketChannel.class)
//注册两个线程组
.group(bossGroup, workGroup)
//tcp 属性,不能处理的的请求放入队队列的队列大小
.option(ChannelOption.SO_BACKLOG, 1024)
//tcp 属性 长连接
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {

ch.pipeline().addLast("encoder", new HttpResponseEncoder());
// ChannelInboundHandlerAdapter 必须在 out之后, in 执行顺序为 顺序,out 为倒叙
ch.pipeline().addLast(new HttpOutHandler());
ch.pipeline().addLast(new HttpOutHandler2());
ch.pipeline().addLast("decoder", new HttpRequestDecoder());
//http 请求片段整合得到完整的FullHttpRequest
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(10 * 1024 * 1024));
ch.pipeline().addLast(new HttpInHandler());
ch.pipeline().addLast(new HttpInHandler2());
ch.pipeline().addLast(new HttpInHandler3());
}
});

try {
//阻塞代码直到绑定成功
ChannelFuture future = serverBootstrap.bind(port).sync();
//阻塞代码直到连接关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}

// public static void main(String[] args) {
// startServer(8085);
// }
}

单元测试

1
2
3
4
5
6
7
@Test
public void test() {
EmbeddedChannel channel = new EmbeddedChannel(new HttpInHandler());
//写一个入站消息
System.out.println(channel.writeInbound(new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET,"/test")));

}
方法 描述
writeInbound 写一个入站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readInbound() 读到,则返回 true
readInbound 从 EmbeddedChannel 读到入站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null
writeOutbound 写一个出站消息到 EmbeddedChannel。 如果数据能从 EmbeddedChannel 通过 readOutbound() 读到,则返回 true
readOutbound 从 EmbeddedChannel 读到出站消息。任何返回遍历整个ChannelPipeline。如果读取还没有准备,则此方法返回 null
Finish 如果从入站或者出站中能读到数据,标记 EmbeddedChannel 完成并且返回。这同时会调用 EmbeddedChannel 的关闭方法