当前位置:   article > 正文

Netty做为客户端实现socket通信_netty socket客户端

netty socket客户端

依赖准备

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.42.Final</version>
</dependency>


<dependency>
	<groupId>org.projectlombok</groupId>
	<artifactId>lombok</artifactId>
	<version>1.18.20</version>
 </dependency>

 <dependency>
	<groupId>com.alibaba</groupId>
	<artifactId>fastjson</artifactId>
	<version>1.2.75</version>
 </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

1.定义消息实体

@Data
public class NettyMsg {
 
    /**
     * 设备id
     */
    private String deviceId;

    /**
     * 消息类型 1=定位 2=闯红灯事件 3=车流量事件
     */
    private Integer msgType;
 
    /**
     * 发送的消息
     */
    private String msg;

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2.定义Netty服务端

import java.nio.charset.StandardCharsets;

import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;

/**
 * Netty 服务端
 */
@Component
@Slf4j
public class NettyServer {
    /**
     * Netty服务端的端口
     */
    private static final int PORT = 28888;

    /**
     * 服务端接收连接队列的长度
     */
    private static final int QUEUE_SIZE = 1024*1024;

    /**
     * 粘包的分隔符
     */
    private static final String DELIMITER = "\r\n";

    /**
     * 分隔的最大长度
     */
    private static final int DELIMITER_MAX_LENGTH = 1024;

    /**
     * 服务启动绑定端口
     */
    public void bind() {

        // 配置服务端的NIO线程组
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            // 创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 设置两个线程组
            bootstrap.group(bossGroup, workerGroup)
                    // 使用NioServerSocketChannel 作为服务器的通道实现
                    .channel(NioServerSocketChannel.class)
                    // 设置线程队列得到连接个数
                    .option(ChannelOption.SO_BACKLOG, QUEUE_SIZE)
                    // 设置保持活动连接状态
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // 通过NoDelay禁用Nagle,使消息立即发出去,不用等待到一定的数据量才发出去
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 给workerGroup的EventLoop对应的管道设置处理器
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        // 给pipeline 设置处理器
                        @Override
                        protected void initChannel(SocketChannel socketChannel) {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new DelimiterBasedFrameDecoder(DELIMITER_MAX_LENGTH,
                                    Unpooled.wrappedBuffer(DELIMITER.getBytes(StandardCharsets.UTF_8))));
                            // 字符串编解码器
                            pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8),
                                    new StringEncoder(StandardCharsets.UTF_8));
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });

            // 绑定端口,开始接收进来的连接
            ChannelFuture future = bootstrap.bind(PORT).sync();
            if (future.isSuccess()) {
                log.info("======socket server start======");
            }

            // 对关闭通道进行监听
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // 发送异常关闭
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
            Thread.currentThread().interrupt();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

3.定义 netty服务端处理器

package com.huawei.edu.iot.lab.provider.netty;

import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;

import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;
import com.huawei.edu.iot.lab.provider.enums.NettyMsgTypeEnum;
import com.huawei.edu.iot.lab.provider.manager.NettyManager;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

/**
 * netty服务端处理器
 */
@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Resource
    private NettyManager nettyManager;

    /**
     * 存放连接的客户端信息
     */
    private static final Map<String, ChannelHandlerContext> MAP = new ConcurrentHashMap<>();

    private static NettyManager staticNettyManager;


    @PostConstruct
    public void init() {
        staticNettyManager = nettyManager;
    }

    /**
     * 通道激活时触发,当客户端connect成功后,服务端就会接收到这个事件,从而可以把客户端的Channel记录下来,供后面复用
     *
     * @param ctx ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        InetSocketAddress inetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = inetSocketAddress.getAddress().getHostAddress();
        int clientPort = inetSocketAddress.getPort();
        log.info("客户端:{}连接netty服务器[IP: {},PORT: {}]", channelId, clientIp, clientPort);
        String channelKey = ctx.channel().id().toString();
        // 如果map中不包含此连接,就保存连接
        if (!hasConnection(channelKey)) {
            MAP.put(channelKey, ctx);
        }
    }

    /**
     * 有客户端断开连接服务器会触发此函数
     *
     * @param ctx ctx
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        ChannelId channelId = ctx.channel().id();
        log.info("有客户端断开连接,绑定的信道是:{}", channelId);
        String channelKey = channelId. toString();
        // 如果map中不包含此连接,就删除连接
        if (hasConnection(channelKey)) {
            ctx.close();
            MAP.remove(channelKey);
        }
    }

    /**
     * 当客户端发来消息后,就会触发,参数msg就是发来的信息,可以是基础类型,也可以是序列化的复杂对象
     *
     * @param ctx ctx
     * @param msg 发来的消息
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        log.info("加载客户端报文[信道:{},消息:{}]", ctx.channel().id(), msg);
        // 解析客户端发来的消息
        NettyMsg nettyMsg = JSON.parseObject(msg.toString(), NettyMsg.class);

        if (Objects.isNull(nettyMsg)) {
            // 消息解析出现异常
            log.info("消息解析异常,msg:{}", msg);
            return;
        }

        // 处理消息
        this.handlerMsg(nettyMsg);
    }

    /**
     * 处理客户端发来的消息
     *
     * @param nettyMsg msg
     */
    private void handlerMsg(NettyMsg nettyMsg) {
        NettyMsgTypeEnum.isInclude(nettyMsg.getMsgType());

        // 处理闯红灯事件
        if (NettyMsgTypeEnum.RED_LIGHT_RUNNING_EVENT.getValue() == nettyMsg.getMsgType()) {
            staticNettyManager.processRedLightRunningEvent(nettyMsg.getMsg());
        }

        // 处理车流量事件
        if (NettyMsgTypeEnum.TRAFFIC_CONGESTION_EVENT.getValue() == nettyMsg.getMsgType()) {
            staticNettyManager.processCongestion(nettyMsg.getMsg());
        }

        // 定位
        if (NettyMsgTypeEnum.POSITIONING.getValue() == nettyMsg.getMsgType()) {
            staticNettyManager.processPosition(nettyMsg);
        }

        // 禁止推流处理
        if (NettyMsgTypeEnum.PUSH_STREAM_CONTROL.getValue() == nettyMsg.getMsgType()) {
            staticNettyManager.processForbiddenPushStream(nettyMsg.getMsg());
        }
    }

    /**
     * 出错是会触发,做一些错误处理
     *
     * @param ctx ctx
     * @param cause cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("exceptionCaught", cause);
    }

    /**
     * 服务端给客户端发送消息
     *
     * @param nettyMsg 消息内容
     */
    public void sendMsgToClient(NettyMsg nettyMsg) {
        Iterator<String> iterator = MAP.keySet().iterator();
        if (iterator.hasNext()) {
            String key = iterator.next();
            if (hasConnection(key)) {
                ChannelHandlerContext ctx = MAP.get(key);
                if (ctx.channel().isActive()) {
                    String jsonNettyMsg = JSON.toJSONString(nettyMsg);
                    log.info("服务端发送信息为:{}", jsonNettyMsg);
                    ctx.writeAndFlush(jsonNettyMsg);
                } else {
                    MAP.remove(key);
                }
            }
        }
    }

    /**
     * 判断当前连接是否存在
     *
     * @param channelKey channelId
     * @return ture/false
     */
    public boolean hasConnection(String channelKey) {
        if (StringUtils.isEmpty(channelKey)) {
            return false;
        }
        return MAP.containsKey(channelKey);
    }

    /**
     * 心跳机制,超时处理
     *
     * @param ctx ctx
     * @param evt evt
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        String socketString = ctx.channel().remoteAddress().toString();
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                log.info("Client: {} READER_IDLE 读超时", socketString);
                ctx.channel().close();
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("Client: {} WRITER_IDLE 写超时", socketString);
                ctx.channel().close();
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("Client: {} ALL_IDLE 总超时", socketString);
                ctx.channel().close();
            }
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/973940
推荐阅读
相关标签
  

闽ICP备14008679号