赞
踩
为了展示具体的IdleStateHandler实现的心跳机制, 下面构造一个具体的EchoServer 的例子, 这个例子的行为如下:
1. 在这个例子中, 客户端和服务器通过 TCP 长连接进行通信.
下面所使用的代码例子可以在https://github.com/yongshun/some_java_code上找到。实现心跳的通用部分 CustomHeartbeatHandler:
- public abstract class CustomHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> {
- public static final byte PING_MSG = 1;
- public static final byte PONG_MSG = 2;
- public static final byte CUSTOM_MSG = 3;
- protected String name;
- private int heartbeatCount = 0;
-
- public CustomHeartbeatHandler(String name) {
- this.name = name;
- }
-
- @Override
- protected void channelRead0(ChannelHandlerContext context, ByteBuf byteBuf) throws Exception {
- if (byteBuf.getByte(4) == PING_MSG) {
- sendPongMsg(context);
- } else if (byteBuf.getByte(4) == PONG_MSG){
- System.out.println(name + " get pong msg from " + context.channel().remoteAddress());
- } else {
- handleData(context, byteBuf);
- }
- }
-
- protected void sendPingMsg(ChannelHandlerContext context) {
- ByteBuf buf = context.alloc().buffer(5);
- buf.writeInt(5);
- buf.writeByte(PING_MSG);
- buf.retain();
- context.writeAndFlush(buf);
- heartbeatCount++;
- System.out.println(name + " sent ping msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
- }
-
- private void sendPongMsg(ChannelHandlerContext context) {
- ByteBuf buf = context.alloc().buffer(5);
- buf.writeInt(5);
- buf.writeByte(PONG_MSG);
- context.channel().writeAndFlush(buf);
- heartbeatCount++;
- System.out.println(name + " sent pong msg to " + context.channel().remoteAddress() + ", count: " + heartbeatCount);
- }
-
- protected abstract void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf);
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
- // IdleStateHandler 所产生的 IdleStateEvent 的处理逻辑.
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent e = (IdleStateEvent) evt;
- switch (e.state()) {
- case READER_IDLE:
- handleReaderIdle(ctx);
- break;
- case WRITER_IDLE:
- handleWriterIdle(ctx);
- break;
- case ALL_IDLE:
- handleAllIdle(ctx);
- break;
- default:
- break;
- }
- }
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- System.err.println("---" + ctx.channel().remoteAddress() + " is active---");
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.err.println("---" + ctx.channel().remoteAddress() + " is inactive---");
- }
-
- protected void handleReaderIdle(ChannelHandlerContext ctx) {
- System.err.println("---READER_IDLE---");
- }
-
- protected void handleWriterIdle(ChannelHandlerContext ctx) {
- System.err.println("---WRITER_IDLE---");
- }
-
- protected void handleAllIdle(ChannelHandlerContext ctx) {
- System.err.println("---ALL_IDLE---");
- }
- }
CustomHeartbeatHandler 负责心跳的发送和接收, IdleStateHandler 是实现心跳的关键, 它会根据不同的 IO idle 类型来产生不同的 IdleStateEvent 事件, 而这个事件的捕获, 其实就是在 userEventTriggered 方法中实现的。
- public class Client {
- public static void main(String[] args) {
- NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
- Random random = new Random(System.currentTimeMillis());
- try {
- Bootstrap bootstrap = new Bootstrap();
- bootstrap
- .group(workGroup)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline p = socketChannel.pipeline();
- p.addLast(new IdleStateHandler(0, 0, 5));
- p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
- p.addLast(new ClientHandler());
- }
- });
-
- Channel ch = bootstrap.remoteAddress("127.0.0.1", 12345).connect().sync().channel();
- for (int i = 0; i < 10; i++) {
- String content = "client msg " + i;
- ByteBuf buf = ch.alloc().buffer();
- buf.writeInt(5 + content.getBytes().length);
- buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
- buf.writeBytes(content.getBytes());
- ch.writeAndFlush(buf);
-
- Thread.sleep(random.nextInt(20000));
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- workGroup.shutdownGracefully();
- }
- }
- }
在 ChannelInitializer.initChannel 部分, 给 pipeline 添加了三个 Handler。第一个是 IdleStateHandler, 它是心跳机制的核心: 这里为客户端设置了读写 idle 超时, 时间间隔是 5s, 即如果客户端在 5s 内既没有收到服务器的消息, 也没有向服务器发送消息, 则产生 ALL_IDLE 事件。第二个是 LengthFieldBasedFrameDecoder, 它负责根据报文头部的长度字段, 把 TCP 字节流切分成一个个完整的报文。最后一个 Handler 是 ClientHandler, 它继承于 CustomHeartbeatHandler, 负责处理业务逻辑。
- public class ClientHandler extends CustomHeartbeatHandler {
- public ClientHandler() {
- super("client");
- }
-
- @Override
- protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
- byte[] data = new byte[byteBuf.readableBytes() - 5];
- byteBuf.skipBytes(5);
- byteBuf.readBytes(data);
- String content = new String(data);
- System.out.println(name + " get content: " + content);
- }
-
- @Override
- protected void handleAllIdle(ChannelHandlerContext ctx) {
- super.handleAllIdle(ctx);
- sendPingMsg(ctx);
- }
- }
ClientHandler 继承于 CustomHeartbeatHandler, 它重写了两个方法。第一个是 handleData, 在这里仅仅打印收到的消息。第二个重写的方法是 handleAllIdle。客户端负责发送心跳的 PING 消息: 当客户端产生一个 ALL_IDLE 事件后, 会触发父类 CustomHeartbeatHandler.userEventTriggered 的调用, 而 userEventTriggered 会根据 e.state() 来调用不同的方法, 因此最后调用的是 ClientHandler.handleAllIdle, 在这个方法中, 客户端调用 sendPingMsg 向服务器发送一个 PING 消息。
- public class Server {
- public static void main(String[] args) {
- NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
- NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
- try {
- ServerBootstrap bootstrap = new ServerBootstrap();
- bootstrap
- .group(bossGroup, workGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline p = socketChannel.pipeline();
- p.addLast(new IdleStateHandler(10, 0, 0));
- p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
- p.addLast(new ServerHandler());
- }
- });
-
- Channel ch = bootstrap.bind(12345).sync().channel();
- ch.closeFuture().sync();
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- bossGroup.shutdownGracefully();
- workGroup.shutdownGracefully();
- }
- }
- }
- public class ServerHandler extends CustomHeartbeatHandler {
- public ServerHandler() {
- super("server");
- }
-
- @Override
- protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf buf) {
- byte[] data = new byte[buf.readableBytes() - 5];
- ByteBuf responseBuf = Unpooled.copiedBuffer(buf);
- buf.skipBytes(5);
- buf.readBytes(data);
- String content = new String(data);
- System.out.println(name + " get content: " + content);
- channelHandlerContext.write(responseBuf);
- }
-
- @Override
- protected void handleReaderIdle(ChannelHandlerContext ctx) {
- super.handleReaderIdle(ctx);
- System.err.println("---client " + ctx.channel().remoteAddress().toString() + " reader timeout, close it---");
- ctx.close();
- }
- }
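ServerHandler 继承于 CustomHeartbeatHandler, 它重写了两个方法。第一个是 handleData, 在这里面实现 EchoServer 的功能: 即收到客户端的消息后, 立即原封不动地将消息回复给客户端。第二个重写的方法是 handleReaderIdle: 服务器端的 IdleStateHandler 设置了 10s 的读 idle 超时, 当产生 READER_IDLE 事件时, 服务器打印超时日志并调用 ctx.close() 关闭这个连接。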
- public class Client {
- private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
- private Channel channel;
- private Bootstrap bootstrap;
-
- public static void main(String[] args) throws Exception {
- Client client = new Client();
- client.start();
- client.sendData();
- }
-
- public void sendData() throws Exception {
- Random random = new Random(System.currentTimeMillis());
- for (int i = 0; i < 10000; i++) {
- if (channel != null && channel.isActive()) {
- String content = "client msg " + i;
- ByteBuf buf = channel.alloc().buffer(5 + content.getBytes().length);
- buf.writeInt(5 + content.getBytes().length);
- buf.writeByte(CustomHeartbeatHandler.CUSTOM_MSG);
- buf.writeBytes(content.getBytes());
- channel.writeAndFlush(buf);
- }
-
- Thread.sleep(random.nextInt(20000));
- }
- }
-
- public void start() {
- try {
- bootstrap = new Bootstrap();
- bootstrap
- .group(workGroup)
- .channel(NioSocketChannel.class)
- .handler(new ChannelInitializer<SocketChannel>() {
- protected void initChannel(SocketChannel socketChannel) throws Exception {
- ChannelPipeline p = socketChannel.pipeline();
- p.addLast(new IdleStateHandler(0, 0, 5));
- p.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, -4, 0));
- p.addLast(new ClientHandler(Client.this));
- }
- });
- doConnect();
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- protected void doConnect() {
- if (channel != null && channel.isActive()) {
- return;
- }
-
- ChannelFuture future = bootstrap.connect("127.0.0.1", 12345);
-
- future.addListener(new ChannelFutureListener() {
- public void operationComplete(ChannelFuture futureListener) throws Exception {
- if (futureListener.isSuccess()) {
- channel = futureListener.channel();
- System.out.println("Connect to server successfully!");
- } else {
- System.out.println("Failed to connect to server, try connect after 10s");
-
- futureListener.channel().eventLoop().schedule(new Runnable() {
- @Override
- public void run() {
- doConnect();
- }
- }, 10, TimeUnit.SECONDS);
- }
- }
- });
- }
- }
上面的代码中, 抽象出了 doConnect 方法, 它负责建立客户端和服务器之间的 TCP 连接; 当 TCP 连接失败时, doConnect 会通过 "channel().eventLoop().schedule" 延时 10s 后再次尝试连接。
- public class ClientHandler extends CustomHeartbeatHandler {
- private Client client;
- public ClientHandler(Client client) {
- super("client");
- this.client = client;
- }
-
- @Override
- protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
- byte[] data = new byte[byteBuf.readableBytes() - 5];
- byteBuf.skipBytes(5);
- byteBuf.readBytes(data);
- String content = new String(data);
- System.out.println(name + " get content: " + content);
- }
-
- @Override
- protected void handleAllIdle(ChannelHandlerContext ctx) {
- super.handleAllIdle(ctx);
- sendPingMsg(ctx);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- client.doConnect();
- }
- }
断线重连的关键一点是检测连接是否已经断开. 因此改写了ClientHandler,重写了channelInactive方法. 当TCP连接断开时,会回调channelInactive方法,因此在这个方法中调用 client.doConnect() 来进行重连。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。