
Netty实战

1. 什么是Java BIO

1.0 BIO、NIO、AIO 使用场景分析

  1. BIO 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4 以前的唯一选择,但程序简单易理解。
  2. NIO 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕系统,服务器间通讯等。编程比较复杂,JDK1.4 开始支持。
  3. AIO 方式适用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用 OS 参与并发操作,编程比较复杂,JDK7 开始支持。

1.1 Java BIO 基本介绍

Java BIO:同步并阻塞(传统阻塞型),服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销。【简单示意图】
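
一个最小的 BIO 服务端示意(非原文代码),可以直观看到"一个连接一个线程"以及 read 阻塞的问题:

import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BioEchoServer {
    public static void main(String[] args) throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(9000)) {
            while (true) {
                // accept 阻塞,直到有新连接进来
                Socket socket = serverSocket.accept();
                // 一个连接一个线程
                new Thread(() -> handle(socket)).start();
            }
        }
    }

    private static void handle(Socket socket) {
        try (Socket s = socket;
             InputStream in = s.getInputStream();
             OutputStream out = s.getOutputStream()) {
            byte[] buf = new byte[1024];
            int len;
            // read 阻塞:连接上没有数据时,这个线程就一直挂在这里
            while ((len = in.read(buf)) != -1) {
                out.write(buf, 0, len);
            }
        } catch (Exception e) {
            // 演示代码,忽略异常
        }
    }
}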

1.2 Java BIO 问题

  1. 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read,业务处理,数据 Write
  2. 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大。
  3. 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费。

2. 什么是Java NIO

2.1 Java NIO 基本介绍

  1. Java NIO 全称 Java non-blocking IO,是指 JDK 提供的新 API。从 JDK1.4 开始,Java 提供了一系列改进的输入/输出的新特性,被统称为 NIO(即 New IO),是同步非阻塞的。
  2. NIO 相关类都被放在 java.nio 包及子包下,并且对原 java.io 包中的很多类进行改写。【基本案例】
  3. NIO 有三大核心部分: Channel(通道)、Buffer(缓冲区)、Selector(选择器)
  4. NIO 是面向缓冲区(或者说面向块)编程的。数据被读取到一个稍后处理的缓冲区中,需要时可在缓冲区中前后移动,这增加了处理过程的灵活性,可以用来构建非阻塞式的高伸缩性网络应用。
  5. Java NIO 的非阻塞模式,使一个线程从某通道请求读取数据时,只会得到目前可用的数据;如果目前没有可用数据,就什么都不会获取,而不是让线程保持阻塞,所以在数据变得可读之前,该线程可以继续做其他的事情。非阻塞写也是如此,一个线程请求写入一些数据到某通道,不需要等待它完全写入,这个线程同时可以去做别的事情。【后面有案例说明】
  6. 通俗理解:NIO 是可以做到用一个线程来处理多个操作的。假设有 10000 个请求过来,根据实际情况,可以分配 50 或者 100 个线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。
  7. HTTP 2.0 使用了多路复用的技术,做到同一个连接并发处理多个请求,而且并发请求的数量比 HTTP 1.1 大了好几个数量级。
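
下面是一个最简化的 NIO 服务端示意(非原文代码),演示 Channel、Buffer、Selector 如何配合:单个线程用一个 Selector 同时监听多个连接的事件。

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class NioEchoServer {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.bind(new InetSocketAddress(9000));
        serverChannel.configureBlocking(false);                // 非阻塞模式
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        ByteBuffer buffer = ByteBuffer.allocate(1024);         // 面向缓冲区读写
        while (true) {
            selector.select();                                 // 阻塞等待就绪事件
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel client = serverChannel.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
                    SocketChannel client = (SocketChannel) key.channel();
                    buffer.clear();
                    int n = client.read(buffer);               // 没有数据时立即返回,不会阻塞线程
                    if (n == -1) {
                        key.cancel();
                        client.close();
                    } else if (n > 0) {
                        buffer.flip();                         // 切换为读模式后原样回写
                        client.write(buffer);
                    }
                }
            }
        }
    }
}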

2.2 NIO 与零拷贝
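
零拷贝最常见的入口是 FileChannel.transferTo:把文件内容直接传给目标 Channel,避免数据在用户态缓冲区来回拷贝(Linux 上底层对应 sendfile)。下面是一个示意(文件名 data.bin 和端口 9000 只是假设):

import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;

public class ZeroCopyClient {
    public static void main(String[] args) throws Exception {
        try (SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9000));
             FileChannel fileChannel = new RandomAccessFile("data.bin", "r").getChannel()) {
            long position = 0;
            long size = fileChannel.size();
            // transferTo 在内核层面直接把文件数据写入 socket,省去多余的内存复制
            while (position < size) {
                position += fileChannel.transferTo(position, size - position, socketChannel);
            }
        }
    }
}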

3. 什么是Netty

3.1 原生 NIO 存在的问题

  1. NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
  2. 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。
  3. 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。
  4. JDK NIO 的 Bug:例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。直到 JDK1.7 版本该问题仍旧存在,没有被根本解决。

3.2 Netty 的优点

Netty 对 JDK 自带的 NIO API 进行了封装,解决了上述问题。

  1. 设计优雅:适用于各种传输类型的统一 API(阻塞和非阻塞 Socket);基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型(单线程、一个或多个线程池)。
  2. 使用方便:详细记录的 Javadoc、用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 JDK 6(Netty 4.x)就足够了。
  3. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。
  4. 安全:完整的 SSL/TLS 和 StartTLS 支持。
  5. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。

3.3 Netty 模型结构

3.3.1 传统阻塞 I/O 服务模型

3.3.2 Reactor 模式

3.3.3 单 Reactor 单线程

  1. 优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
  2. 缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈
  3. 缺点:可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
  4. 使用场景:客户端的数量有限,业务处理非常快速,比如 Redis 在业务处理的时间复杂度 O(1) 的情况

3.3.4 单 Reactor 多线程

  1. 优点:可以充分利用多核 CPU 的处理能力
  2. 缺点:多线程数据共享和访问比较复杂;Reactor 承担所有事件的监听和响应,且仍在单线程中运行,在高并发场景容易出现性能瓶颈(业务分发到线程池的骨架见下面的示意代码)
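
示意代码如下(只是骨架,不是完整实现):Reactor 线程只负责 select 和读取,耗时业务交给线程池;真实实现通常还会把写回也交还给 Reactor 线程,这里为了简短直接在 worker 线程里写。

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SingleReactorMultiWorker {
    // 业务线程池:worker 线程只做业务计算,不碰 Selector
    private static final ExecutorService WORKERS = Executors.newFixedThreadPool(
            Runtime.getRuntime().availableProcessors());

    // Reactor 线程在 select 循环里调用:读完数据后把业务处理交给线程池
    static void onReadable(SelectionKey key) throws Exception {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int n = client.read(buffer);              // 读取仍在 Reactor 线程完成
        if (n <= 0) {
            return;
        }
        buffer.flip();
        byte[] data = new byte[buffer.remaining()];
        buffer.get(data);
        WORKERS.submit(() -> {
            byte[] result = handleBusiness(data); // 耗时的业务逻辑在 worker 线程执行
            try {
                client.write(ByteBuffer.wrap(result));
            } catch (Exception ignored) {
            }
        });
    }

    private static byte[] handleBusiness(byte[] data) {
        return data; // 示意:原样返回
    }
}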

3.3.5 主从 Reactor 多线程

  1. 优点:父线程与子线程的数据交互简单、职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。
  2. 优点:父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
  3. 缺点:编程复杂度较高
  4. 结合实例:这种模型在许多项目中广泛使用,包括 Nginx 的主从 Reactor 多进程模型、Memcached 的主从多线程模型,以及 Netty 对主从多线程模型的支持

3.3.6 Netty 模型

  1. Netty 抽象出两组线程池:BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写
  2. BossGroup 和 WorkerGroup 的类型都是 NioEventLoopGroup
  3. NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoop
  4. NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯
  5. NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop
  6. 每个 Boss NioEventLoop 循环执行以下 3 个步骤
    • 轮询 accept 事件
    • 处理 accept 事件,与 client 建立连接,生成 NioSocketChannel,并将其注册到某个 Worker NioEventLoop 上的 Selector
    • 处理任务队列的任务,即 runAllTasks
  7. 每个 Worker NioEventLoop 循环执行以下步骤
    • 轮询 read、write 事件
    • 处理 I/O 事件,即在对应的 NioSocketChannel 上处理 read、write 事件
    • 处理任务队列的任务,即 runAllTasks
  8. 每个 Worker NioEventLoop 处理业务时,会使用 pipeline(管道),pipeline 中引用了对应的 channel,即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器。下面给出一个与该模型对应的最小服务端示例。
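
对照上面的模型,一个最小的 Netty 服务端启动示意如下(非原文工程代码,仅演示 BossGroup/WorkerGroup 与 pipeline 的关系):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class MinimalNettyServer {
    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // 只负责 accept
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // 负责已建立连接的读写
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // 每个连接的 pipeline 上挂业务 handler
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<ByteBuf>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) {
                                    ctx.writeAndFlush(msg.retain()); // 原样回写
                                }
                            });
                        }
                    });
            bootstrap.bind(9000).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}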

4. Netty实战

4.1 Server端

NettyRemotingServer服务端

package com.huafu.basecommon.netty;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.huafu.basecommon.netty.codec.NettyDecoder;
import com.huafu.basecommon.netty.codec.NettyEncoder;
import com.huafu.basecommon.netty.codec.SimpleProtocolDecoder;
import com.huafu.basecommon.netty.codec.SimpleProtocolEncoder;
import com.huafu.basecommon.netty.config.NettyServerConfig;
import com.huafu.basecommon.netty.enums.MsgType;
import com.huafu.basecommon.netty.exceptions.RemoteException;
import com.huafu.basecommon.netty.handler.NettyServerHandler;
import com.huafu.basecommon.netty.handler.SimpleServerHander;
import com.huafu.basecommon.netty.processor.NettyRequestProcessor;
import com.huafu.basecommon.netty.utils.Constants;
import com.huafu.basecommon.netty.utils.NettyUtils;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* remoting netty server
*/
@Slf4j
public class NettyRemotingServer {

/**
* server bootstrap
*/
private final ServerBootstrap serverBootstrap = new ServerBootstrap();

/**
* todo 这个默认的 server 解析消息用的线程池有点随意:newFixedThreadPool 的任务队列是无界的,消息处理不过来时任务会一直堆积,可能导致内存溢出,生产环境最好换成有界队列并配置拒绝策略
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);

/**
* boss group
*/
private final EventLoopGroup bossGroup;

/**
* worker group
*/
private final EventLoopGroup workGroup;

/**
* server config
*/
private final NettyServerConfig serverConfig;

/**
* server handler
*/
private final NettyServerHandler serverHandler = new NettyServerHandler(this);

/**
* started flag
*/
private final AtomicBoolean isStarted = new AtomicBoolean(false);

/**
* Netty server bind fail message
*/
private static final String NETTY_BIND_FAILURE_MSG = "NettyRemotingServer bind %s fail";

/**
* server init
* 设置BossGroup和WorkerGroup线程
* @param serverConfig server config
*/
public NettyRemotingServer(final NettyServerConfig serverConfig) {
this.serverConfig = serverConfig;
ThreadFactory bossThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerBossThread_%s").build();
ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("NettyServerWorkerThread_%s").build();
if (Epoll.isAvailable()) {
this.bossGroup = new EpollEventLoopGroup(1, bossThreadFactory);
this.workGroup = new EpollEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
} else {
this.bossGroup = new NioEventLoopGroup(1, bossThreadFactory);
this.workGroup = new NioEventLoopGroup(serverConfig.getWorkerThread(), workerThreadFactory);
}
}

/**
* server start
*/
public void start() {
if (isStarted.compareAndSet(false, true)) {
this.serverBootstrap
.group(this.bossGroup, this.workGroup)
// 设置一个服务器端的通道实现
.channel(NettyUtils.getServerSocketChannelClass())
// 地址可以复用
.option(ChannelOption.SO_REUSEADDR, true)
// 设置TCP全连接队列大小, 线程队列维护的连接个数
.option(ChannelOption.SO_BACKLOG, serverConfig.getSoBacklog())
// 设置连接状态行为, 保持连接状态
.childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isSoKeepalive())
// nagle算法会使我们某些较小的数据包造成延迟,因为为了提升效率,nagle会等到收集到一定数据后进行发送,这样可能造成我们消息的延迟。
// 可以通过如下方式设置,开启无延迟的配置:
.childOption(ChannelOption.TCP_NODELAY, serverConfig.isTcpNoDelay())
// TCP数据发送缓冲区大小,即TCP发送滑动窗口
.childOption(ChannelOption.SO_SNDBUF, serverConfig.getSendBufferSize())
// TCP数据接收缓冲区大小
.childOption(ChannelOption.SO_RCVBUF, serverConfig.getReceiveBufferSize())
// 该方法用来设置业务处理类(自定义的handler)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// initNettyChannelByByte(ch);
// initNettyChannelBySimple(ch);
initNettyChannel(ch);
}
});

ChannelFuture future;
try {
future = serverBootstrap.bind(serverConfig.getListenPort()).sync();
} catch (Exception e) {
log.error("NettyRemotingServer bind fail {}, exit", e.getMessage(), e);
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}

//给cf注册监听器,监控我们关心的事件
future.addListener((ChannelFutureListener) cf -> {
if (cf.isSuccess()) {
log.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
} else if (cf.cause() != null) {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
} else {
throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
}
});

// if (future.isSuccess()) {
// log.info("NettyRemotingServer bind success at port : {}", serverConfig.getListenPort());
// } else if (future.cause() != null) {
// throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()), future.cause());
// } else {
// throw new RemoteException(String.format(NETTY_BIND_FAILURE_MSG, serverConfig.getListenPort()));
// }
}
}

/**
* init netty channel
* 复杂协议对象 + ReplayingDecoder解码器
* handler 执行顺序 :https://zhuanlan.zhihu.com/p/658141949
* @param ch socket channel
*/
private void initNettyChannel(SocketChannel ch) {
ch.pipeline()
.addLast("encoder", new NettyEncoder())
.addLast("decoder", new NettyDecoder())
//加入一个netty 提供 IdleStateHandler
/**
1. IdleStateHandler 是netty 提供的处理空闲状态的处理器
2. long readerIdleTime : 表示多长时间没有读, 就会发送一个心跳检测包检测是否连接
3. long writerIdleTime : 表示多长时间没有写, 就会发送一个心跳检测包检测是否连接
4. long allIdleTime : 表示多长时间没有读写, 就会发送一个心跳检测包检测是否连接
5. 文档说明
triggers an {@link IdleStateEvent} when a {@link Channel} has not performed
read, write, or both operation for a while.
6. 当 IdleStateEvent 触发后 , 就会传递给管道 的下一个handler去处理
通过调用(触发)下一个 handler 的 userEventTriggered,在该方法中去处理 IdleStateEvent(读空闲、写空闲、读写空闲)
*/
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", serverHandler);

}

/** 简单协议对象 + LengthFieldBasedFrameDecoder解码器 */
private void initNettyChannelBySimple(SocketChannel ch) {
ch.pipeline()
.addLast("encoder", new SimpleProtocolEncoder())
.addLast("decoder", new SimpleProtocolDecoder(1024, 0, 4, 0, 0, true))
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast("handler", new SimpleServerHander());

}

/** 直接接收字节流,会发生粘包问题 */
private void initNettyChannelByByte(SocketChannel ch) {
ch.pipeline()
.addLast("server-idle-handle", new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
.addLast(new SimpleChannelInboundHandler<ByteBuf>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
System.out.println("Server received: " + msg.toString(StandardCharsets.UTF_8));
ctx.writeAndFlush(Unpooled.copiedBuffer("hello client", StandardCharsets.UTF_8));
}
});
}

public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor) {
this.registerProcessor(msgType, processor, null);
}

public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.serverHandler.registerProcessor(msgType, processor, executor);
}

/**
* get default thread executor
*/
public ExecutorService getDefaultExecutor() {
return defaultExecutor;
}

public void close() {
if (isStarted.compareAndSet(true, false)) {
try {
if (bossGroup != null) {
this.bossGroup.shutdownGracefully();
}
if (workGroup != null) {
this.workGroup.shutdownGracefully();
}
defaultExecutor.shutdown();
} catch (Exception ex) {
log.error("netty server close exception", ex);
}
log.info("netty server closed");
}
}
}
NettyServerHandler服务端消息读取处理器
package com.huafu.basecommon.netty.handler;

import com.huafu.basecommon.netty.NettyRemotingServer;
import com.huafu.basecommon.netty.enums.MsgType;
import com.huafu.basecommon.netty.processor.NettyRequestProcessor;
import com.huafu.basecommon.netty.protocol.MsgProtocol;
import com.huafu.basecommon.netty.utils.ChannelUtils;
import com.huafu.basecommon.netty.utils.Pair;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;


/**
* netty server request handler
* @ChannelHandler.Sharable 这个注解可以理解为单例模式,共享NettyServerHandler中变量,所以需要注意线程安全
* @author zhaoyuzhe
*/
@ChannelHandler.Sharable
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

/**
* netty remote server
*/
private final NettyRemotingServer nettyRemotingServer;

/**
* server processors queue
*/
private final ConcurrentHashMap<MsgType, Pair<NettyRequestProcessor, ExecutorService>> processors = new ConcurrentHashMap<>();

public NettyServerHandler(NettyRemotingServer nettyRemotingServer) {
this.nettyRemotingServer = nettyRemotingServer;
}

/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
ctx.channel().close();
}

/**
* 读取消息
* The current channel reads data from the remote end
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (MsgProtocol) msg);
}

/**
* 按类型注册处理器
* register processor
*/
public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor) {
this.registerProcessor(msgType, processor, null);
}

/**
* 按类型注册处理器
* register processor
*/
public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if (executorRef == null) {
executorRef = nettyRemotingServer.getDefaultExecutor();
}
this.processors.putIfAbsent(msgType, new Pair<>(processor, executorRef));
}

/**
* process received logic
*/
private void processReceived(final Channel channel, final MsgProtocol msg) {
final MsgType msgType = msg.getType();
// 接收到的如果是心跳, 则不做处理
if (MsgType.HEART_BEAT == msgType) {
if (log.isDebugEnabled()) {
log.debug("server receive heart beat from: host: {}", ChannelUtils.getRemoteAddress(channel));
}
return;
}
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(msgType);
if (pair != null) {
Runnable r = () -> {
try {
pair.getLeft().process(channel, msg);
} catch (Exception ex) {
log.error("process msg {} error", msg, ex);
}
};
try {
pair.getRight().submit(r);
} catch (RejectedExecutionException e) {
log.warn("thread pool is full, discard msg {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
log.warn("commandType {} not support", msgType);
}
}

/**
* caught exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("exceptionCaught : {}", cause.getMessage(), cause);
ctx.channel().close();
}

/**
* channel write changed
*/
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
Channel ch = ctx.channel();
ChannelConfig config = ch.config();

if (!ch.isWritable()) {
if (log.isWarnEnabled()) {
log.warn("{} is not writable, over high water level : {}",
ch, config.getWriteBufferHighWaterMark());
}

config.setAutoRead(false);
} else {
if (log.isWarnEnabled()) {
log.warn("{} is writable, to low water : {}",
ch, config.getWriteBufferLowWaterMark());
}
config.setAutoRead(true);
}
}

/**
* 长时间没有读写消息后 关闭channel
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
ctx.channel().close();
} else {
super.userEventTriggered(ctx, evt);
}
}
}

4.2 编解码器与协议对象

消息发送编码器
package com.huafu.basecommon.netty.codec;

import com.huafu.basecommon.netty.exceptions.RemotingException;
import com.huafu.basecommon.netty.protocol.MsgProtocol;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;


/**
* netty encoder
* @author zhaoyuzhe
*/
@Sharable
public class NettyEncoder extends MessageToByteEncoder<MsgProtocol> {

@Override
protected void encode(ChannelHandlerContext ctx, MsgProtocol msg, ByteBuf out) throws Exception {
if (msg == null) {
throw new RemotingException("encode msg is null");
}
// 按序编码MsgProtocol(协议对象)中字段
out.writeByte(MsgProtocol.MAGIC);
out.writeByte(MsgProtocol.VERSION);
out.writeByte(msg.getType().getCode());
out.writeLong(msg.getOpaque());
out.writeInt(msg.getBody().length);
out.writeBytes(msg.getBody());
}
}
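
从 NettyEncoder.encode 的写入顺序可以看出这套协议的帧结构(单位:字节):magic(1) + version(1) + type(1) + opaque(8) + bodyLength(4) + body(bodyLength),也就是 15 字节的定长消息头加变长消息体;NettyDecoder 按同样的顺序逐字段还原。
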
消息读取解码器
package com.huafu.basecommon.netty.codec;

import com.huafu.basecommon.netty.enums.MsgType;
import com.huafu.basecommon.netty.protocol.MsgProtocol;
import com.huafu.basecommon.netty.enums.MsgProtocolState;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ReplayingDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Optional;

/**
* netty decoder
* 负载协议解码器
* 按状态 逐个解码 字段
* @author zhaoyuzhe
*/
@Slf4j
public class NettyDecoder extends ReplayingDecoder<MsgProtocolState> {

// 设置初始状态
public NettyDecoder() {
super(MsgProtocolState.MAGIC);
}

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 参数设置为 0L,防止使用无参构造器时 opaque 自增
MsgProtocol packet = new MsgProtocol(0L);
switch (state()) {
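// 注意:除 BODY 外各 case 有意不写 break,一次 decode 里尽量连续解析后续字段;
// checkpoint() 会记录当前解码状态和读指针,可读数据不足时 ReplayingDecoder 会用特殊异常中断本次解析,
// 等更多数据到达后从最近一次 checkpoint 的状态继续,而不必从头解析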
case MAGIC:
// 检查魔数
checkMagic(in.readByte());
checkpoint(MsgProtocolState.VERSION);
case VERSION:
// 检查版本
checkVersion(in.readByte());
checkpoint(MsgProtocolState.MSG_TYPE);
case MSG_TYPE:
int code = in.readByte() & 0xFF;
packet.setType(MsgType.fromCode(code));
checkpoint(MsgProtocolState.OPAQUE);
case OPAQUE:
packet.setOpaque(in.readLong());
checkpoint(MsgProtocolState.BODY_LENGTH);
case BODY_LENGTH:
packet.setBodyLength(in.readInt());
checkpoint(MsgProtocolState.BODY);
case BODY:
byte[] body = new byte[packet.getBodyLength()];
in.readBytes(body);
packet.setBody(body);
out.add(packet);
checkpoint(MsgProtocolState.MAGIC);
break;
default:
log.warn("unknown decoder state {}", state());
}
}

private void checkMagic(byte magic) {
if (magic != MsgProtocol.MAGIC) {
throw new IllegalArgumentException("illegal packet [magic]" + magic);
}
}

private void checkVersion(byte version) {
if (version != MsgProtocol.VERSION) {
throw new IllegalArgumentException("illegal protocol [version]" + version);
}
}
}
消息协议对象
package com.huafu.basecommon.netty.protocol;

import com.huafu.basecommon.netty.enums.MsgType;
import lombok.*;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicLong;

/**
* 复杂协议对象
* 一般用于生产环境
* @author zhaoyuzhe
* @date 2024/5/30
*/
@Data
@AllArgsConstructor
public class MsgProtocol implements Serializable {
private static final long serialVersionUID = -6461943636704869891L;

private static final AtomicLong REQUEST_ID = new AtomicLong(1);
/** 魔数*/
public static final byte MAGIC = (byte) 0xbabe;
/** 协议版本号*/
public static final byte VERSION = 0;
/** 报文类型*/
private MsgType type;
/** 请求、响应 唯一标识码,类似于http的sessionid */
private long opaque;
/** 长度域字段*/
private int bodyLength;
/** 请求数据*/
private byte[] body;

public MsgProtocol() {
this.opaque = REQUEST_ID.getAndIncrement();
}
public MsgProtocol(long opaque) {
this.opaque = opaque;
}

@Override
public String toString() {
return "MsgProtocol [type=" + type
+ ", opaque=" + opaque
+ ", bodyLen=" + (body == null ? 0 : body.length)
+ ", body=" + (body == null ? "" : new String(body, StandardCharsets.UTF_8))
+ "]";
}
}

4.3 Client客户端

NettyRemotingClient客户端
package com.huafu.basecommon.netty;

// import 按代码中用到的类补全;项目内部类(Host、NettyClientConfig、NamedThreadFactory、
// CallerThreadExecutePolicy 以及 codec/handler 等)的包路径请按实际工程调整
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
public class NettyRemotingClient implements AutoCloseable {

private final Bootstrap bootstrap = new Bootstrap();

private final NettyEncoder encoder = new NettyEncoder();

private final ConcurrentHashMap<Host, Channel> channels = new ConcurrentHashMap<>(128);

private final AtomicBoolean isStarted = new AtomicBoolean(false);

private final EventLoopGroup workerGroup;

private final NettyClientConfig clientConfig;

private final Semaphore asyncSemaphore = new Semaphore(200, true);

private final ExecutorService callbackExecutor;

private final NettyClientHandler clientHandler;

// private final ScheduledExecutorService responseFutureExecutor;

public NettyRemotingClient(final NettyClientConfig clientConfig) {
this.clientConfig = clientConfig;
if (Epoll.isAvailable()) {
this.workerGroup = new EpollEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
} else {
this.workerGroup = new NioEventLoopGroup(clientConfig.getWorkerThreads(), new NamedThreadFactory("NettyClient"));
}
// 消息读取处理 时用的线程池
this.callbackExecutor = new ThreadPoolExecutor(
Constants.CPUS,
Constants.CPUS,
1,
TimeUnit.MINUTES,
new LinkedBlockingQueue<>(1000),
new NamedThreadFactory("CallbackExecutor"),
new CallerThreadExecutePolicy());
this.clientHandler = new NettyClientHandler(this, callbackExecutor);
//
// this.responseFutureExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("ResponseFutureExecutor"));

this.start();
}

private void start() {

this.bootstrap
.group(this.workerGroup)
.channel(NettyUtils.getSocketChannelClass())
.option(ChannelOption.SO_KEEPALIVE, clientConfig.isSoKeepalive())
.option(ChannelOption.TCP_NODELAY, clientConfig.isTcpNoDelay())
.option(ChannelOption.SO_SNDBUF, clientConfig.getSendBufferSize())
.option(ChannelOption.SO_RCVBUF, clientConfig.getReceiveBufferSize())
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, clientConfig.getConnectTimeoutMillis())
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast("client-idle-handler", new IdleStateHandler(Constants.NETTY_CLIENT_HEART_BEAT_TIME, 0, 0, TimeUnit.MILLISECONDS))
.addLast(new NettyDecoder(), clientHandler, encoder);
}
});
// this.responseFutureExecutor.scheduleAtFixedRate(ResponseFuture::scanFutureTable, 5000, 1000, TimeUnit.MILLISECONDS);
isStarted.compareAndSet(false, true);
}



/**
* send
*/
public void send(final Host host, final MsgProtocol msg) throws RemotingException {
Channel channel = getChannel(host);
if (channel == null) {
throw new RemotingException(String.format("connect to : %s fail", host));
}
try {
ChannelFuture future = channel.writeAndFlush(msg).await();
if (future.isSuccess()) {
log.debug("send msg : {} , to : {} successfully.", msg, host.getAddress());
} else {
String msgformat = String.format("send msg : %s , to :%s failed", msg, host.getAddress());
log.error(msgformat, future.cause());
throw new RemotingException(msgformat);
}
} catch (RemotingException remotingException) {
throw remotingException;
} catch (Exception e) {
log.error("Send msg {} to address {} encounter error.", msg, host.getAddress());
throw new RemotingException(String.format("Send msg : %s , to :%s encounter error", msg, host.getAddress()), e);
}
}

/**
* register processor
*/
public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor) {
this.registerProcessor(msgType, processor, null);
}

/**
* register processor
*
* @param commandType command type
* @param processor processor
* @param executor thread executor
*/
public void registerProcessor(final MsgType commandType, final NettyRequestProcessor processor, final ExecutorService executor) {
this.clientHandler.registerProcessor(commandType, processor, executor);
}

/**
* get channel
*/
public Channel getChannel(Host host) {
Channel channel = channels.get(host);
if (channel != null && channel.isActive()) {
return channel;
}
return createChannel(host, true);
}

/**
* create channel
* 用map对象 缓存channel
* @param host host
* @param isSync sync flag
* @return channel
*/
public Channel createChannel(Host host, boolean isSync) {
ChannelFuture future;
try {
synchronized (bootstrap) {
future = bootstrap.connect(new InetSocketAddress(host.getIp(), host.getPort()));
}
if (isSync) {
future.sync();
}
if (future.isSuccess()) {
Channel channel = future.channel();
channels.put(host, channel);
return channel;
}
} catch (Exception ex) {
log.warn(String.format("connect to %s error", host), ex);
}
return null;
}

@Override
public void close() {
if (isStarted.compareAndSet(true, false)) {
try {
closeChannels();
if (workerGroup != null) {
this.workerGroup.shutdownGracefully();
}
if (callbackExecutor != null) {
this.callbackExecutor.shutdownNow();
}
// if (this.responseFutureExecutor != null) {
// this.responseFutureExecutor.shutdownNow();
// }
log.info("netty client closed");
} catch (Exception ex) {
log.error("netty client close exception", ex);
}
}
}

/**
* close channels
*/
private void closeChannels() {
for (Channel channel : this.channels.values()) {
channel.close();
}
this.channels.clear();
}

/**
* close channel
*
* @param host host
*/
public void closeChannel(Host host) {
Channel channel = this.channels.remove(host);
if (channel != null) {
channel.close();
}
}
}

NettyClientHandler客户端消息读取处理器
package com.huafu.basecommon.netty.handler;

import com.huafu.basecommon.netty.NettyRemotingClient;
import com.huafu.basecommon.netty.enums.MsgType;
import com.huafu.basecommon.netty.processor.NettyRequestProcessor;
import com.huafu.basecommon.netty.protocol.MsgProtocol;
import com.huafu.basecommon.netty.utils.ChannelUtils;
import com.huafu.basecommon.netty.utils.Constants;
import com.huafu.basecommon.netty.utils.Pair;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleStateEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

/**
* netty client request handler
*/
@ChannelHandler.Sharable
public class NettyClientHandler extends ChannelInboundHandlerAdapter {

private final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);

/**
* netty client
*/
private final NettyRemotingClient nettyRemotingClient;

private static byte[] heartBeatData = "heart_beat".getBytes();

/**
* callback thread executor
*/
private final ExecutorService callbackExecutor;

/**
* processors
*/
private final ConcurrentHashMap<MsgType, Pair<NettyRequestProcessor, ExecutorService>> processors;

/**
* default executor
*/
private final ExecutorService defaultExecutor = Executors.newFixedThreadPool(Constants.CPUS);


public NettyClientHandler(NettyRemotingClient nettyRemotingClient, ExecutorService callbackExecutor) {
this.nettyRemotingClient = nettyRemotingClient;
this.callbackExecutor = callbackExecutor;
this.processors = new ConcurrentHashMap<>();
}

/**
* When the current channel is not active,
* the current channel has reached the end of its life cycle
*
* @param ctx channel handler context
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}

/**
* The current channel reads data from the remote
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
processReceived(ctx.channel(), (MsgProtocol) msg);
}

/**
* register processor
*/
public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor) {
this.registerProcessor(msgType, processor, null);
}

/**
* register processor
*/
public void registerProcessor(final MsgType msgType, final NettyRequestProcessor processor, final ExecutorService executor) {
ExecutorService executorRef = executor;
if (executorRef == null) {
executorRef = defaultExecutor;
}
this.processors.putIfAbsent(msgType, new Pair<>(processor, executorRef));
}

/**
* process received logic
*/
private void processReceived(final Channel channel, final MsgProtocol msg) {
// TODO: 2024/5/31 暂时不使用 ResponseFuture方式读取 响应消息
// ResponseFuture future = ResponseFuture.getFuture(command.getOpaque());
// if (future != null) {
// future.setResponseCommand(command);
// future.release();
// if (future.getInvokeCallback() != null) {
// future.removeFuture();
// this.callbackExecutor.submit(future::executeInvokeCallback);
// } else {
// future.putResponse(command);
// }
// } else {
processByMsgType(channel, msg);
// }


}

public void processByMsgType(final Channel channel, final MsgProtocol msg) {
final Pair<NettyRequestProcessor, ExecutorService> pair = processors.get(msg.getType());
if (pair != null) {
Runnable run = () -> {
try {
pair.getLeft().process(channel, msg);
} catch (Exception e) {
logger.error(String.format("process command %s exception", msg), e);
}
};
try {
pair.getRight().submit(run);
} catch (RejectedExecutionException e) {
logger.warn("thread pool is full, discard command {} from {}", msg, ChannelUtils.getRemoteAddress(channel));
}
} else {
logger.warn("receive response {}, but not matched any request ", msg);
}
}

/**
* caught exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("exceptionCaught : {}", cause.getMessage(), cause);
nettyRemotingClient.closeChannel(ChannelUtils.toAddress(ctx.channel()));
ctx.channel().close();
}

/**
* 固定时间没有读写消息,则向netty服务端发送心跳
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
MsgProtocol heartBeat = new MsgProtocol();
heartBeat.setType(MsgType.HEART_BEAT);
heartBeat.setBody(heartBeatData);
ctx.channel().writeAndFlush(heartBeat)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
if (logger.isDebugEnabled()) {
logger.debug("Client send heart beat to: {}", ChannelUtils.getRemoteAddress(ctx.channel()));
}
} else {
super.userEventTriggered(ctx, evt);
}
}
}
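
最后给一个把服务端和客户端串起来的最小用法示意(非原文代码;NettyServerConfig/NettyClientConfig 的 setter、Host 的构造方式、MsgType.BUSINESS 这个枚举值都是假设,NettyRequestProcessor 也假设为只有 process(Channel, MsgProtocol) 一个方法的接口,请按实际项目代码调整):

// import 省略:按实际包路径引入 com.huafu.basecommon.netty 下的相关类
public class NettyDemo {
    public static void main(String[] args) throws Exception {
        // ---- 服务端 ----
        NettyServerConfig serverConfig = new NettyServerConfig();
        serverConfig.setListenPort(12346);                  // 假设存在该 setter
        NettyRemotingServer server = new NettyRemotingServer(serverConfig);
        // MsgType.BUSINESS 为假设的业务枚举值,处理器收到消息后简单打印
        server.registerProcessor(MsgType.BUSINESS, (channel, msg) ->
                System.out.println("server receive: " + msg));
        server.start();

        // ---- 客户端 ----
        NettyRemotingClient client = new NettyRemotingClient(new NettyClientConfig());
        MsgProtocol request = new MsgProtocol();            // 无参构造会自动生成自增的 opaque
        request.setType(MsgType.BUSINESS);
        request.setBody("hello server".getBytes(java.nio.charset.StandardCharsets.UTF_8));
        client.send(new Host("127.0.0.1", 12346), request); // Host(ip, port) 为假设的构造方式
    }
}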