赞
踩
软件下载
网络调试工具
NetServerOptions options = new NetServerOptions();
options.setRegisterWriteHandler(true);
options.setTcpKeepAlive(true); // 是否存活
// options.setIdleTimeout(5); // 配置了 客户端会自动断连,重新连接
<!-- vertx tcp开发依赖 --> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-core</artifactId> <version>4.3.1</version> </dependency> <!-- spring boot 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 其他工具类依赖 --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.1-jre</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <dependency> <groupId>cn.hutool</groupId> <artifactId>hutool-all</artifactId> <version>5.8.11</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.8.graal</version> </dependency> <dependency> <groupId>com.alibaba.fastjson2</groupId> <artifactId>fastjson2</artifactId> <version>2.0.25</version> </dependency>
server.connectHandler
服务端创建连接后的操作socket.writeHandlerID()
获取 clientid
socket.handler
接收客户端发送的消息vertx.setPeriodic
设置定时器 通过定时器,服务端主动定时给服务端发送消息socket.closeHandler
监测客户端是否断开连接 server.listen
监听指定的端口号,启动TcpServer使用 vertx.createNetServer
创建 netServer
当客户端连入后,服务端会通过配置的定时发送消息,给客户端发送消息,客户端断开连接后,会通过socket.closeHandler
监测,检测到取消定时器,不再向客户端发送消息
package com.example.socketdemo.socket5; import cn.hutool.core.util.CharsetUtil; import io.vertx.core.AbstractVerticle; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; @Slf4j public class VertxTCPServer extends AbstractVerticle { @Override public void start() { // 可以配置 ip 端口号 NetServerOptions netServerOptions = new NetServerOptions().setPort(9090).setHost("127.0.0.1"); // 创建TCP服务器 NetServer server = vertx.createNetServer(netServerOptions); // 处理连接请求 server.connectHandler(socket -> { String handlerID = socket.writeHandlerID(); System.out.println("服务端id" + handlerID); // 接收客户端发送的消息 socket.handler(buff -> { System.out.println("RECV: " + buff.length()); // 需要回复消息可以在这发送 socket.write(Buffer.buffer("TcpServer Receive -=== Hello Vertx from Server!")); }); // 创建服务端发送消息定时器 long id = vertx.setPeriodic(10000, res -> { socket.write(Buffer.buffer("Hello Vertx from Server!......"), ar -> { if (ar.succeeded()) { socket.handler(buffer -> { // 在这里应该解析报文,封装为协议对象,并找到响应的处理类,得到处理结果,并响应 log.info("TcpServer接收到的数据为:" + buffer.toString(CharsetUtil.CHARSET_GBK) + " " + LocalDateTime.now()); //todo 收到客户端消息后做其他操作 }); } else { log.error("写入数据失败!"); // todo 发送给客户端消息失败 记录日志或其他操作 } }); }); // 监听客户端的退出连接 socket.closeHandler(close -> { System.out.println("客户端退出连接"+ socket.writeHandlerID()); // 取消定时器 不再向退出连接的客户端发送信息 vertx.cancelTimer(id); }); }); // 监听端口 server.listen(9090, "127.0.0.1", res -> { if (res.succeeded()) { log.info(" 服务器启动成功"); } else { log.error(" 服务器启动失败 {}", res.cause().getMessage()); } }); } public static void main(String[] args) { // 启动服务端 spring boot 可以使用 ApplicationRunner 启动 Vertx.vertx().deployVerticle(new VertxTCP2Server()); } }
注意 这里有两个 socket.handler
接收消息的方法,客户端创建连接后第一个,定时器里面第二个,第一个接收创建连接后的第一条消息,可以理解为注册包
, 后面客户端发送的消息都在第二个 socket.handler
接收
启动完服务端之后,我们可以通过 网络调试助手
启动客户端,进行消息发送接收测试
使用 public static void main
进行简单的启动测试
整合spring Boot
可以使用 ApplicationRunner
代码如下
@Component
public class ThreadApplicationRuner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) throws Exception {
Vertx.vertx().deployVerticle(new VertxTCP2Server());
}
}
使用 vertx.createNetClient
创建客户端
client.connect
客户端进行连接 socket.handler
连接成功后 接收客户端消息启动同 TcpServer
可以使用 ApplicationRunner
package com.example.socketdemo.socket5; import cn.hutool.core.util.CharsetUtil; import io.vertx.core.AbstractVerticle; import io.vertx.core.Vertx; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetClient; import io.vertx.core.net.NetClientOptions; import io.vertx.core.net.NetSocket; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; @Slf4j public class VertxTCPClient extends AbstractVerticle { @Override public void start() { NetClientOptions options = new NetClientOptions().setConnectTimeout(10000); NetClient client = vertx.createNetClient(options); client.connect(11203, "192.168.2.141", res -> { if (res.succeeded()) { System.out.println("Connected!"); NetSocket socket = res.result(); socket.handler(buffer -> { // // 在这里应该解析报文,封装为协议对象,并找到响应的处理类,得到处理结果,并响应 log.info("tcpClient 接收到的数据为:" + buffer.toString(CharsetUtil.CHARSET_GBK) + " " + LocalDateTime.now()); // String heartStr = HexUtil.encodeHexStr(buffer.toString(CharsetUtil.CHARSET_GBK).getBytes(CharsetUtil.CHARSET_GBK)).toUpperCase(); // System.out.println("接收到的数据为 -HEX:" + HexUtil.encodeHexStr(buffer.toString(CharsetUtil.CHARSET_GBK).getBytes(CharsetUtil.CHARSET_GBK)).toUpperCase()); // 收到客户端消息后回复客户端信息 socket.write(Buffer.buffer("tcpClient send ====> Hello Vertx from Server!")); }); } else { System.out.println("Failed to connect: " + res.cause().getMessage()); } }); } public static void main(String[] args) { Vertx.vertx().deployVerticle(new VertxTCPClient()); } }
有时我们需要向tcp客户端发送十六进制的数据,直接发送的时候会发现没有数据返回 下面代码是以 十六进制格式发送信息, 利用 hutool
工具里
使用 HexUtil.decodeHex(sendMsg)
将字符串转换成十六进制的字节码 进行发送,测试是否返回数据
socket.write(Buffer.buffer(HexUtil.decodeHex(sendMsg)), resp -> {
if (resp.succeeded()) {
log.info("服务端 发送消息成功 {}", sendDeHex);
} else {
log.error("发送消息失败");
}
});
接 5g路由器配置
tcp 接收消息的时候会有拆包粘包问题,可能接收的消息被分开,或者多条消息在一起接收,所以指令一般会有首部跟尾部进行区分,vertx也是异步编程,所以接收的时候需要改同步接收
具体代码图下,使用 Record Parser
解析
这里要点是
socket.write(Buffer.buffer(HexUtil.decodeHex(sendMsg.get()))).onComplete
同步发送消息RecordParser parser = RecordParser.newDelimited
使用解析器确定结尾命令 防止粘包 拆包package com.example.socket.tcp2; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.HexUtil; import com.example.socket.tcp1.BufferWrapper; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetSocket; import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @Slf4j public class VertxTCPServer extends AbstractVerticle { public final static Map<String, NetSocket> CLIENT_MAP = Maps.newHashMap(); public static final String host = "192.168.2.141"; public static final Integer port = 10123; public static final String REGISTER_MSG = "心跳包"; public static final String HEART_MSG = "注册包"; @Override public void start() { //测试指令 这里模拟发送给tcp客户端的指令 String sendMsg = "指令1"; String sendMsg2 = "指令2"; List<String> msgList = Lists.newArrayList(); msgList.add(sendMsg); msgList.add(sendMsg2); //配置 NetServerOptions netServerOptions = new NetServerOptions(); netServerOptions.setRegisterWriteHandler(true); netServerOptions.setTcpKeepAlive(true); // 创建TCP服务器 NetServer server = vertx.createNetServer(netServerOptions); // 处理连接请求 server.connectHandler(socket -> { String handlerId = socket.writeHandlerID(); log.info("客户端接入===》 {}", handlerId); CLIENT_MAP.put(handlerId, socket); // 接收客户端发送的消息 socket.handler(buff -> { log.info("==接收串口指令 GBK:" + buff.toString(CharsetUtil.CHARSET_GBK)); }); // 监听客户端的退出连接 socket.closeHandler(close -> { CLIENT_MAP.remove(socket.writeHandlerID()); log.info("客户端退出连接 {}" , LocalDateTime.now()); // 取消定时器 不再向退出连接的客户端发送信息 }); }); // 监听端口 server.listen(port, host, res -> { if (res.succeeded()) { System.out.println(port + " 服务器启动成功"); } else { log.error("连接失败===》 {} {}" ,port, res.cause().getMessage()); } }); // 定时给客户端发送指令 long timerId = vertx.setPeriodic(18000, res -> { CLIENT_MAP.forEach((key, socket) -> receiveMsg(socket, msgList)); }); } private static void receiveMsg(NetSocket socket, List<String> msgList) { int initialIndex = 0; sendNextMessage(socket, msgList, initialIndex); } private static void sendNextMessage(NetSocket socket, List<String> msgList, int currentIndex) { if (currentIndex < msgList.size()) { String nextMsg = msgList.get(currentIndex); AtomicReference<String> sendMsg = new AtomicReference<>(nextMsg); System.out.println(sendMsg.get()); // 1. 使用 onComplete 做同步发送消息 // 2. 根据接收发送格式 选择时候使用十六进制发送 HexUtil.decodeHex(sendMsg.get())) socket.write(Buffer.buffer(HexUtil.decodeHex(sendMsg.get()))).onComplete(ar1 -> { if (ar1.succeeded()) { ThreadUtil.sleep(1000); final RecordParser parser = RecordParser.newDelimited(Buffer.buffer(HexUtil.decodeHex("EE")), receive -> { if (receive.toString().equals(REGISTER_MSG) || receive.toString().equals(HEART_MSG)) { return; } String heartStr = HexUtil.encodeHexStr(receive.getBytes(), false).toUpperCase(); System.out.println("接收到的消息: " + sendMsg.get() +" " + LocalDateTime.now() + " - " + heartStr); // todo 具体逻辑处理 // 发送下一条消息 sendNextMessage(socket, msgList, currentIndex + 1); }); socket.handler(parser); } else { // 处理发送失败的情况 log.error("发送失败"); } }); } } public static void main(String[] args) { VertxOptions vertxOptions = new VertxOptions(); vertxOptions.setBlockedThreadCheckInterval(999888777666L); vertxOptions.setMaxEventLoopExecuteTime(999888777666L); Vertx.vertx(vertxOptions).deployVerticle(new VertxTCPServer()); Vertx.vertx().close(); } }
经过测试 vertx使用 socket.write(Buffer.buffer(HexUtil.decodeHex(sendMsg.get()))).onComplete
同步编程,假使客户端不回发消息,会造成堵塞,所以还是得用异步发送
要点
RecordParser.newDelimited
进行拆包,解决TCP粘包问题
package com.example.socket.tcphex; import cn.hutool.core.thread.ThreadUtil; import cn.hutool.core.util.CharsetUtil; import cn.hutool.core.util.HexUtil; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import io.vertx.core.AbstractVerticle; import io.vertx.core.Promise; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; import io.vertx.core.net.NetServer; import io.vertx.core.net.NetServerOptions; import io.vertx.core.net.NetSocket; import io.vertx.core.parsetools.RecordParser; import lombok.extern.slf4j.Slf4j; import java.time.LocalDateTime; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; @Slf4j public class VertxTCPHexServer1 extends AbstractVerticle { public final static Map<String, NetSocket> CLIENT_MAP = Maps.newHashMap(); public static final String host = "192.168.2.141"; public static final Integer port = 10123; public final static Map<String, Integer> PORT_MAP = Maps.newHashMap(); @Override public void start() { String sendMsg = "指令一"; String sendMsg2 = "指令二"; List<String> msgList = Lists.newArrayList(); msgList.add(sendMsg); msgList.add(sendMsg2); //配置 NetServerOptions netServerOptions = new NetServerOptions(); netServerOptions.setRegisterWriteHandler(true); netServerOptions.setTcpKeepAlive(true); //设置socket多长时间未收到消息超时(s) netServerOptions.setReadIdleTimeout(120); // 创建TCP服务器 NetServer server = vertx.createNetServer(netServerOptions); // 处理连接请求 server.connectHandler(socket -> { String handlerId = socket.writeHandlerID(); CLIENT_MAP.put(handlerId, socket); // 接收客户端发送的消息 socket.handler(buff -> { log.info("==接收串口指令 GBK:{} 客户端{}", buff.toString(CharsetUtil.CHARSET_GBK), handlerId); }); // 监听客户端的退出连接 socket.closeHandler(close -> { CLIENT_MAP.remove(socket.writeHandlerID()); log.error("客户端退出连接 "); // 取消定时器 不再向退出连接的客户端发送信息 }); }); // 监听端口 配置tcp启动信息 String key1 = String.format("%s-%s", host, port); if (!PORT_MAP.containsKey(key1)) { server.listen(port, host).onComplete(res -> { if (res.succeeded()) { log.info(port + " 服务器启动成功"); PORT_MAP.put(key1, server.actualPort()); } else { log.error("连接失败===》 {} {}", port, res.cause().getMessage()); } }); } // 定时循环向客户端发送消息 long timerId = vertx.setPeriodic(20000, res -> { log.info("循环开始====》{} {}", LocalDateTime.now(), CLIENT_MAP.size()); CLIENT_MAP.forEach((key, socket) -> { sendMsg(socket, msgList); }); }); } private void sendMsg(NetSocket socket, List<String> msgList) { Map<String, String> sendMap = Maps.newHashMap(); msgList.forEach(msg -> { // 异步的时候 发送的消息跟接收的消息会不一致 这里我们发到map里面去 用的时候从里面取 int index = msg.indexOf("CAFC"); String substring = msg.substring(index, index + 6); sendMap.put(substring.replace("FC", "BD"), msg); socket.write(Buffer.buffer(HexUtil.decodeHex(msg))).onComplete(ar1 -> { if (ar1.succeeded()) { final RecordParser parser = RecordParser.newDelimited(Buffer.buffer(HexUtil.decodeHex("EE")), receive -> { String heartStr = HexUtil.encodeHexStr(receive.getBytes(), false).toUpperCase(); int receiveIndex = heartStr.indexOf("CABD"); String receiveSubstring = heartStr.substring(receiveIndex, receiveIndex + 6); String sendOne = sendMap.get(receiveSubstring); log.info(String.format("接收指令: %s key %s 客户端数量 %s 信息%s", sendOne, socket.writeHandlerID(), CLIENT_MAP.size(), heartStr)); }); socket.handler(parser); } }); ThreadUtil.sleep(3000); }); } public static void main(String[] args) { VertxOptions vertxOptions = new VertxOptions(); vertxOptions.setBlockedThreadCheckInterval(999888777666L); vertxOptions.setMaxEventLoopExecuteTime(999888777666L); Vertx.vertx(vertxOptions).deployVerticle(new VertxTCPHexServer1()); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。