赞
踩
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,相比传统Socket,在并发性方面有着很大的提升。关于NIO,BIO,AIO之间的区别,可以参考这篇博客:NIO 、 BIO与AIO之间的区别_谁念西风独自凉-CSDN博客
首先我们启动一个tcp服务,这里我用到了Redis与RabbitMQ,主要是与分布式WEB平台之间好对接
- @Component
- public class ApplicationEventListener implements CommandLineRunner {
- @Value("${spring.application.name}")
- private String nodeName;
-
- @Value("${gnss.mqttserver.tcpPort}")
- private int tcpPort;
-
- @Override
- public void run(String... args) throws Exception {
- //启动TCP服务
- startTcpServer();
-
- //清除Redis所有此节点的在线终端
- RedisService redisService = SpringBeanService.getBean(RedisService.class);
- redisService.deleteAllOnlineTerminals(nodeName);
-
- //将所有此节点的终端设置为离线
- RabbitMessageSender messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
- messageSender.noticeAllOffline(nodeName);
- }
-
- /**
- * 启动TCP服务
- *
- * @throws Exception
- */
- private void startTcpServer() throws Exception {
- //计数器,必须等到所有服务启动成功才能进行后续的操作
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- //启动TCP服务
- TcpServer tcpServer = new TcpServer(tcpPort, ProtocolEnum.MqttCommon, countDownLatch);
- tcpServer.start();
- //等待启动完成
- countDownLatch.await();
- }
- }
接下来我们编写一个TcpServer类实现TCP服务
- @Slf4j
- public class TcpServer extends Thread{
- private int port;
-
- private ProtocolEnum protocolType;
-
- private EventLoopGroup bossGroup;
-
- private EventLoopGroup workerGroup;
-
- private ServerBootstrap serverBootstrap = new ServerBootstrap();
-
- private CountDownLatch countDownLatch;
-
- public TcpServer(int port, ProtocolEnum protocolType, CountDownLatch countDownLatch) {
- this.port = port;
- this.protocolType = protocolType;
- this.countDownLatch = countDownLatch;
-
- bossGroup = new NioEventLoopGroup(1);
- workerGroup = SpringBeanService.getBean("workerGroup", EventLoopGroup.class);
- final EventExecutorGroup executorGroup = SpringBeanService.getBean("executorGroup", EventExecutorGroup.class);
- serverBootstrap.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .option(ChannelOption.SO_BACKLOG, 1024)
- .childOption(ChannelOption.SO_KEEPALIVE, true)
- .childOption(ChannelOption.TCP_NODELAY, true)
- .childHandler(new ChannelInitializer<SocketChannel>() {
-
- @Override
- protected void initChannel(SocketChannel ch) throws Exception {
- ch.pipeline().addLast(new IdleStateHandler(MqttConstant.READER_IDLE_TIME, 0, 0, TimeUnit.SECONDS));
- ch.pipeline().addLast("encoder", MqttEncoder.INSTANCE);
- ch.pipeline().addLast("decoder", new MqttDecoder());
- ch.pipeline().addLast(executorGroup, MqttBusinessHandler.INSTANCE);
- }
- });
- }
-
- @Override
- public void run() {
- bind();
- }
-
- /**
- * 绑定端口启动服务
- */
- private void bind() {
- serverBootstrap.bind(port).addListener(future -> {
- if (future.isSuccess()) {
- log.info("{} MQTT服务器启动,端口:{}", protocolType, port);
- countDownLatch.countDown();
- } else {
- log.error("{} MQTT服务器启动失败,端口:{}", protocolType, port, future.cause());
- System.exit(-1);
- }
- });
- }
-
- /**
- * 关闭服务端
- */
- public void shutdown() {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- log.info("{} TCP服务器关闭,端口:{}", protocolType, port);
- }
- }
编写一个业务处理器MqttBusinessHandler(报文的解码由前面的MqttDecoder完成),实现对MQTT消息的接收与处理
- @Slf4j
- @ChannelHandler.Sharable
- public class MqttBusinessHandler extends SimpleChannelInboundHandler<Object> {
- public static final MqttBusinessHandler INSTANCE = new MqttBusinessHandler();
- private MqttMsgBack mqttMsgBack;
- private MqttBusinessHandler() {
- mqttMsgBack= MqttMsgBack.INSTANCE;
- }
-
- /**
- * 接收到消息后处理
- * @param ctx
- * @param msg
- * @throws Exception
- */
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (null != msg) {
- MqttMessage mqttMessage = (MqttMessage) msg;
- MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
- Channel channel = ctx.channel();
- if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
- //在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
- //建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
- mqttMsgBack.connectionAck(ctx, mqttMessage);
- }
-
- switch (mqttFixedHeader.messageType()){
- //客户端发布消息
- case PUBLISH:
- mqttMsgBack.publishAck(ctx, mqttMessage);
- break;
- //发布释放
- case PUBREL:
- mqttMsgBack.publishComp(ctx, mqttMessage);
- break;
- //订阅主题
- case SUBSCRIBE:
- mqttMsgBack.subscribeAck(ctx, mqttMessage);
- break;
- //取消订阅主题
- case UNSUBSCRIBE:
- mqttMsgBack.unsubscribeAck(ctx, mqttMessage);
- break;
- //客户端发送心跳报文
- case PINGREQ:
- mqttMsgBack.pingResp(ctx, mqttMessage);
- break;
- //客户端主动断开连接
- case DISCONNECT:
- break;
- default:
- break;
- }
- }
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- log.info("终端关闭连接,IP信息:{}", CommonUtil.getClientAddress(ctx));
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- ctx.close();
- log.error("终端连接异常,IP信息:{}", CommonUtil.getClientAddress(ctx), cause);
- }
-
- /**
- * 服务端当读超时时会调用这个方法
- */
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
- ctx.close();
- log.error("读超时,IP信息:{}", CommonUtil.getClientAddress(ctx), evt);
- }
我们对接收到的消息进行业务处理
- @Slf4j
- public class MqttMsgBack {
- public static final MqttMsgBack INSTANCE = new MqttMsgBack();
- private RedisService redisService;
- private RabbitMessageSender messageSender;
- private Environment environment;
- private MessageServiceProvider messageServiceProvider;
-
- private MqttMsgBack() {
- redisService = SpringBeanService.getBean(RedisService.class);
- messageSender = SpringBeanService.getBean(RabbitMessageSender.class);
- environment = SpringBeanService.getBean(Environment.class);
- messageServiceProvider = SpringBeanService.getBean(MessageServiceProvider.class);
- }
-
- /**
- * 确认连接请求
- * @param ctx
- * @param mqttMessage
- */
- public void connectionAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
- MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
- MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
- MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
- //构建返回报文, 可变报头
- MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
- //构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
- //构建连接回复消息体
- MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
- ctx.writeAndFlush(connAck);
- //获取连接者的ClientId
- String clientIdentifier = mqttConnectMessage.payload().clientIdentifier();
- //查询终端号码有无在平台注册
- TerminalProto terminalInfo = redisService.getTerminalInfoByTerminalNum(clientIdentifier);
- if (terminalInfo == null) {
- log.error("终端登录失败,未找到终端信息,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
- ctx.close();
- return;
- }
- //设置节点名
- terminalInfo.setNodeName(environment.getProperty("spring.application.name"));
- //保存终端信息和消息流水号到上下文属性中
- Session session = new Session(terminalInfo);
- ChannelHandlerContext oldCtx = SessionUtil.bindSession(session, ctx);
- if (oldCtx == null) {
- log.info("终端登录成功,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
- } else {
- log.info("终端重复登录关闭上一个连接,终端ID:{},终端号:{},IP信息:{}", terminalInfo.getTerminalStrId(), clientIdentifier, CommonUtil.getClientAddress(ctx));
- oldCtx.close();
- }
- //通知上线
- messageSender.noticeOnline(terminalInfo);
- log.info("终端登录成功,终端号:{},IP信息:{}", clientIdentifier, CommonUtil.getClientAddress(ctx));
- }
-
- /**
- * 根据qos发布确认
- * @param ctx
- * @param mqttMessage
- */
- public void publishAck (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
- MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
- MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
- MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
- //得到主题
- String topicName = mqttPublishMessage.variableHeader().topicName();
- //获取消息体
- ByteBuf msgBodyBuf = mqttPublishMessage.payload();
- log.info("收到:{}", ByteBufUtil.hexDump(msgBodyBuf));
- MqttCommonMessage msg=new MqttCommonMessage();
- msg.setTerminalNum(SessionUtil.getTerminalInfo(ctx).getTerminalNum());
- msg.setStrMsgId(topicName);
- //根据主题获取对应的主题消息处理器
- BaseMessageService messageService = messageServiceProvider.getMessageService(topicName);
- try {
- Object result = messageService.process(ctx, msg, msgBodyBuf);
- log.info("收到{}({}),终端ID:{},内容:{}", messageService.getDesc(), topicName,msg.getTerminalNum(), msg.getMsgBodyItems());
- } catch (Exception e) {
- log.error("收到{}({}),消息异常,终端ID:{},消息体:{}", messageService.getDesc(), topicName,msg.getTerminalNum(),ByteBufUtil.hexDump(msgBodyBuf), e);
- }
- switch (qos) {
- //至多一次
- case AT_MOST_ONCE:
- break;
- //至少一次
- case AT_LEAST_ONCE:
- //构建返回报文, 可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
- //构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
- //构建PUBACK消息体
- MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
- log.info("Qos:AT_LEAST_ONCE:{}",pubAck.toString());
- ctx.writeAndFlush(pubAck);
- break;
- //刚好一次
- case EXACTLY_ONCE:
- //构建返回报文,固定报头
- MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
- //构建返回报文,可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
- MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
- log.info("Qos:EXACTLY_ONCE回复:{}"+mqttMessageBack.toString());
- ctx.writeAndFlush(mqttMessageBack);
- break;
- default:
- break;
- }
- }
-
- /**
- * 发布完成 qos2
- * @param ctx
- * @param mqttMessage
- */
- public void publishComp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
-
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- //构建返回报文, 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
- //构建返回报文, 可变报头
- MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
- log.info("发布完成回复:{}"+mqttMessageBack.toString());
- ctx.writeAndFlush(mqttMessageBack);
- }
-
- /**
- * 订阅确认
- * @param ctx
- * @param mqttMessage
- */
- public void subscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
- MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
- MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
- //构建返回报文, 可变报头
- MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
- List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
- for (int i = 0; i < topics.size(); i++) {
- grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
- }
- // 构建返回报文 有效负载
- MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
- // 构建返回报文 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
- // 构建返回报文 订阅确认
- MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
- log.info("订阅回复:{}", subAck.toString());
- ctx.writeAndFlush(subAck);
- }
-
- /**
- * 取消订阅确认
- * @param ctx
- * @param mqttMessage
- */
- public void unsubscribeAck(ChannelHandlerContext ctx, MqttMessage mqttMessage) {
- MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
- // 构建返回报文 可变报头
- MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
- // 构建返回报文 固定报头
- MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
- // 构建返回报文 取消订阅确认
- MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
- log.info("取消订阅回复:{}",unSubAck.toString());
- ctx.writeAndFlush(unSubAck);
- }
-
- /**
- * 心跳响应
- * @param ctx
- * @param mqttMessage
- */
- public void pingResp (ChannelHandlerContext ctx, MqttMessage mqttMessage) {
- MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
- MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
- log.info("心跳回复:{}", mqttMessageBack.toString());
- ctx.writeAndFlush(mqttMessageBack);
- }
- }
我们可以根据客户端发布消息的主题匹配不同的处理器
最后,我们在对应的处理器里面实现对主题消息的处理逻辑,比如:定位消息,指令消息等等,比如简单实现对定位数据Location主题的消息处理
- @Slf4j
- @MessageService(strMessageId = "Location", desc = "定位")
- public class LocationMessageService extends BaseMessageService<MqttCommonMessage> {
- @Autowired
- private RabbitMessageSender messageSender;
-
- @Override
- public Object process(ChannelHandlerContext ctx, MqttCommonMessage msg, ByteBuf msgBodyBuf) throws Exception {
- byte[] msgByteArr = new byte[msgBodyBuf.readableBytes()];
- msgBodyBuf.readBytes(msgByteArr);
- String data = new String(msgByteArr);
- msg.putMessageBodyItem("位置", data);
- return null;
- }
- }
目前仅仅是实现MQTT服务端消息接收与消息回复,后续可以根据接入的物联网设备进行对应主题消息的业务处理
对物联网感兴趣的同学可以加我微信,一起交流学习
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。