当前位置:   article > 正文

Spring boot 2基于Netty的高性能Websocket服务器(心跳模式)_springboot2 netty

springboot2 netty

1:为什么要用Netty

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高;Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度

2:官方中文文档

官方文档

3: Netty-websocket-spring-boot-starter

这是个开源的框架。通过它,我们可以像spring-boot-starter-websocket一样使用注解进行开发,只需关注需要的事件(如OnMessage)。并且底层是使用Netty,netty-websocket-spring-boot-starter其他配置和spring-boot-starter-websocket完全一样,当需要调参的时候只需要修改配置参数即可,无需过多的关心handler的设置

4:Maven 依赖

要求 JDK 1.8
	<dependency>
		<groupId>org.yeauty</groupId>
		<artifactId>netty-websocket-spring-boot-starter</artifactId>
		<version>0.8.0</version>
	</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

5:创建MyWebSocket类

此处为官方文档给的实例

@ServerEndpoint
@Component
public class MyWebSocket {

    @OnOpen
    public void onOpen(Session session, HttpHeaders headers, ParameterMap parameterMap) throws IOException {
        System.out.println("new connection");
        
        String paramValue = parameterMap.getParameter("paramKey");
        System.out.println(paramValue);
    }

    @OnClose
    public void onClose(Session session) throws IOException {
       System.out.println("one connection closed"); 
    }

    @OnError
    public void onError(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }

    @OnMessage
    public void onMessage(Session session, String message) {
        System.out.println(message);
        session.sendText("Hello Netty!");
    }

    @OnBinary
    public void onBinary(Session session, byte[] bytes) {
        for (byte b : bytes) {
            System.out.println(b);
        }
        session.sendBinary(bytes); 
    }

    @OnEvent
    public void onEvent(Session session, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    System.out.println("read idle");
                    break;
                case WRITER_IDLE:
                    System.out.println("write idle");
                    break;
                case ALL_IDLE:
                    System.out.println("all idle");
                    break;
                default:
                    break;
            }
        }
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57

此处为本人项目部分源码

package org.springblade.websocket;

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.yeauty.annotation.OnBinary;
import org.yeauty.annotation.OnClose;
import org.yeauty.annotation.OnError;
import org.yeauty.annotation.OnEvent;
import org.yeauty.annotation.OnMessage;
import org.yeauty.annotation.OnOpen;
import org.yeauty.annotation.ServerEndpoint;
import org.yeauty.pojo.ParameterMap;
import org.yeauty.pojo.Session;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.timeout.IdleStateEvent;

@ServerEndpoint(prefix = "netty-websocket")
@Component
public class MyWebSocket {
	private static final Logger logger = LoggerFactory.getLogger(MyWebSocket.class);
	//静态变量,用来记录当前在线连接数。应该把它设计成线程安全的。
    private static int onlineCount = 0;
    //定时器
    private Timer timer;
    //心跳检验
    private volatile boolean isPong;
    //线程
  	private ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    public MyWebSocket() {
        this.timer = new Timer();
        this.isPong = true;
    }
	/**
	 * 当有新的WebSocket连接进入时,对该方法进行回调 注入参数的类型:Session、HttpHeaders、ParameterMap
	 * @param session
	 * @param headers
	 * @param parameterMap
	 * @throws IOException
	 */
	@OnOpen
    public void onOpen(Session session, HttpHeaders headers, ParameterMap parameterMap) throws IOException {
        String paramValue = parameterMap.getParameter("paramKey");
        //在线数加1
        addOnlineCount();
        //心跳连接
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    if (isPong) {
                        //服务没有断开
                        String message = "{\"type\":1,\"data\":\"yes\"}";
                        session.sendText(message);
                        isPong = false;
                    } else {
                    	onClose(session);
                        this.cancel();
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                    this.cancel();
                }
            }

        }, 0, 10 * 1000);
        logger.info("有新连接加入!当前在线人数为" + getOnlineCount());
    }
	/**
	 * 当有WebSocket连接关闭时,对该方法进行回调 注入参数的类型:Session
	 * @param session
	 * @throws IOException
	 */
    @OnClose
    public void onClose(Session session) throws IOException {
       System.out.println("one connection closed"); 
       //在线数减1
       if(onlineCount>0) {
    	   subOnlineCount();
       }
       session.close();
       logger.info("有一连接关闭!当前在线人数为" + getOnlineCount());
    }
    /**
     * 当有WebSocket抛出异常时,对该方法进行回调 注入参数的类型:Session、Throwable
     * @param session
     * @param throwable
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        throwable.printStackTrace();
        session.close();
    }
    /**
     * 当接收到字符串消息时,对该方法进行回调 注入参数的类型:Session、String
     * @param session
     * @param message {"type":13, "parameter":{"appId":3}}
     */
    @OnMessage
    public void onMessage(Session session, String message) {
    	logger.info("来自客户端" + session.channel().id() + "的消息:" + message +"   时间:"+ LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        //接收消息
        JSONObject jsonObject = JSON.parseObject(message);
        int type = jsonObject.getInteger("type");
        //服务端主动向服务端推送信息
        if(getOnlineCount()==0) {
        	return;
        }
        //类型为1  设为心跳检测
        if(type == 1) {
        	this.isPong=true;
        }else {
        	if(session.isOpen()) {
            	executor.submit(new Runnable() {
    				@Override
    				public void run() {
    					JSONObject parameter=jsonObject.getJSONObject("parameter");
    					 session.sendText("Hello Netty!" +parameter.getString("appId"));
    				}
    			});
            	logger.info("---》websocket发送消息成功!!!ID--》" + session.channel().id());
            }
        }
    }
    /**
     * 当接收到二进制消息时,对该方法进行回调 注入参数的类型:Session、byte[]
     * @param session
     * @param bytes
     */
    @OnBinary
    public void onBinary(Session session, byte[] bytes) {
        for (byte b : bytes) {
            System.out.println(b);
        }
        session.sendBinary(bytes); 
    }
    /**
     * 当接收到Netty的事件时,对该方法进行回调 注入参数的类型:Session、Object
     * @param session
     * @param evt
     */
    @OnEvent
    public void onEvent(Session session, Object evt) {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent idleStateEvent = (IdleStateEvent) evt;
            switch (idleStateEvent.state()) {
                case READER_IDLE:
                    System.out.println("read idle");
                    break;
                case WRITER_IDLE:
                    System.out.println("write idle");
                    break;
                case ALL_IDLE:
                    System.out.println("all idle");
                    break;
                default:
                    break;
            }
        }
    }
    private static synchronized int getOnlineCount() {
        return onlineCount;
    }

    private static synchronized void addOnlineCount() {
    	MyWebSocket.onlineCount++;
    }

    private static synchronized void subOnlineCount() {
    	MyWebSocket.onlineCount--;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187

打开WebSocket客户端,连接到ws://127.0.0.1:80

此处IP地址为默认的,在实际的开发业务中需要改变,所以可以根据官方文档给的实例进行配置

@ServerEndpoint(prefix = "netty-websocket")
@Component
public class MyWebSocket {
	---------
}
  • 1
  • 2
  • 3
  • 4
  • 5

配置文件配置application.properties/application.yml

netty-websocket.host=0.0.0.0
netty-websocket.path=/
netty-websocket.port=80
  • 1
  • 2
  • 3

其他的还有很多配置项 ,可以按照实际业务需求自己加上就行

效果如图
在这里插入图片描述在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/242151
推荐阅读
相关标签
  

闽ICP备14008679号