当前位置:   article > 正文

springboot整合netty 实现聊天_springboot + netty做聊天

springboot + netty做聊天

引入Netty的Maven依赖

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.50.Final</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

配置Netty的host和port

netty:
  host: 0.0.0.0  #0.0.0.0表示绑定任意ip
  port: 9998
  • 1
  • 2
  • 3

springboot 异步启动netty
配置线程池

#线程池配置
async:
  executor:
    thread:
      # 核心线程数
      core_pool_size: 5
      # 最大线程数
      max_pool_size: 20
      # 任务队列大小
      queue_capacity: 100
      # 线程池中线程的名称前缀
      name_prefix: async-service-
      # 缓冲队列中线程的空闲时间
      keep_alive_seconds: 100
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

线程池 ThreadPoolConfig

package com.hongyu.thread;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池配置
 *
 * @author JHY
 * @date 2023/03/09
 */
@Configuration
@EnableAsync
@Slf4j
public class ThreadPoolConfig {

    @Value("${async.executor.thread.core_pool_size}")
    private int corePoolSize;
    @Value("${async.executor.thread.max_pool_size}")
    private int maxPoolSize;
    @Value("${async.executor.thread.queue_capacity}")
    private int queueCapacity;
    @Value("${async.executor.thread.name_prefix}")
    private String namePrefix;
    @Value("${async.executor.thread.keep_alive_seconds}")
    private int keepAliveSeconds;

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        /**
         *第一种:有具体的计算公式算出来的
         * 比如:线程数 = N * U * ( 1 + W/C )
         * N = cpu数量(可以理解为cpu核数,即同一时刻能并行处理线程的数量)
         * U = 目标cpu使用率(0-1区间范围内)
         * W = 等待时间
         * C = 计算时间
         * W/C = 等待时间和计算时间的比例
         *第二种:
         *  如果是计算密集型的应用则设置N+1线程数
         *  如果是I0密集性的应用则设置2N的线程数
         *  - N是cpu数量
         */
        // 设置核心线程数
        executor.setCorePoolSize(corePoolSize);
        // 设置最大线程数
        executor.setMaxPoolSize(maxPoolSize);
        // 设置队列容量
        executor.setQueueCapacity(queueCapacity);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(keepAliveSeconds);
        // 设置默认线程名称
        executor.setThreadNamePrefix(namePrefix);
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        log.info("创建一个线程池 corePoolSize is [" + corePoolSize + "] maxPoolSize is [" + maxPoolSize + "] queueCapacity is [" + queueCapacity +
                "] keepAliveSeconds is [" + keepAliveSeconds + "] namePrefix is [" + namePrefix + "].");
        return executor;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69

启动类

package com.hongyu;

import com.hongyu.netty.NettyAsyncServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;


/**
 * 应用程序
 *
 * @author JHY
 * @date 2023/3/9
 */
@Slf4j
@EnableScheduling
@EnableAsync
@SpringBootApplication
public class App {
    public static void main(String[] args) {
        ApplicationContext ctx = SpringApplication.run(App.class, args);
        NettyAsyncServer nettyAsyncServer = ctx.getBean("NettyAsyncServer", NettyAsyncServer.class);
        nettyAsyncServer.start();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

编写NettyServer

package com.hongyu.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * Netty服务
 *
 * @author JHY
 * @date 2023/03/08
 */
@Slf4j
@Component("NettyAsyncServer")
public class NettyAsyncServer {
    @Value("${netty.host}")
    private String host = "0.0.0.0";
    @Value("${netty.port}")
    private Integer port = 9998;

    @Async
    public void start() {
        InetSocketAddress socketAddress = new InetSocketAddress(host, port);
        // 主线程池: 用于接收客户端请求连接,不做任何处理
        EventLoopGroup masterGroup = new NioEventLoopGroup();
        // 从线程池: 主线程池会把任务交给他,让其做任务
        EventLoopGroup subGroup = new NioEventLoopGroup();

        try {
            // 创建服务器启动类
            ServerBootstrap server = new ServerBootstrap();
            // 设置主从线程组
            server.group(masterGroup, subGroup)
                    // 设置双向通道
                    .channel(NioServerSocketChannel.class)
                    // 添加子处理器,用于处理从线程池的任务
                    .childHandler(new NettyServerInitializer())
                    .localAddress(socketAddress);
            // 启动服务,并且设置端口号,设置成同步方式
            ChannelFuture future = server.bind(socketAddress).sync();
            log.info("NettyServer启动成功: ws:/{}/ws", socketAddress);
            // 监听关闭的channel,设置成同步方式
            future.channel().closeFuture().sync();
        } catch (Exception e) {
            log.error("NettyServer异常:" + e.getMessage());
        } finally {
            masterGroup.shutdownGracefully();
            subGroup.shutdownGracefully();
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

NettyServerInitializer

package com.hongyu.netty;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * 网状服务器初始化
 *
 * @author JHY
 * @date 2023/03/08
 */
public class NettyServerInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        // 通过SocketChannel获取对应的管道
        ChannelPipeline pipeline = channel.pipeline();
        // 通过管道添加Handler
        // HttpServerCodec由Netty提供的助手类,可以理解为拦截器
        // 当请求到服务器需要解码,响应到客户端需要编码
        pipeline.addLast("HttpServerCodec", new HttpServerCodec());
        // 写大数据流支持
        pipeline.addLast("ChunkedWriteHandler", new ChunkedWriteHandler());
        // 对httpMessage 聚合处理
        pipeline.addLast("HttpObjectAggregator", new HttpObjectAggregator(1024 * 64));
        // 增加心跳支持,已秒为单位
        pipeline.addLast("IdleStateHandler", new IdleStateHandler(8, 10, 12));
        // 自定义空闲状态检测
        pipeline.addLast("HeartHandler", new HeartHandler());
        // 支持ws,处理一些繁重复杂的事情,处理握手动作{close,ping.pong}
        pipeline.addLast("WebSocketServerProtocolHandler", new WebSocketServerProtocolHandler("/ws"));
        // 添加自定义助手类
        pipeline.addLast("CustomHandler", new CustomChatHandler());


    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

心跳HeartHandler

package com.hongyu.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;

/**
 * 心处理程序
 * 检测心跳
 *
 * @author JHY
 * @date 2023/03/08
 */
@Slf4j
public class HeartHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            // 强制类型转换
            IdleStateEvent event = (IdleStateEvent) evt;
            // 读空闲
            if (event.state() == IdleState.READER_IDLE) {
                log.info("客户端[{}]进入读空闲", ctx.channel().id().asShortText());
            } else if (event.state() == IdleState.WRITER_IDLE) {
                log.info("客户端[{}]进入写空闲", ctx.channel().id().asShortText());
            } else if (event.state() == IdleState.ALL_IDLE) {
                log.info("客户端[{}]进入全部空闲", ctx.channel().id().asShortText());
                log.info("关闭之前,users数量为:" + CustomChatHandler.users.size());
                Channel channel = ctx.channel();
                // 资源释放
                channel.close();
                log.info("关闭之后,users数量为:" + CustomChatHandler.users.size());
            }
        }
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

自定义助手类

package com.hongyu.netty;

import com.alibaba.fastjson.JSON;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * 自定义聊天处理程序
 *
 * @author JHY
 * @date 2023/03/08
 */
@Slf4j
public class CustomChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
    /**
     * 记录和管理客户端的通道
     */
    public static ChannelGroup users = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        // 1.获取客户端发来的消息
        String text = msg.text();
        Channel channel = ctx.channel();
        String now = LocalDateTime.now().format(formatter);
        log.info("接收到[{}]的数据:{},接收时间:{}", channel.id().asShortText(), text, now);
        int textIndexOf = text.indexOf("{");
        if (textIndexOf >= 0) {
            DataContent dataContent = JSON.parseObject(text, DataContent.class);
            /**
             * 2.1 当websocket第一次open时 初始化 channel,把当前channel和userid关联起来
             * 2.2 判断消息类型,把聊天内容记录到数据库,标签消息的读取状态,未读
             * 2.3 读取消息类型 , 针对具体的消息进行读取,修改读取状态,已读
             * 2.5 心跳类型的消息
             */
            String types = dataContent.getTypes();
            if (StringUtils.isNotBlank(types)) {


                Integer meId = dataContent.getMeId();
                Integer chatId = dataContent.getChatId();
                String content = dataContent.getContent();
                Integer msgId = dataContent.getMsgId();
                String extended = dataContent.getExtended();
                if ("第一次连接".equals(types)) {
                    UserChannelRelation.put(meId, channel);
                } else if ("单聊".equals(types)) {
                    // 参考接口 一对一聊天 逻辑
                    // 保存消息进数据库 设为未读
                    // 需要使用SpringUtil来注入service
                    Channel receiverChannel = UserChannelRelation.get(chatId);
                    if (receiverChannel == null) {
                        // 离线用户
                    } else {
                        Channel findChannel = users.find(receiverChannel.id());
                        if (findChannel != null) {
                            // 用户在线
                            receiverChannel.writeAndFlush(new TextWebSocketFrame(JSON.toJSONString(dataContent)));
                        } else {
                            // 离线用户
                        }
                    }

                } else if ("群聊".equals(types)) {

                } else if ("读取".equals(types)) {
                    // 如果多条消息批处理

                } else if ("心跳".equals(types)) {
                    log.info("收到来自[{}]的心跳包,当前时间:{}", channel.id().asLongText(), now);
                    channel.writeAndFlush(new TextWebSocketFrame("{\"types\":\"心跳回复\",\"timestamp\":\"" + now + "\"}"));
                }
            }
        } else if ("PING".equals(text)) {
            channel.writeAndFlush(new TextWebSocketFrame("PONG"));
        }

//        // 将数据刷新到客户端上
//        String writeText = String.format("服务器在:%s,接收到的消息内容为:%s", LocalDateTime.now(), msg.text());
//        users.writeAndFlush(new TextWebSocketFrame(writeText));
    }

    /**
     * 处理程序添加
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        users.add(ctx.channel());
        log.info("客户端[{}]连接成功,当前{}人在线", ctx.channel().id().asShortText(), users.size());
    }

    /**
     * 处理程序删除
     *
     * @param ctx ctx
     * @throws Exception 异常
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        users.remove(ctx.channel());
        log.info("客户端[{}]断开连接,当前{}人在线", ctx.channel().id().asShortText(), users.size());
    }

    /**
     * 异常
     *
     * @param ctx ctx
     * @param e   异常
     * @throws Exception 异常
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        log.error("连接发生异常:{}", e.getMessage());
        // 发生异常,关闭连接,同时ChannelGroup移除
        ctx.channel().close();
        users.remove(ctx.channel());
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133

DataContent

package com.hongyu.netty;

import lombok.Data;
import lombok.ToString;

import java.io.Serializable;

/**
 * 数据内容
 *
 * @author JHY
 * @date 2023/03/08
 */
@Data
@ToString
public class DataContent implements Serializable {
    /**
     * 类型,单聊 群聊 心跳
     */
    private String types;
    /**
     * 个人ID
     */
    private Integer meId;
    /**
     * 聊天id
     */
    private Integer chatId;
    /**
     * 聊天内容
     */
    private String content;
    /**
     * 消息id
     */
    private Integer msgId;
    /**
     * 扩展字段
     */
    private String extended;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

UserChannelRelation

package com.hongyu.netty;

import io.netty.channel.Channel;
import lombok.Data;
import lombok.ToString;

import java.io.Serializable;
import java.util.HashMap;

/**
 * 用户渠道关系
 *
 * @author JHY
 * @date 2023/03/08
 */
@Data
@ToString
public class UserChannelRelation implements Serializable {
    private static HashMap<Integer, Channel> manage = new HashMap<>();

    public static void put(Integer meId, Channel channel) {
        manage.put(meId, channel);
    }

    public static Channel get(Integer meId) {
        return manage.get(meId);
    }

}

```

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/码创造者/article/detail/973888
推荐阅读
相关标签
  

闽ICP备14008679号