当前位置:   article > 正文

SpringBoot实现STOMP协议下的WebSocket_springboot websocket stomp

springboot websocket stomp

WebSocket简介

WebSocket 是 HTML5 开始提供的一种在单个 TCP 连接上进行全双工通讯的协议。

WebSocket 使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 WebSocket API 中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。

在 WebSocket API 中,浏览器和服务器只需要做一个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道。两者之间就直接可以数据互相传送。

实现WebSocket的三种方式

  • 使用Tomcat的WebSocket实现,需要tomcat 7.x,JEE7的支持,无需别的任何配置,只需服务端一个处理类,可以看这篇spring boot中使用websocket实现点对点通信与服务器推送
  • 使用Spring的WebSocket实现,需要spring 4.x,并且使用了socketjs,对不支持websocket的浏览器可以模拟websocket使用,本文主要介绍这种方式。
  • 使用Netty的WebSocket实现,高性能、高可靠性.

STOMP简介

直接使用WebSocket就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议,因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。

就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:

>>> SEND
transaction:tx-0
destination:/app/marco
content-length:20

{"message":"Marco!"}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  1. SEND:STOMP命令,表明会发送一些内容;
  2. destination:头信息,用来表示消息发送到哪里;
  3. content-length:头信息,用来表示 负载内容的 大小;
  4. 空行:
  5. 帧内容(负载)内容

上去就是干

环境配置

JDK1.8, Intellij IDEA, Spring Boot 2.1.3, Maven 3.5

引入依赖

 <!--websocket-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!--jackson,由于SockJs与Spring WebSocket之间采用JSON通讯,需要引入jackson的相关jar包-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

新建WebSocket配置类

在Spring中启用STOMP通讯不用我们自己去写原生态的帧,spring的消息功能是基于代理模式构建,其实说得复杂,都是封装好了的,如果需要开启SOMP,只需要在websocket配置类上使用**@EnableWebSocketMessageBroker** (注解的作用为能够在 WebSocket 上启用 STOMP),并实现WebSocketMessageBrokerConfigurer接口

有些教程在这一步会继承AbstractWebSocketMessageBrokerConfigurer 类,我们看一下AbstractWebSocketMessageBrokerConfigurer类的源码,可以看到都是空方法,也是实现的接口,这里推荐自己实现接口,因为官方API上AbstractWebSocketMessageBrokerConfigurer已经标记为废弃.

/**
 * A convenient abstract base class for {@link WebSocketMessageBrokerConfigurer}
 * implementations providing empty method implementations for optional methods.
 *
 * @author Rossen Stoyanchev
 * @since 4.0.1
 * @deprecated as of 5.0 in favor of simply using {@link WebSocketMessageBrokerConfigurer}
 * which has default methods, made possible by a Java 8 baseline.
 */
@Deprecated
public abstract class AbstractWebSocketMessageBrokerConfigurer implements WebSocketMessageBrokerConfigurer {

	@Override
	public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
	}

	@Override
	public void configureClientInboundChannel(ChannelRegistration registration) {
	}

	@Override
	public void configureClientOutboundChannel(ChannelRegistration registration) {
	}

	@Override
	public boolean configureMessageConverters(List<MessageConverter> messageConverters) {
		return true;
	}

	@Override
	public void addArgumentResolvers(List<HandlerMethodArgumentResolver> argumentResolvers) {
	}

	@Override
	public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> returnValueHandlers) {
	}

	@Override
	public void configureMessageBroker(MessageBrokerRegistry registry) {
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

在来看一下WebSocketMessageBrokerConfigurer的主要方法

public interface WebSocketMessageBrokerConfigurer {

    // 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
    void registerStompEndpoints(StompEndpointRegistry var1);

    // 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
    void configureWebSocketTransport(WebSocketTransportRegistration var1);

    // 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientInboundChannel(ChannelRegistration var1);
    
    // 设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
    void configureClientOutboundChannel(ChannelRegistration var1);

    // 添加自定义的消息转换器,spring 提供多种默认的消息转换器,返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
    boolean configureMessageConverters(List<MessageConverter> var1);

    // 配置消息代理,哪种路径的消息会进行代理处理
    void configureMessageBroker(MessageBrokerRegistry var1);
    
    // 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
    void addArgumentResolvers(List<HandlerMethodArgumentResolver> var1);

    // 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
    void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> var1);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

完整的配置类

import com.b505.interceptor.WsHandshakeInterceptor;
import com.b505.interceptor.WsChannelInterceptor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;
import org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketTransportRegistration;

import java.util.List;
/**
 * <配置基于STOMP的websocket>
 **/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketStompConfig implements WebSocketMessageBrokerConfigurer {

    /**
     * 添加这个Endpoint,这样在网页中就可以通过websocket连接上服务,
     * 也就是我们配置websocket的服务地址,并且可以指定是否使用socketjs
     * @param registry
     */
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry)
    {
        
        /*
         * 1. 将 /serviceName/stomp/websocketJs路径注册为STOMP的端点,
         *    用户连接了这个端点后就可以进行websocket通讯,支持socketJs
         * 2. setAllowedOrigins("*")表示可以跨域
         * 3. withSockJS()表示支持socktJS访问
         * 4. addInterceptors 添加自定义拦截器,这个拦截器是上一个demo自己定义的获取httpsession的拦截器
         * 5. addInterceptors 添加拦截处理,这里MyPrincipalHandshakeHandler 封装的认证用户信息
         */
        registry.addEndpoint("/stomp/websocketJS")
                .setAllowedOrigins("*")
                .addInterceptors(new WebSocketHandshakeInterceptor())
                .setHandshakeHandler(new MyPrincipalHandshakeHandler())
                .withSockJS()

        ;

        /*
         * 看了下源码,它的实现类是WebMvcStompEndpointRegistry ,
         * addEndpoint是添加到WebMvcStompWebSocketEndpointRegistration的集合中,
         * 所以可以添加多个端点
         */
        registry.addEndpoint("/stomp/websocket");
    }

    /**
     * 配置消息代理
     * @param registry
     */
    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry)
    {
        /*
         *  enableStompBrokerRelay 配置外部的STOMP服务,需要安装额外的支持 比如rabbitmq或activemq
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 3. 可以通过 setRelayHost 配置代理监听的host,默认为localhost
         * 4. 可以通过 setRelayPort 配置代理监听的端口,默认为61613
         * 5. 可以通过 setClientLogin 和 setClientPasscode 配置账号和密码
         * 6. setxxx这种设置方法是可选的,根据业务需要自行配置,也可以使用默认配置
         */
        //registry.enableStompBrokerRelay("/topicTest","/userTest")
                //.setRelayHost("rabbit.someotherserver")
                //.setRelayPort(62623);
                //.setClientLogin("userName")
                //.setClientPasscode("password")
                //;

        // 自定义调度器,用于控制心跳线程
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        // 线程池线程数,心跳连接开线程
        taskScheduler.setPoolSize(1);
        // 线程名前缀
        taskScheduler.setThreadNamePrefix("websocket-heartbeat-thread-");
        // 初始化
        taskScheduler.initialize();

        /*
         * spring 内置broker对象
         * 1. 配置代理域,可以配置多个,此段代码配置代理目的地的前缀为 /topicTest 或者 /userTest
         *    我们就可以在配置的域上向客户端推送消息
         * 2,进行心跳设置,第一值表示server最小能保证发的心跳间隔毫秒数, 第二个值代码server希望client发的心跳间隔毫秒数
         * 3. 可以配置心跳线程调度器 setHeartbeatValue这个不能单独设置,不然不起作用,要配合setTaskScheduler才可以生效
         *    调度器我们可以自己写一个,也可以自己使用默认的调度器 new DefaultManagedTaskScheduler()
         */
        registry.enableSimpleBroker("/topicTest","/userTest")
                .setHeartbeatValue(new long[]{10000,10000})
                .setTaskScheduler(taskScheduler);
        /*
         *  "/app" 为配置应用服务器的地址前缀,表示所有以/app 开头的客户端消息或请求
         *  都会路由到带有@MessageMapping 注解的方法中
         */
        registry.setApplicationDestinationPrefixes("/app");

        /*
         *  1. 配置一对一消息前缀, 客户端接收一对一消息需要配置的前缀 如“'/user/'+userid + '/message'”,
         *     是客户端订阅一对一消息的地址 stompClient.subscribe js方法调用的地址
         *  2. 使用@SendToUser发送私信的规则不是这个参数设定,在框架内部是用UserDestinationMessageHandler处理,
         *     而不是而不是 AnnotationMethodMessageHandler 或  SimpleBrokerMessageHandler
         *     or StompBrokerRelayMessageHandler,是在@SendToUser的URL前加“user+sessionId"组成
         */
        registry.setUserDestinationPrefix("/user");

        /*
         * 自定义路径分割符
         * 注释掉的这段代码添加的分割符为. 分割是类级别的@messageMapping和方法级别的@messageMapping的路径
         * 例如类注解路径为 “topic”,方法注解路径为“hello”,那么客户端JS stompClient.send 方法调用的路径为“/app/topic.hello”
         * 注释掉此段代码后,类注解路径“/topic”,方法注解路径“/hello”,JS调用的路径为“/app/topic/hello”
         */
        //registry.setPathMatcher(new AntPathMatcher("."));

    }

    /**
     * 配置发送与接收的消息参数,可以指定消息字节大小,缓存大小,发送超时时间
     * @param registration
     */
    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        /*
         * 1. setMessageSizeLimit 设置消息缓存的字节数大小 字节
         * 2. setSendBufferSizeLimit 设置websocket会话时,缓存的大小 字节
         * 3. setSendTimeLimit 设置消息发送会话超时时间,毫秒
         */
        registration.setMessageSizeLimit(10240)
                    .setSendBufferSizeLimit(10240)
                    .setSendTimeLimit(10000);
    }

    /**
     * 设置输入消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {

        /*
         * 配置消息线程池
         * 1. corePoolSize 配置核心线程池,当线程数小于此配置时,不管线程中有无空闲的线程,都会产生新线程处理任务
         * 2. maxPoolSize 配置线程池最大数,当线程池数等于此配置时,不会产生新线程
         * 3. keepAliveSeconds 线程池维护线程所允许的空闲时间,单位秒
         */
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        /*
         * 添加stomp自定义拦截器,可以根据业务做一些处理
         * springframework 4.3.12 之后版本此方法废弃,代替方法 interceptors(ChannelInterceptor... interceptors)
         * 消息拦截器,实现ChannelInterceptor接口
         */
        registration.setInterceptors(webSocketChannelInterceptor());
    }

    /**
     *设置输出消息通道的线程数,默认线程为1,可以自己自定义线程数,最大线程数,线程存活时间
     * @param registration
     */
    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        registration.taskExecutor().corePoolSize(10)
                    .maxPoolSize(20)
                    .keepAliveSeconds(60);
        //registration.setInterceptors(new WebSocketChannelInterceptor());
    }

    /**
     * 添加自定义的消息转换器,spring 提供多种默认的消息转换器,
     * 返回false,不会添加消息转换器,返回true,会添加默认的消息转换器,当然也可以把自己写的消息转换器添加到转换链中
     * @param list
     * @return
     */
    @Override
    public boolean configureMessageConverters(List<MessageConverter> list) {
        return true;
    }

    /**
     * 自定义控制器方法的参数类型,有兴趣可以百度google HandlerMethodArgumentResolver这个的用法
     * @param list
     */
    @Override
    public void addArgumentResolvers(List<HandlerMethodArgumentResolver> list) {

    }

    /**
     * 自定义控制器方法返回值类型,有兴趣可以百度google HandlerMethodReturnValueHandler这个的用法
     * @param list
     */
    @Override
    public void addReturnValueHandlers(List<HandlerMethodReturnValueHandler> list) {

    }

    /**
     * 拦截器加入 spring ioc容器
     * @return
     */
    @Bean
    public WebSocketChannelInterceptor webSocketChannelInterceptor()
    {
        return new WebSocketChannelInterceptor();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
  • 155
  • 156
  • 157
  • 158
  • 159
  • 160
  • 161
  • 162
  • 163
  • 164
  • 165
  • 166
  • 167
  • 168
  • 169
  • 170
  • 171
  • 172
  • 173
  • 174
  • 175
  • 176
  • 177
  • 178
  • 179
  • 180
  • 181
  • 182
  • 183
  • 184
  • 185
  • 186
  • 187
  • 188
  • 189
  • 190
  • 191
  • 192
  • 193
  • 194
  • 195
  • 196
  • 197
  • 198
  • 199
  • 200
  • 201
  • 202
  • 203
  • 204
  • 205
  • 206
  • 207
  • 208
  • 209
  • 210
  • 211
  • 212
  • 213
  • 214
  • 215

新建保存用户认证信息的实体类MyPrincipalHandshakeHandler

上面的代码可以看到,我们在STOMP端点上注册了一个保存用户认证信息的实体类MyPrincipalHandshakeHandler和一个握手拦截器WebSocketHandshakeInterceptor

用户认证信息类MyPrincipalHandshakeHandler需要实现Principal接口

import java.security.Principal;

/**
 * <websocket登录连接对象>
 * <用于保存websocket连接过程中需要存储的业务参数>
 * @author 郑智聪
 * @version 2018-06-11
 **/
public class WebSocketUserAuthentication implements Principal {

    /**
     * 用户身份标识符
     */
    private String token;

    public WebSocketUserAuthentication(String token) {
        this.token = token;
    }

    /**
     * 获取用户登录令牌
     * @return
     */
    @Override
    public String getName() {
        return token;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

握手拦截器WebSocketHandshakeInterceptor

握手拦截器WebSocketHandshakeInterceptor需要继承HttpSessionHandshakeInterceptor,它是HandshakeInterceptor接口的一个实现,并重写beforeHandshake方法,实现自己的握手拦截的逻辑。

import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.springframework.http.server.ServerHttpRequest;
import org.springframework.http.server.ServletServerHttpRequest;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.server.support.DefaultHandshakeHandler;

import javax.servlet.http.HttpSession;
import java.security.Principal;
import java.util.Map;

/**
 * <设置认证用户信息的握手拦截器>
 **/
public class MyPrincipalHandshakeHandler extends DefaultHandshakeHandler{

    private static final Logger log = Logger.getLogger(MyPrincipalHandshakeHandler.class);

     @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) {
	  //这里就是简单 
      HttpSession session = getSession(request);
		if (session != null) {
			if (isCopyHttpSessionId()) {
				attributes.put(HTTP_SESSION_ID_ATTR_NAME, session.getId());
			}
			Enumeration<String> names = session.getAttributeNames();
			while (names.hasMoreElements()) {
				String name = names.nextElement();
				if (isCopyAllAttributes() || getAttributeNames().contains(name)) {
					attributes.put(name, session.getAttribute(name));
				}
			}
		}
		return true;
    }

    private HttpSession getSession(ServerHttpRequest request) {
        if (request instanceof ServletServerHttpRequest) {
            ServletServerHttpRequest serverRequest = (ServletServerHttpRequest) request;
            return serverRequest.getServletRequest().getSession(false);
        }
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

消息通道拦截器

除此之外,如果需要添加监听,我们的监听类需要实现ChannelInterceptor接口,在 springframework包5.0.7之前这一步我们一般是实现ChannelInterceptorAdapter 抽象类,不过这个类已经废弃了,文档也推荐直接实现接口。

在ChannelInterceptor接口中的preSend能在消息发送前做一些处理,例如可以获取到用户登录的唯一token令牌,这里的令牌是我们业务传递给客户端的,例如用户在登录成功后我们后台生成的一个标识符,客户端在和服务端建立websocket连接的时候,我们可以从消息头中获取到这种业务参数,并做一系列后续处理。

import org.apache.log4j.Logger;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;

import javax.servlet.http.HttpSession;

import static org.springframework.messaging.simp.stomp.StompCommand.CONNECT;

/**
 * <websocke消息监听,用于监听websocket用户连接情况>
 * <功能详细描述>
 **/
public class WebSocketChannelInterceptor implements ChannelInterceptor {


    Logger log = Logger.getLogger(WebSocketChannelInterceptor.class);

    // 在消息发送之前调用,方法中可以对消息进行修改,如果此方法返回值为空,则不会发生实际的消息发送调用
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {

        StompHeaderAccessor accessor =  MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
        /**
         * 1. 判断是否为首次连接请求,如果已经连接过,直接返回message
         * 2. 网上有种写法是在这里封装认证用户的信息,本文是在http阶段,websockt 之前就做了认证的封装,所以这里直接取的信息
         */
        if(StompCommand.CONNECT.equals(accessor.getCommand()))
        {
            /*
             * 1. 这里获取就是JS stompClient.connect(headers, function (frame){.......}) 中header的信息
             * 2. JS中header可以封装多个参数,格式是{key1:value1,key2:value2}
             * 3. header参数的key可以一样,取出来就是list
             * 4. 样例代码header中只有一个token,所以直接取0位
             */
            String token = accessor.getNativeHeader("token").get(0);

            /*
             * 1. 这里直接封装到StompHeaderAccessor 中,可以根据自身业务进行改变
             * 2. 封装大搜StompHeaderAccessor中后,可以在@Controller / @MessageMapping注解的方法中直接带上StompHeaderAccessor
             *    就可以通过方法提供的 getUser()方法获取到这里封装user对象
             * 2. 例如可以在这里拿到前端的信息进行登录鉴权
             */
            WebSocketUserAuthentication user = (WebSocketUserAuthentication) accessor.getUser();

            System.out.println("认证用户:" + user.toString() + " 页面传递令牌" + token);

        }else if (StompCommand.DISCONNECT.equals(accessor.getCommand()))
        {
			System.out.println("用户:" + accessor.getUser() + " 断开连接");
        }
        return message;
    }

    // 在消息发送后立刻调用,boolean值参数表示该调用的返回值
    @Override
    public void postSend(Message<?> message, MessageChannel messageChannel, boolean b) {

        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);

        /*
         * 拿到消息头对象后,我们可以做一系列业务操作
         * 1. 通过getSessionAttributes()方法获取到websocketSession,
         *    就可以取到我们在WebSocketHandshakeInterceptor拦截器中存在session中的信息
         * 2. 我们也可以获取到当前连接的状态,做一些统计,例如统计在线人数,或者缓存在线人数对应的令牌,方便后续业务调用
         */
        HttpSession httpSession = (HttpSession) accessor.getSessionAttributes().get("HTTP_SESSION");

        // 这里只是单纯的打印,可以根据项目的实际情况做业务处理
        log.info("postSend 中获取httpSession key:" + httpSession.getId());

        // 忽略心跳消息等非STOMP消息
        if(accessor.getCommand() == null)
        {
            return;
        }

        // 根据连接状态做处理,这里也只是打印了下,可以根据实际场景,对上线,下线,首次成功连接做处理
        System.out.println(accessor.getCommand());
        switch (accessor.getCommand())
        {
            // 首次连接
            case CONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 首次连接");
                break;
            // 连接中
            case CONNECTED:
                break;
            // 下线
            case DISCONNECT:
                log.info("httpSession key:" + httpSession.getId() + " 下线");
                break;
             default:
                break;
        }


    }
    /*
     * 1. 在消息发送完成后调用,而不管消息发送是否产生异常,在次方法中,我们可以做一些资源释放清理的工作
     * 2. 此方法的触发必须是preSend方法执行成功,且返回值不为null,发生了实际的消息推送,才会触发
     */
    @Override
    public void afterSendCompletion(Message<?> message, MessageChannel messageChannel, boolean b, Exception e) {

    }

    /* 1. 在消息被实际检索之前调用,如果返回false,则不会对检索任何消息,只适用于(PollableChannels),
     * 2. 在websocket的场景中用不到
     */
    @Override
    public boolean preReceive(MessageChannel messageChannel) {
        return true;
    }

    /*
     * 1. 在检索到消息之后,返回调用方之前调用,可以进行信息修改,如果返回null,就不会进行下一步操作
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public Message<?> postReceive(Message<?> message, MessageChannel messageChannel) {
        return message;
    }

    /*
     * 1. 在消息接收完成之后调用,不管发生什么异常,可以用于消息发送后的资源清理
     * 2. 只有当preReceive 执行成功,并返回true才会调用此方法
     * 2. 适用于PollableChannels,轮询场景
     */
    @Override
    public void afterReceiveCompletion(Message<?> message, MessageChannel messageChannel, Exception e) {
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136

服务端消息处理器

  • MessageMapping
    Spring对于WebSocket封装的特别简单,提供了一个@MessageMapping注解,功能类似@RequestMapping,它是存在于Controller中的,定义一个消息的基本请求,功能也跟@RequestMapping类似,包括支持通配符``的url定义等等,详细用法参见Annotation Message Handling

  • SimpMessagingTemplate
    SimpMessagingTemplate是Spring-WebSocket内置的一个消息发送工具,可以将消息发送到指定的客户端。

消息发送回无非就是两种情况,广播和点对点式

广播式消息

@MessageMapping("/change-notice")
@SendTo("/topic/notice")
public String greeting(String value) {    
    return value;
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • @SendTo定义了消息的目的地。结合例子解释就是“接收/app/change-notice发来的value,然后将value转发到/topic/notice客户端。

  • /topic/notice是客户端发起连接后,订阅服务端消息时指定的一个地址,用于接收服务端的返回。

点对点式消息

/**
     * 根据ID 把消息推送给指定用户
     * 1. 这里用了 @SendToUser 和 返回值 其意义是可以在发送成功后回执给发送放其信息发送成功
     * 2. 非必须,如果实际业务不需要关心此,可以不用@SendToUser注解,方法返回值为void
     * 3. 这里接收人的参数是用restful风格带过来了,websocket把参数带到后台的方式很多,除了url路径,
     *    还可以在header中封装用@Header或者@Headers去取等多种方式
     * @param accountId 消息接收人ID
     * @param json 消息JSON字符串
     * @param headerAccessor
     * @return
     */
    @MessageMapping("/sendChatMsgById/{accountId}")
    @SendToUser(value = "/userTest/callBack")
    public Map<String, Object> sendChatMsgById(
        @DestinationVariable(value = "accountId") String accountId, String json,
        StompHeaderAccessor headerAccessor)
    {
        Map msg = (Map)JSON.parse(json);
        Map<String, Object> data = new HashMap<String, Object>();

        // 这里拿到的user对象是在WebSocketChannelInterceptor拦截器中绑定上的对象
        WebSocketUserAuthentication user = (WebSocketUserAuthentication)headerAccessor.getUser();

        log.info("SendToUser controller 中获取用户登录令牌:" + user.getName()
                + " socketId:" + headerAccessor.getSessionId());

        // 向用户发送消息,第一个参数是接收人、第二个参数是浏览器订阅的地址,第三个是消息本身
        // 如果服务端要将消息发送给特定的某一个用户,
        // 可以使用SimpleMessageTemplate的convertAndSendToUser方法(第一个参数是用户的登陆名username)
        String address = "/userTest/callBack";
        messagingTemplate.convertAndSendToUser(accountId, address, msg.get("msg"));

        data.put("msg", "callBack 消息已推送,消息内容:" + msg.get("msg"));
        return data;
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

前端VUE实现

vue项目需要先引入sockjs-clientstompjs这两个库

// 安装并引入相关模块
import SockJS from 'sockjs-client';
import Stomp from 'stompjs';
import {getToken} from './auth'//这是我自己的获取token的方法,可以根据具体业务进行修改

export default {
  methods: {
    initWebSocket() {
      this.connection();
      let self = this;
      // 断开重连机制,尝试发送消息,捕获异常发生时重连
      this.timer = setInterval(() => {
        try {
          self.stompClient.send("alive");
        } catch (err) {
          console.log("断线了: " + err);
          self.connection();
        }
      }, 5000);
    },
    connection() {
      // 建立连接对象
      this.socket = new SockJS('http://127.0.0.1:8081/websocket');//连接服务端提供的通信接口,连接以后才可以订阅广播消息和个人消息,注意这里用的是http而不是原生WebSocket的ws
      
      // 获取STOMP子协议的客户端对象
      this.stompClient = Stomp.over(this.socket);

      // 定义客户端的认证信息,按需求配置
      const token = getToken();
      let headers = {
        'Authorization': token,
      };
      // 向服务器发起websocket连接
      this.stompClient.connect(headers, this.onConnected, this.onFailed);
    },
    onConnected: function (frame) {
      console.log("Connected: " + frame);
      let topic = "/ws/topic/charger.messageTopic";
      this.stompClient.subscribe(topic,this.onFailed);
    },
    onFailed(frame) {
      console.log("Failed: " + frame);
    },
    disconnect() {
      if (this.stompClient != null) {
        this.stompClient.disconnect();
        console.log("Disconnected");
      }
    }
  }
};
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51

部分内容参考:https://www.jianshu.com/p/9103c9c7e128?tdsourcetag=s_pctim_aiomsg
STOMP API地址:https://stomp-js.github.io/stomp-websocket/codo/extra/docs-src/Usage.md.html
API中文翻译博文:https://blog.csdn.net/jqsad/article/details/77745379

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/457922
推荐阅读
相关标签
  

闽ICP备14008679号