赞
踩
为了能快速开发netty集成springboot整合tcp项目,本篇博客搭建勒一个空的架子,只需要修改对应的业务逻辑代码就行,很好的提高勒开发速度,降低勒netty开发的难度。开发人员,只需要关注于业务逻辑就行。
建议:有一定的netty和springboot,dubbbo基础。
下载socket调试工具
给一个测试的demo,十六进制字符串
2454585858007A0E320460047A3F00000330473336303020646D67642000000BE7130C040F0A00000000002D00390021004D0024004B0026005200300090060026005309000000000000000000000000000004AA04AA0A04A80700484A05470700278227830427820A000057B800559E0050C50500840D0A0000
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
-
- <groupId>com.cloudtech</groupId>
- <artifactId>dipper</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <packaging>jar</packaging>
-
- <name>dipper</name>
- <description>Demo project for Spring Boot</description>
-
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.0.5.RELEASE</version>
- <relativePath /> <!-- lookup parent from repository -->
- </parent>
-
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
-
- <!-- https://mvnrepository.com/artifact/com.alibaba.boot/dubbo-spring-boot-starter -->
- <dependency>
- <groupId>com.alibaba.boot</groupId>
- <artifactId>dubbo-spring-boot-starter</artifactId>
- <version>0.2.0</version>
- </dependency>
-
-
- <!-- netty -->
- <dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- <version>5.0.0.Alpha2</version>
- </dependency>
-
- <!-- 配置提示 -->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-configuration-processor</artifactId>
- <optional>true</optional>
- </dependency>
- </dependencies>
-
- <build>
- <!-- 打成jar包的名称 -->
- <finalName>dipper</finalName>
- <plugins>
- <!-- 解决web.xml报错的问题 -->
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-war-plugin</artifactId>
- <configuration>
- <failOnMissingWebXml>false</failOnMissingWebXml>
- </configuration>
- </plugin>
-
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin </artifactId>
- </plugin>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <executions>
- <execution>
- <goals>
- <goal>repackage</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
- </plugins>
- </build>
- </project>
- ####自定义配置
- netty:
- port: 4002 ###分钟监听端口
- module-type: 0 ####0标识主通讯模块 1标识从通讯模块
-
-
- stationTypeId: 17
- ###最大缓存长度
- maxBuffLen: 10240
-
- logging:
- config: classpath:logback-boot.xml
- ####dubbot版本
- demo:
- service:
- version: 1.0.0
-
- dubbo:
- reference:
- check: false
- com:
- cloudtech:
- web:
- dubbo:
- check: false
- consumers:
- check: false
- scan:
- basePackages: com.cloudtech.web.dubbo
- application:
- id: dipper-provider-demo
- name: dipper-provider-demo
- provider:
- retries: -1
- timeout: 30000000
- protocol:
- id: dubbo
- name: dubbo
- port: -1
- registry:
- protocol: zookeeper
- check: false
- address: zookeeper://127.0.0.1:2181
- package com.cloudtech.demo;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.boot.CommandLineRunner;
- import org.springframework.boot.WebApplicationType;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.boot.builder.SpringApplicationBuilder;
- import org.springframework.context.annotation.ComponentScan;
-
- import com.cloudtech.config.NettyConfig;
- import com.cloudtech.server.Server;
- import com.cloudtech.util.Consts;
-
- @SpringBootApplication
- @ComponentScan(value = {"com.cloudtech"})
- public class DipperApplication implements CommandLineRunner {
- protected static final Logger LOGGER = LoggerFactory.getLogger(DipperApplication.class);
-
- @Autowired
- NettyConfig nettyConfig;
-
- @Autowired
- Server server;
-
- @Value("${stationTypeId}")
- private Integer stationTypeId;
- @Value("${maxBuffLen}")
- private Integer maxBuffLen;
-
- public static void main(String[] args) {
- new SpringApplicationBuilder(DipperApplication.class)
- .web(WebApplicationType.NONE)
- .run(args);
- //SpringApplication.run(Aws310Application.class, args);
- }
-
- /**
- * spring boot启动后,会进入该方法
- */
- @Override
- public void run(String... args) throws Exception {
- init();
- //加载分钟监听线程
- new Thread("Dipper-recevice-thread") {
- @Override
- public void run() {
- super.run();
- server.start(Consts.SERVER_PORT);
- }
- }.start();
- }
-
- /**
- * 初始化信息
- */
- @SuppressWarnings("unused")
- private void init() {
- Consts.SERVER_PORT = nettyConfig.getPort();
- Consts.MODULE_TYPE = nettyConfig.getModuleType();
- Consts.STATION_TYPE_ID = stationTypeId;
- Consts.MAX_BUFF_LEN = maxBuffLen;
- }
-
-
- }
- package com.cloudtech.config;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.PropertySource;
- import org.springframework.stereotype.Component;
-
- /**
- * 读取yml配置文件中的信息
- * @ClassName: NettyConfig
- * @Description:
- * @author wude
- * @date 2018年9月19日
- *
- */
- @Component
- @PropertySource(value={"classpath:application.yml"})
- public class NettyConfig {
- @Value("${netty.port}")
- private int port;
- @Value("${netty.module-type}")
- private int moduleType;
-
- public int getPort() {
- return port;
- }
-
- public void setPort(int port) {
- this.port = port;
- }
-
- public int getModuleType() {
- return moduleType;
- }
-
- public void setModuleType(int moduleType) {
- this.moduleType = moduleType;
- }
- }
- package com.cloudtech.util;
-
- import com.cloudtech.enums.ModuleType;
-
- public class Consts {
- /** 新版本服务器端口号 */
- public static Integer SERVER_PORT = 4000;
- /** 保存session的的属性值 */
- public static final String PILE_ID = "pileId";
- /** 请求成功 **/
- public static Integer SUCCESS = 1;
- /** 请求失败 **/
- public static Integer FAILED = 0;
- /** 主模块 */
- public static Integer MODULE_TYPE = ModuleType.MASTER.getCode();
- /** 模块类型id */
- public static Integer STATION_TYPE_ID = 7;
- /** 设置最大缓存长度 */
- public static Integer MAX_BUFF_LEN=10240;
- }
- package com.cloudtech.server;
-
- import javax.annotation.Resource;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.timeout.IdleStateHandler;
-
- /**
- *
- * @ClassName: Server
- * @Description:netty服务端
- * @author wude
- * @date 2018年9月7日
- *
- */
- @Component
- public class Server {
- protected static final Logger logger = LoggerFactory.getLogger(Server.class);
- @Autowired
- private ServerHandler serverHandler;
-
-
- /**
- * 启动
- */
- public void start(int port) {
-
- // 服务类
- ServerBootstrap b = new ServerBootstrap();
-
- // 创建boss和worker
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- //业务线程池,实现消息串行化
- EventLoopGroup busyGroup = new NioEventLoopGroup();
-
- try {
- // 设置循环线程组事例
- b.group(bossGroup, workerGroup);
-
- // 设置channel工厂
- b.channel(NioServerSocketChannel.class);
-
- // 设置管道
- b.childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- //第一个参数 读超时
- //第二个参数 写超时
- //第三个参数读写超时
- ch.pipeline().addLast(new IdleStateHandler(5, 5, 60));
- ch.pipeline().addLast(new RequestDecoder());
- //ch.pipeline().addLast(new DelimiterBasedFrameDecoder(Consts.MAX_BUFF_LEN,delimiter));
- ch.pipeline().addLast(new StringDecoder());
- ch.pipeline().addLast(busyGroup,serverHandler);
- }
- });
-
- b.option(ChannelOption.SO_BACKLOG, 2048);//serverSocketchannel的设置,链接缓冲池的大小
- b.childOption(ChannelOption.SO_KEEPALIVE, true);//socketchannel的设置,维持链接的活跃,清除死链接
- b.childOption(ChannelOption.TCP_NODELAY, true);//socketchannel的设置,关闭延迟发送
-
- logger.info("启动Dipper采集成功!port={}",port);
- //绑定端口
- ChannelFuture future = b.bind(port);
-
- //等待服务端关闭
- future.channel().closeFuture().sync();
-
- } catch (Exception e) {
- e.printStackTrace();
- } finally{
- //释放资源
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- busyGroup.shutdownGracefully();
- }
- }
-
- }
- package com.cloudtech.server;
-
- import java.util.List;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- import com.cloudtech.util.BufferUtil;
- import com.cloudtech.web.vo.RequestVo;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.handler.codec.ByteToMessageDecoder;
- /**
- * 数据包解码器
- * <pre>
- * 数据包格式
- * +——----——--------+——-----——+——----———+——----——-+-----------+---------+--------+---------|--------|-------
- * |内容(5)|整个包长度(n)|用户地址 (3)|信息类别(1)|发信方地址(3)| 发信时间(2)|电文长度(n-20)|电文内容|CRC校验(1)|校验和(1)
- * +——----——--------+——-----——+——----———+——----——-+-----------+---------+--------+---------|--------|-------
- *
- *
- * </pre>
- * 一个完整包的长度至少为20
- *
- * @author wude
- * @date 2018年9月7日
- *
- */
- public class RequestDecoder extends ByteToMessageDecoder {
- protected static Logger LOGGER = LoggerFactory.getLogger(RequestDecoder.class);
-
- /**
- * 数据包基本长度
- */
- public static int BASE_LENTH = 20;
-
- @Override
- protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
- //防止socket攻击,也就是说可能会存在有人恶意攻击程序,定义Integer最大值,向服务器发请求,造成服务器cpu,内存的开销增多
- try {
- if(buffer.readableBytes() > 2048){
- buffer.skipBytes(buffer.readableBytes());
- }
- /* while(true){*/
- if(buffer.readableBytes() >= BASE_LENTH){ //包最小长度应为20
- //第一个可读数据包的起始位置
- while(true) {
- //标记初始读游标位置
- buffer.markReaderIndex();
-
- String str = BufferUtil.readFixLenAscStr(buffer, 5);
- //判断头部
- if(str.equals("$TXXX")){ //通讯信息
- buffer.resetReaderIndex();
- break;
- }
- //未读到包头标识略过一个字节
- buffer.resetReaderIndex();
- buffer.readByte();
- //不满足
- if(buffer.readableBytes() < BASE_LENTH - 5){
- return;
- }
- }
- //标记初始读游标位置
- buffer.markReaderIndex();
-
- //内容
- String content = BufferUtil.readFixLenAscStr(buffer, 5);
- //长度
- Short len = buffer.readShort();
-
- if(buffer.readableBytes() <len-7){
- return;
- }
- RequestVo requestVo = new RequestVo();
- //地址
- String userAddress =BufferUtil.readFixHexLenStr(buffer, 3);
- //信息类别
- byte infoType = buffer.readByte();
- //发信方地
- String sendStr = BufferUtil.readFixHexLenStr(buffer, 3);
- //发信时间
- byte hh = buffer.readByte();
- byte mm = buffer.readByte();
- //电文长度
- int contenLen = buffer.readShort() / 8;
- if(len !=(contenLen + 20) || contenLen <= 78){
- //说明报文长度不对
- buffer.resetReaderIndex();
- BufferUtil.readFixLen(buffer, len);
- return;
- }
- //数据
- byte[] datas = new byte[contenLen];
- buffer.readBytes(datas);
- //CRC校验
- byte crc = buffer.readByte();
- //校验和
- byte checkSum = buffer.readByte();
- requestVo.setContent(content);
- requestVo.setLen(len.intValue());
- requestVo.setUserAddress(userAddress);
- requestVo.setInfoType(infoType);
- requestVo.setSendAddress(sendStr);
- requestVo.setHh(hh);
- requestVo.setMm(mm);
- requestVo.setContenLen(contenLen);
-
- requestVo.setData(datas);
- requestVo.setCrc(crc);
- requestVo.setCheckSum(checkSum);
-
- buffer.resetReaderIndex();
- //原始报文
- String bytes2Hex = BufferUtil.readFixHexLen(buffer, len);
- requestVo.setReportStr(bytes2Hex);
-
- out.add(requestVo);
- }else{
- //数据不完整,等待完整的数据包
- return;
- }
- /* }*/
- //数据不完整,等待完整的数据包
- return;
- } catch (Exception e) {
- return;
- }
- }
- }
- package com.cloudtech.server;
-
- import java.net.InetSocketAddress;
-
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.stereotype.Component;
-
- import com.cloudtech.server.comm.ModuleId;
- import com.cloudtech.server.scanner.Invoker;
- import com.cloudtech.server.scanner.InvokerHoler;
- import com.cloudtech.server.serial.BufferFactory;
- import com.cloudtech.server.session.Session;
- import com.cloudtech.server.session.SessionImpl;
- import com.cloudtech.util.BufferUtil;
- import com.cloudtech.web.dubbo.BaseDataResult;
- import com.cloudtech.web.vo.RequestVo;
- import com.cloudtech.web.vo.StationVo;
-
- import io.netty.buffer.ByteBuf;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.SimpleChannelInboundHandler;
-
- /**
- *
- * @ClassName: SessionManager
- * @Description: 消息接受处理类
- * @author wude
- * @date 2018年9月7日
- */
- @Component
- public class ServerHandler extends SimpleChannelInboundHandler<RequestVo> {
- private static Logger LOGGER = LoggerFactory.getLogger(ServerHandler.class);
- @Reference(version = "${demo.service.version}",timeout=30000,check=false)
- DubboService dubboService;
-
- @Override
- protected void messageReceived(ChannelHandlerContext ctx, RequestVo requestVo) throws Exception {
- System.out.println(requestVo.toString());
- byte[] data = requestVo.getData();
- ByteBuf buffer = BufferFactory.getBuffer(data);
- String stationNum = BufferUtil.readFixLenStr(buffer, 5);
- System.out.println(stationNum);
- }
- }
@Component标识加入容器中
可再接收内中,调用dubboserver接口
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。