赞
踩
- package com.idc.config.netty;
-
- import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelOption;
- import io.netty.channel.EventLoopGroup;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.nio.NioServerSocketChannel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import java.net.InetSocketAddress;
-
- /**
- * @description: netty服务启动类
- **/
- @Slf4j
- @Component
- public class NettyServer {
-
- public void start(InetSocketAddress address) {
- //配置服务端的NIO线程组
- EventLoopGroup bossGroup = new NioEventLoopGroup(1);
- EventLoopGroup workerGroup = new NioEventLoopGroup();
-
- try {
- ServerBootstrap bootstrap = new ServerBootstrap()
- .group(bossGroup, workerGroup) // 绑定线程池
- .channel(NioServerSocketChannel.class)
- .localAddress(address)
- .childHandler(new NettyServerChannelInitializer())//编码解码
- .option(ChannelOption.SO_BACKLOG, 128); //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
- // .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
-
- // 绑定端口,开始接收进来的连接
- ChannelFuture future = bootstrap.bind(address).sync();
- log.info("ODF-Socket------netty服务器开始监听端口:" + address.getPort());
- //关闭channel和块,直到它被关闭
- future.channel().closeFuture().sync();
- } catch (Exception e) {
- e.printStackTrace();
- bossGroup.shutdownGracefully();
- workerGroup.shutdownGracefully();
- }
- }
-
- }
- package com.idc.config.netty;
-
- import com.idc.config.udpsocket.UdpServerHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.util.CharsetUtil;
-
- /**
- * @description: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器
- **/
-
- public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
-
- @Override
- protected void initChannel(SocketChannel channel) throws Exception {
-
- channel.pipeline().addLast("decoder",new StringDecoder(CharsetUtil.UTF_8));
- channel.pipeline().addLast("encoder",new StringEncoder(CharsetUtil.UTF_8));
- channel.pipeline().addLast(new NettyServerHandler());
- }
- }
- package com.idc.config.netty;
-
- import com.idc.common.exception.CommonRuntimeException;
- import com.idc.entity.odf.dto.LightingStatus;
- import com.idc.mapper.OdfAlarmMapper;
- import com.idc.mapper.OdfMapper;
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelId;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import io.netty.handler.timeout.IdleState;
- import io.netty.handler.timeout.IdleStateEvent;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.stereotype.Component;
-
- import javax.annotation.Resource;
- import java.net.InetSocketAddress;
- import java.util.concurrent.ConcurrentHashMap;
-
- /**
- * @author wcybaonier
- * @description: netty服务端处理类
- **/
- @Slf4j
- @Component
- public class NettyServerHandler extends ChannelInboundHandlerAdapter {
-
- @Resource
- private OdfMapper odfMapper;
-
- /**
- * 管理一个全局map,保存连接进服务端的通道数量
- */
- private static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
-
-
-
- /**
- * @param ctx
- * @DESCRIPTION: 有客户端连接服务器会触发此函数
- * @return: void
- */
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
-
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
-
- String clientIp = insocket.getAddress().getHostAddress();
- int clientPort = insocket.getPort();
-
- //获取连接通道唯一标识
- ChannelId channelId = ctx.channel().id();
-
- System.out.println();
- //如果map中不包含此连接,就保存连接
- if (CHANNEL_MAP.containsKey(channelId)) {
- log.info("ODF-Socket------客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
- } else {
- //保存连接
- CHANNEL_MAP.put(channelId, ctx);
-
- log.info("ODF-Socket------客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
- log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
- }
- }
-
- /**
- * @param ctx
- * @DESCRIPTION: 有客户端终止连接服务器会触发此函数
- * @return: void
- */
- @Override
- public void channelInactive(ChannelHandlerContext ctx) {
-
- InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
-
- String clientIp = insocket.getAddress().getHostAddress();
-
- ChannelId channelId = ctx.channel().id();
-
- //包含此客户端才去删除
- if (CHANNEL_MAP.containsKey(channelId)) {
- //删除连接
- CHANNEL_MAP.remove(channelId);
- System.out.println();
- log.info("ODF-Socket------客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + insocket.getPort() + "]");
- log.info("ODF-Socket------连接通道数量: " + CHANNEL_MAP.size());
- }
- }
-
- /**
- * @param ctx
- * @DESCRIPTION: 有客户端发消息会触发此函数
- * @return: void
- */
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg == null){
- throw new CommonRuntimeException("ODF-Socket------加载客户端报文为空,请联系厂商!");
- }
- log.info("ODF-Socket------加载客户端报文......【" + ctx.channel().id() + "】" + " :" + msg);
-
-
- /**
- * 下面可以解析数据,保存数据,生成返回报文,将需要返回报文写入write函数
- * 在这里可以设置异步执行 提交任务到该channel的taskQueue 中
- */
- ctx.channel().eventLoop().execute(() -> {
- String msgStr = String.valueOf(msg);
- // 如果不包含逗号, 那么格式不对 约定格式为 : 序列号,deviceid,shelfNo,moduleNo,termNo,state
- if (!msgStr.contains(",")){
- throw new CommonRuntimeException("ODF-Socket------加载客户端报文格式不正确,请联系厂商!");
- }
- try {
- String[] split = msgStr.split(",");
- if (split.length != 6){
- throw new CommonRuntimeException("ODF-Socket------加载客户端报文长度不正确,请联系厂商!");
- }
- //开始修改 admin
- LightingStatus lightingStatus = new LightingStatus();
- lightingStatus.setSerialNumber(split[0]);
- lightingStatus.setDeviceId(split[1]);
- lightingStatus.setShelfNo(split[2]);
- lightingStatus.setModuleNo(split[3]);
- lightingStatus.setTermNo(split[4]);
- lightingStatus.setState(split[5]);
- int i = odfMapper.updateTermStatus(lightingStatus);
- log.info("ODF-Socket------亮灯状态更新条数......【" + i + "】" );
- } catch (Exception e) {
- e.printStackTrace();
- }
- });
-
- /**
- * 可以设置多个异步任务
- * 但是这个会在上面异步任务执行完之后才执行
- */
- /*ctx.channel().eventLoop().execute(new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(10*1000);
- log.info(">>>>>>>>>休眠二十秒");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- });*/
-
- //响应客户端
- log.info("ODF-Socket------服务端端返回报文......【" + ctx.channel().id() + "】" + " :" + msg);
- this.channelWrite(ctx.channel().id(), msg);
- }
-
- /**
- * @param msg 需要发送的消息内容
- * @param channelId 连接通道唯一id
- * @DESCRIPTION: 服务端给客户端发送消息
- * @return: void
- */
- public void channelWrite(ChannelId channelId, Object msg) throws Exception {
-
- ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
-
- if (ctx == null) {
- log.info("ODF-Socket------通道【" + channelId + "】不存在");
- return;
- }
-
- if (msg == null || msg == "") {
- log.info("ODF-Socket------服务端响应空的消息");
- return;
- }
-
- //将客户端的信息直接返回写入ctx
- ctx.write(msg);
- //刷新缓存区
- ctx.flush();
- }
-
- public static void main(String[] args) {
- System.out.println("序列号,deviceid,shelfNo,moduleNo,termNo,state".split(",").length);
- }
-
- @Override
- public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
-
- String socketString = ctx.channel().remoteAddress().toString();
-
- if (evt instanceof IdleStateEvent) {
- IdleStateEvent event = (IdleStateEvent) evt;
- if (event.state() == IdleState.READER_IDLE) {
- log.info("ODF-Socket------Client: " + socketString + " READER_IDLE 读超时");
- ctx.disconnect();
- } else if (event.state() == IdleState.WRITER_IDLE) {
- log.info("ODF-Socket------Client: " + socketString + " WRITER_IDLE 写超时");
- ctx.disconnect();
- } else if (event.state() == IdleState.ALL_IDLE) {
- log.info("ODF-Socket------Client: " + socketString + " ALL_IDLE 总超时");
- ctx.disconnect();
- }
- }
- }
-
- /**
- * @param ctx
- * @DESCRIPTION: 发生异常会触发此函数
- * @return: void
- */
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-
- System.out.println();
- ctx.close();
- log.info("ODF-Socket------"+ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
- //cause.printStackTrace();
- }
- }
- package com.idc;
-
import com.idc.config.netty.NettyServer;
import com.idc.config.udpsocket.UdpServer;
import lombok.extern.slf4j.Slf4j;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

import javax.annotation.Resource;
import java.net.InetSocketAddress;
-
- /**
- * @author wcybaonier
- */
- @MapperScan("com.idc.mapper")
- @SpringBootApplication
- @Slf4j
- @EnableScheduling
- public class IdcPduApplication implements CommandLineRunner {
-
- @Value("${netty.host}")
- private String host;
-
- @Value("${netty.port}")
- private Integer port;
-
- @Resource
- private NettyServer nettyServer;
-
- public static void main(String[] args) {
- SpringApplication.run(IdcPduApplication.class, args);
- log.info("IdcPduApplication 启动成功!");
- }
-
- /**
- * netty服务启动
- * @param args
- * @throws Exception
- */
- @Override
- public void run(String... args) throws Exception {
- //tcp实现
- InetSocketAddress address = new InetSocketAddress(host,port);
- log.info("neety服务器启动地址: "+host+":"+ port);
- nettyServer.start(address);
- }
- }
yml配置:
# Host and port used by the Netty TCP server (read as netty.host / netty.port)
netty:
  port: 7101
  host: 127.0.0.1
完成,启动项目即可自动监听对应端口
客户端这里为了方便测试,直接写了 main 方法;也可以参考服务端的做法配置启动类,实现跟随项目启动。
- package com.ws.aa;
-
- import io.netty.bootstrap.Bootstrap;
- import io.netty.channel.*;
- import io.netty.channel.nio.NioEventLoopGroup;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.channel.socket.nio.NioSocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.util.CharsetUtil;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
- * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
- *
- * 核心文件(与服务端进行数据交互)
- *
- * 客户端
- *
- * @author 小辰哥哥
- */
- public class SocketClient {
- // 服务端IP
- static final String HOST = System.getProperty("host", "134.95.3.134");
-
- // 服务端开放端口
- static final int PORT = Integer.parseInt(System.getProperty("port", "7101"));
-
- // 数据包大小
- static final int SIZE = Integer.parseInt(System.getProperty("size", "256"));
-
- // 日志打印
- private static final Logger LOGGER = LoggerFactory.getLogger(SocketClient.class);
-
- // 主函数启动
- public static void main(String[] args) throws InterruptedException {
- sendMessage("1,deviceid123,shelfNo123,moduleNo123,termNo123,state123");
- }
-
- /**
- * 核心方法(处理:服务端向客户端发送的数据、客户端向服务端发送的数据)
- *
- * @param content
- * @throws InterruptedException
- * @author 小辰哥哥
- */
- public static void sendMessage(String content) throws InterruptedException {
- // Configure the client.
- EventLoopGroup group = new NioEventLoopGroup();
- try {
- Bootstrap b = new Bootstrap();
- b.group(group)
- .channel(NioSocketChannel.class)
- .option(ChannelOption.TCP_NODELAY, true)
- .handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- ChannelPipeline p = ch.pipeline();
- p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
- p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
- p.addLast(new SocketHandler() {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- LOGGER.debug("####接收服务端发送过来的消息####");
- LOGGER.debug("服务端发送过来的数据:" + msg);
-
- // 主动与服务端断开连接(客户端触发)
- //ctx.channel().close();
- }
- });
- }
- });
-
- ChannelFuture future = b.connect(HOST, PORT).sync();
- future.channel().writeAndFlush(content);
-
- // 程序阻塞
- future.channel().closeFuture().sync();
- } finally {
- group.shutdownGracefully();
- }
- }
-
- }
- package com.ws.aa;
-
- import io.netty.channel.ChannelHandler;
- import io.netty.channel.ChannelInitializer;
- import io.netty.channel.ChannelPipeline;
- import io.netty.channel.socket.SocketChannel;
- import io.netty.handler.codec.string.StringDecoder;
- import io.netty.handler.codec.string.StringEncoder;
- import io.netty.util.CharsetUtil;
-
- import java.util.logging.SocketHandler;
-
- /**
- * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
- *
- * 设置出站和入站的编码器和解码器(该方法在SocketClientConfig.java中被重写)
- *
- * 客户端
- *
- * @author wcybaonier
- */
-
- public class SocketChannelInitializer extends ChannelInitializer<SocketChannel> {
- protected void initChannel(SocketChannel channel) throws Exception {
- ChannelPipeline p = channel.pipeline();
- p.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
- p.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
- p.addLast((ChannelHandler) new SocketHandler());
- }
- }
- package com.ws.aa;
-
- import io.netty.channel.ChannelHandlerContext;
- import io.netty.channel.ChannelInboundHandlerAdapter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
-
- /**
- * Springboot整合Netty,实现Socket信息交互(本项目用于与C语言程序进行交互)
- *
- * 初始化操作、接受服务端发送过来的消息(该方法在SocketClient.java中被重写)
- *
- * 客户端
- *
- * @author wcybaonier
- */
-
- public class SocketHandler extends ChannelInboundHandlerAdapter {
-
- // 日志打印
- private static final Logger LOGGER = LoggerFactory.getLogger(SocketHandler.class);
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) {
- LOGGER.debug("SocketHandler Active(客户端)");
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- LOGGER.debug("####接收服务端发送过来的消息####");
- LOGGER.debug("SocketHandler read Message:" + msg);
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- LOGGER.debug("####客户端断开连接####");
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
......想写就参考服务端......
已经测试通过,但是忘记截图了。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。