当前位置:   article > 正文

netty+springboot+dubbo+zk整合tcp项目_springboot 整合dubbo+netty

springboot 整合dubbo+netty

1.描述

     为了能快速开发netty集成springboot整合tcp项目,本篇博客搭建勒一个空的架子,只需要修改对应的业务逻辑代码就行,很好的提高勒开发速度,降低勒netty开发的难度。开发人员,只需要关注于业务逻辑就行。

      建议:有一定的netty和springboot,dubbbo基础。

     下载socket调试工具

给一个测试的demo,十六进制字符串

2454585858007A0E320460047A3F00000330473336303020646D67642000000BE7130C040F0A00000000002D00390021004D0024004B0026005200300090060026005309000000000000000000000000000004AA04AA0A04A80700484A05470700278227830427820A000057B800559E0050C50500840D0A0000

2.netty

     2.1  服务器端   

              2.1.1 pom.xml 

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.cloudtech</groupId>
  6. <artifactId>dipper</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>dipper</name>
  10. <description>Demo project for Spring Boot</description>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>2.0.5.RELEASE</version>
  15. <relativePath /> <!-- lookup parent from repository -->
  16. </parent>
  17. <properties>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-web</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-test</artifactId>
  30. <scope>test</scope>
  31. </dependency>
  32. <!-- https://mvnrepository.com/artifact/com.alibaba.boot/dubbo-spring-boot-starter -->
  33. <dependency>
  34. <groupId>com.alibaba.boot</groupId>
  35. <artifactId>dubbo-spring-boot-starter</artifactId>
  36. <version>0.2.0</version>
  37. </dependency>
  38. <!-- netty -->
  39. <dependency>
  40. <groupId>io.netty</groupId>
  41. <artifactId>netty-all</artifactId>
  42. <version>5.0.0.Alpha2</version>
  43. </dependency>
  44. <!-- 配置提示 -->
  45. <dependency>
  46. <groupId>org.springframework.boot</groupId>
  47. <artifactId>spring-boot-configuration-processor</artifactId>
  48. <optional>true</optional>
  49. </dependency>
  50. </dependencies>
  51. <build>
  52. <!-- 打成jar包的名称 -->
  53. <finalName>dipper</finalName>
  54. <plugins>
  55. <!-- 解决web.xml报错的问题 -->
  56. <plugin>
  57. <groupId>org.apache.maven.plugins</groupId>
  58. <artifactId>maven-war-plugin</artifactId>
  59. <configuration>
  60. <failOnMissingWebXml>false</failOnMissingWebXml>
  61. </configuration>
  62. </plugin>
  63. <plugin>
  64. <groupId>org.springframework.boot</groupId>
  65. <artifactId>spring-boot-maven-plugin </artifactId>
  66. </plugin>
  67. <plugin>
  68. <groupId>org.springframework.boot</groupId>
  69. <artifactId>spring-boot-maven-plugin</artifactId>
  70. <executions>
  71. <execution>
  72. <goals>
  73. <goal>repackage</goal>
  74. </goals>
  75. </execution>
  76. </executions>
  77. </plugin>
  78. </plugins>
  79. </build>
  80. </project>

2.1.2  application.yml

  1. ####自定义配置
  2. netty:
  3. port: 4002 ###分钟监听端口
  4. module-type: 0 ####0标识主通讯模块 1标识从通讯模块
  5. stationTypeId: 17
  6. ###最大缓存长度
  7. maxBuffLen: 10240
  8. logging:
  9. config: classpath:logback-boot.xml
  10. ####dubbot版本
  11. demo:
  12. service:
  13. version: 1.0.0
  14. dubbo:
  15. reference:
  16. check: false
  17. com:
  18. cloudtech:
  19. web:
  20. dubbo:
  21. check: false
  22. consumers:
  23. check: false
  24. scan:
  25. basePackages: com.cloudtech.web.dubbo
  26. application:
  27. id: dipper-provider-demo
  28. name: dipper-provider-demo
  29. provider:
  30. retries: -1
  31. timeout: 30000000
  32. protocol:
  33. id: dubbo
  34. name: dubbo
  35. port: -1
  36. registry:
  37. protocol: zookeeper
  38. check: false
  39. address: zookeeper://127.0.0.1:2181

2.1.3 启动类

  1. package com.cloudtech.demo;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.beans.factory.annotation.Value;
  6. import org.springframework.boot.CommandLineRunner;
  7. import org.springframework.boot.WebApplicationType;
  8. import org.springframework.boot.autoconfigure.SpringBootApplication;
  9. import org.springframework.boot.builder.SpringApplicationBuilder;
  10. import org.springframework.context.annotation.ComponentScan;
  11. import com.cloudtech.config.NettyConfig;
  12. import com.cloudtech.server.Server;
  13. import com.cloudtech.util.Consts;
  14. @SpringBootApplication
  15. @ComponentScan(value = {"com.cloudtech"})
  16. public class DipperApplication implements CommandLineRunner {
  17. protected static final Logger LOGGER = LoggerFactory.getLogger(DipperApplication.class);
  18. @Autowired
  19. NettyConfig nettyConfig;
  20. @Autowired
  21. Server server;
  22. @Value("${stationTypeId}")
  23. private Integer stationTypeId;
  24. @Value("${maxBuffLen}")
  25. private Integer maxBuffLen;
  26. public static void main(String[] args) {
  27. new SpringApplicationBuilder(DipperApplication.class)
  28. .web(WebApplicationType.NONE)
  29. .run(args);
  30. //SpringApplication.run(Aws310Application.class, args);
  31. }
  32. /**
  33. * spring boot启动后,会进入该方法
  34. */
  35. @Override
  36. public void run(String... args) throws Exception {
  37. init();
  38. //加载分钟监听线程
  39. new Thread("Dipper-recevice-thread") {
  40. @Override
  41. public void run() {
  42. super.run();
  43. server.start(Consts.SERVER_PORT);
  44. }
  45. }.start();
  46. }
  47. /**
  48. * 初始化信息
  49. */
  50. @SuppressWarnings("unused")
  51. private void init() {
  52. Consts.SERVER_PORT = nettyConfig.getPort();
  53. Consts.MODULE_TYPE = nettyConfig.getModuleType();
  54. Consts.STATION_TYPE_ID = stationTypeId;
  55. Consts.MAX_BUFF_LEN = maxBuffLen;
  56. }
  57. }
  1. package com.cloudtech.config;
  2. import org.springframework.beans.factory.annotation.Value;
  3. import org.springframework.context.annotation.PropertySource;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * 读取yml配置文件中的信息
  7. * @ClassName: NettyConfig
  8. * @Description:
  9. * @author wude
  10. * @date 2018年9月19日
  11. *
  12. */
  13. @Component
  14. @PropertySource(value={"classpath:application.yml"})
  15. public class NettyConfig {
  16. @Value("${netty.port}")
  17. private int port;
  18. @Value("${netty.module-type}")
  19. private int moduleType;
  20. public int getPort() {
  21. return port;
  22. }
  23. public void setPort(int port) {
  24. this.port = port;
  25. }
  26. public int getModuleType() {
  27. return moduleType;
  28. }
  29. public void setModuleType(int moduleType) {
  30. this.moduleType = moduleType;
  31. }
  32. }
  1. package com.cloudtech.util;
  2. import com.cloudtech.enums.ModuleType;
  3. public class Consts {
  4. /** 新版本服务器端口号 */
  5. public static Integer SERVER_PORT = 4000;
  6. /** 保存session的的属性值 */
  7. public static final String PILE_ID = "pileId";
  8. /** 请求成功 **/
  9. public static Integer SUCCESS = 1;
  10. /** 请求失败 **/
  11. public static Integer FAILED = 0;
  12. /** 主模块 */
  13. public static Integer MODULE_TYPE = ModuleType.MASTER.getCode();
  14. /** 模块类型id */
  15. public static Integer STATION_TYPE_ID = 7;
  16. /** 设置最大缓存长度 */
  17. public static Integer MAX_BUFF_LEN=10240;
  18. }
  1. package com.cloudtech.server;
  2. import javax.annotation.Resource;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. import io.netty.bootstrap.ServerBootstrap;
  8. import io.netty.channel.ChannelFuture;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.channel.ChannelOption;
  11. import io.netty.channel.EventLoopGroup;
  12. import io.netty.channel.nio.NioEventLoopGroup;
  13. import io.netty.channel.socket.SocketChannel;
  14. import io.netty.channel.socket.nio.NioServerSocketChannel;
  15. import io.netty.handler.codec.string.StringDecoder;
  16. import io.netty.handler.timeout.IdleStateHandler;
  17. /**
  18. *
  19. * @ClassName: Server
  20. * @Description:netty服务端
  21. * @author wude
  22. * @date 201897
  23. *
  24. */
  25. @Component
  26. public class Server {
  27. protected static final Logger logger = LoggerFactory.getLogger(Server.class);
  28. @Autowired
  29. private ServerHandler serverHandler;
  30. /**
  31. * 启动
  32. */
  33. public void start(int port) {
  34. // 服务类
  35. ServerBootstrap b = new ServerBootstrap();
  36. // 创建boss和worker
  37. EventLoopGroup bossGroup = new NioEventLoopGroup();
  38. EventLoopGroup workerGroup = new NioEventLoopGroup();
  39. //业务线程池,实现消息串行化
  40. EventLoopGroup busyGroup = new NioEventLoopGroup();
  41. try {
  42. // 设置循环线程组事例
  43. b.group(bossGroup, workerGroup);
  44. // 设置channel工厂
  45. b.channel(NioServerSocketChannel.class);
  46. // 设置管道
  47. b.childHandler(new ChannelInitializer<SocketChannel>() {
  48. @Override
  49. public void initChannel(SocketChannel ch) throws Exception {
  50. //第一个参数 读超时
  51. //第二个参数 写超时
  52. //第三个参数读写超时
  53. ch.pipeline().addLast(new IdleStateHandler(5, 5, 60));
  54. ch.pipeline().addLast(new RequestDecoder());
  55. //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Consts.MAX_BUFF_LEN,delimiter));
  56. ch.pipeline().addLast(new StringDecoder());
  57. ch.pipeline().addLast(busyGroup,serverHandler);
  58. }
  59. });
  60. b.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
  61. b.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
  62. b.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
  63. logger.info("启动Dipper采集成功!port={}",port);
  64. //绑定端口
  65. ChannelFuture future = b.bind(port);
  66. //等待服务端关闭
  67. future.channel().closeFuture().sync();
  68. } catch (Exception e) {
  69. e.printStackTrace();
  70. } finally{
  71. //释放资源
  72. bossGroup.shutdownGracefully();
  73. workerGroup.shutdownGracefully();
  74. busyGroup.shutdownGracefully();
  75. }
  76. }
  77. }

2.2.3  编码器 

  1. package com.cloudtech.server;
  2. import java.util.List;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import com.cloudtech.util.BufferUtil;
  6. import com.cloudtech.web.vo.RequestVo;
  7. import io.netty.buffer.ByteBuf;
  8. import io.netty.channel.ChannelHandlerContext;
  9. import io.netty.handler.codec.ByteToMessageDecoder;
  10. /**
  11. * 数据包解码器
  12. * <pre>
  13. * 数据包格式
  14. * +——----——--------+——-----——+——----———+——----——-+-----------+---------+--------+---------|--------|-------
  15. * |内容(5)|整个包长度(n)|用户地址 (3)|信息类别(1)|发信方地址(3)| 发信时间(2)|电文长度(n-20)|电文内容|CRC校验(1)|校验和(1)
  16. * +——----——--------+——-----——+——----———+——----——-+-----------+---------+--------+---------|--------|-------
  17. *
  18. *
  19. * </pre>
  20. * 一个完整包的长度至少为20
  21. *
  22. * @author wude
  23. * @date 201897
  24. *
  25. */
  26. public class RequestDecoder extends ByteToMessageDecoder {
  27. protected static Logger LOGGER = LoggerFactory.getLogger(RequestDecoder.class);
  28. /**
  29. * 数据包基本长度
  30. */
  31. public static int BASE_LENTH = 20;
  32. @Override
  33. protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
  34. //防止socket攻击,也就是说可能会存在有人恶意攻击程序,定义Integer最大值,向服务器发请求,造成服务器cpu,内存的开销增多
  35. try {
  36. if(buffer.readableBytes() > 2048){
  37. buffer.skipBytes(buffer.readableBytes());
  38. }
  39. /* while(true){*/
  40. if(buffer.readableBytes() >= BASE_LENTH){ //包最小长度应为20
  41. //第一个可读数据包的起始位置
  42. while(true) {
  43. //标记初始读游标位置
  44. buffer.markReaderIndex();
  45. String str = BufferUtil.readFixLenAscStr(buffer, 5);
  46. //判断头部
  47. if(str.equals("$TXXX")){ //通讯信息
  48. buffer.resetReaderIndex();
  49. break;
  50. }
  51. //未读到包头标识略过一个字节
  52. buffer.resetReaderIndex();
  53. buffer.readByte();
  54. //不满足
  55. if(buffer.readableBytes() < BASE_LENTH - 5){
  56. return;
  57. }
  58. }
  59. //标记初始读游标位置
  60. buffer.markReaderIndex();
  61. //内容
  62. String content = BufferUtil.readFixLenAscStr(buffer, 5);
  63. //长度
  64. Short len = buffer.readShort();
  65. if(buffer.readableBytes() <len-7){
  66. return;
  67. }
  68. RequestVo requestVo = new RequestVo();
  69. //地址
  70. String userAddress =BufferUtil.readFixHexLenStr(buffer, 3);
  71. //信息类别
  72. byte infoType = buffer.readByte();
  73. //发信方地
  74. String sendStr = BufferUtil.readFixHexLenStr(buffer, 3);
  75. //发信时间
  76. byte hh = buffer.readByte();
  77. byte mm = buffer.readByte();
  78. //电文长度
  79. int contenLen = buffer.readShort() / 8;
  80. if(len !=(contenLen + 20) || contenLen <= 78){
  81. //说明报文长度不对
  82. buffer.resetReaderIndex();
  83. BufferUtil.readFixLen(buffer, len);
  84. return;
  85. }
  86. //数据
  87. byte[] datas = new byte[contenLen];
  88. buffer.readBytes(datas);
  89. //CRC校验
  90. byte crc = buffer.readByte();
  91. //校验和
  92. byte checkSum = buffer.readByte();
  93. requestVo.setContent(content);
  94. requestVo.setLen(len.intValue());
  95. requestVo.setUserAddress(userAddress);
  96. requestVo.setInfoType(infoType);
  97. requestVo.setSendAddress(sendStr);
  98. requestVo.setHh(hh);
  99. requestVo.setMm(mm);
  100. requestVo.setContenLen(contenLen);
  101. requestVo.setData(datas);
  102. requestVo.setCrc(crc);
  103. requestVo.setCheckSum(checkSum);
  104. buffer.resetReaderIndex();
  105. //原始报文
  106. String bytes2Hex = BufferUtil.readFixHexLen(buffer, len);
  107. requestVo.setReportStr(bytes2Hex);
  108. out.add(requestVo);
  109. }else{
  110. //数据不完整,等待完整的数据包
  111. return;
  112. }
  113. /* }*/
  114. //数据不完整,等待完整的数据包
  115. return;
  116. } catch (Exception e) {
  117. return;
  118. }
  119. }
  120. }

 2.2.4 业务处理类

  1. package com.cloudtech.server;
  2. import java.net.InetSocketAddress;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.stereotype.Component;
  6. import com.cloudtech.server.comm.ModuleId;
  7. import com.cloudtech.server.scanner.Invoker;
  8. import com.cloudtech.server.scanner.InvokerHoler;
  9. import com.cloudtech.server.serial.BufferFactory;
  10. import com.cloudtech.server.session.Session;
  11. import com.cloudtech.server.session.SessionImpl;
  12. import com.cloudtech.util.BufferUtil;
  13. import com.cloudtech.web.dubbo.BaseDataResult;
  14. import com.cloudtech.web.vo.RequestVo;
  15. import com.cloudtech.web.vo.StationVo;
  16. import io.netty.buffer.ByteBuf;
  17. import io.netty.channel.ChannelHandlerContext;
  18. import io.netty.channel.SimpleChannelInboundHandler;
  19. /**
  20. *
  21. * @ClassName: SessionManager
  22. * @Description: 消息接受处理类
  23. * @author wude
  24. * @date 2018年9月7日
  25. */
  26. @Component
  27. public class ServerHandler extends SimpleChannelInboundHandler<RequestVo> {
  28. private static Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
  29. @Reference(version = "${demo.service.version}",timeout=30000,check=false)
  30. DubboService dubboService;
  31. @Override
  32. protected void messageReceived(ChannelHandlerContext ctx, RequestVo requestVo) throws Exception {
  33. System.out.println(requestVo.toString());
  34. byte[] data = requestVo.getData();
  35. ByteBuf buffer = BufferFactory.getBuffer(data);
  36. String stationNum = BufferUtil.readFixLenStr(buffer, 5);
  37. System.out.println(stationNum);
  38. }
  39. }

@Component标识加入容器中

可再接收内中,调用dubboserver接口

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/854840
推荐阅读
相关标签
  

闽ICP备14008679号