赞
踩
先上总体流程图吧,(我没有用visio来画图,而是用了网上的一个工具Process On,所以图看上去不是那么正规。不用visio主要还是因为visio用的太菜了,有机会还是要多练习一下使用visio画图)。
主要起了四个工程,
接着对照上面的图说一下整体流程和思路。用户通过网关登录到服务器上,服务器A和服务器B模拟的是一个微服务的两台服务器,网关可能会让用户连到ServerA上,也可能会连到ServerB上。并且在用户登录的同时会连接WebSocket,WebSocket是消息推送能否实现的关键,为什么要使用WebSocket下面再说。用户登录并连接到服务器上以后,将用户的登录名和服务器信息写入Redis中(Key:userName,Value:serverIpAdress+port)。为什么要记录服务器信息呢?因为每台服务器都会订阅一个以自己ip地址加端口号命名的topic,如下图所示:
当消息的生产者有一条消息要推送时,会先发给Dispatch(这里的流程跟上图有些不一样,按理说应该要先存入一个队列中,然后让Dispatch自己主动去取然后再推送给指定的消息队列。为什么要这样做呢?按照架构师他的话来说,就是要在Dispatch这里做持久化等等一系列操作,真正的工程是很复杂的,消息推送不可能像我上面这个图一样简单)。但是因为一些原因我并没有这样做,而是直接让生产者将消息推给了Dispatch。Dispatch收到消息后,会在Redis中去查找用户所连接的服务器,然后将消息推送到该服务器监听的消息队列中,服务器最后通过WebSocket将消息推给用户。
首先抛出几个问题:
Q1:什么是WebSocket?
A1:WebSocket是Html5提供的一种能在单个TCP连接上进行全双工通讯的协议,是HTTP协议中长连接的升级版。全双工通讯就是你能主动给我发消息,我也能主动给你发消息。
Q2:为什么要使用WebSocket?
A2:使用WebSocket最大的好处就是客户端能主动向服务端发送消息,服务端也能主动向客户端发送消息,能最大程度的保证信息交换的实时性。Web应用的信息交互过程通常是客户端通过浏览器发出一个请求,服务器端接收和审核完请求后进行处理并返回结果给客户端,然后客户端浏览器将信息呈现出来,这种机制对于信息变化不是特别频繁的应用尚能应付,但是对于那些实时要求比较高的应用来说,比如说在线游戏、股票查询,在线证券、设备监控、新闻在线播报、RSS订阅推送等等,当客户端浏览器准备呈现这些信息的时候,这些信息在服务器端可能已经过时了。所以保持客户端和服务器端的信息同步是实时Web 应用的关键要素。
在WebSocket出现之前,客户端是怎么和服务端进行信息交换的呢?最常用的方式就是轮询,而轮询又分为短轮询和长轮询。
短轮询就相当于客户端每隔一定时间就主动发送一个request给服务端请求数据交换,服务端不管有没有新的数据都要对客户端的请求进行响应。短轮询一般都是客户端使用ajax来实现的,用户一般是不会感觉到客户端发生的变化的,但是这种短轮询的方式对服务端来说会有不小的压力。因为每次客户端和服务端之间信息的交换都对应着一次Request-Response的过程,每发生一次这个过程,双方都要交换大量的请求头和响应头,这增加了每次传输的数据量;并且轮询的时间间隔非常难控制,时间短了,容易造成服务器CPU资源的浪费,而时间长了又难以保证信息的实时性。
长轮询则相当于客户端给服务端打一个电话,如果服务端有数据更新,那么它就接起这个电话(响应),将数据返回给客户端;如果服务端没有数据更新,那么这个电话就一直挂在那(不响应),直到有数据更新才接起电话。长轮询的缺点很明显,服务器的并发连接数是有限的,如果服务端一直没有更新数据而一直保持这个连接,那么这个空闲的连接就是一种资源浪费。
WebSocket相对于轮询的优势很明显,WebSocket只需要建立一次连接,客户端和服务端只需要交换一次请求头和响应头就可以无数次交换信息。
Spring Boot集成WebSocket的maven依赖:
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-websocket</artifactId>
- </dependency>
WebSocket配置文件WebSocketAutoConfig:
-
- /**
- * websocket配置
- */
- @Configuration
- @EnableWebSocketMessageBroker
- public class WebSocketAutoConfig implements WebSocketMessageBrokerConfigurer {
- @Override
- public void registerStompEndpoints(StompEndpointRegistry registry) {
- registry.addEndpoint("/mq") //开启/bullet端点
- .setAllowedOrigins("*") //允许跨域访问
- .withSockJS(); //使用sockJS
- }
- }
前端页面,前端通过SockJS来进行WebSocket连接(其中比较关键的代码我都写了注释,就不再一一解释了):
- <!DOCTYPE html>
- <html xmlns:text-align="http://java.sun.com/JSP/Page">
- <head>
- <meta charset="UTF-8" />
- <title>h3</title>
- <noscript>
- <h2 style="color:#ff0000">貌似你的浏览器不支持websocket</h2>
- </noscript>
- <script src="/user-service/static/sockjs.js"></script>
- <script src="/user-service/static/stomp.js"></script>
- <script src="/user-service/static/jquery.js"></script>
- <script src="/user-service/static/bootstrap-3.3.7-dist/js/bootstrap.min.js"></script>
- <script src="/user-service/static/bootstrap-3.3.7-dist/css/bootstrap.min.css"></script>
- <script type="text/javascript">
- function loginin() {
- $.ajax({
- type:"get",
- data:"username=" + document.getElementById("username1").value,
- url:"login"
- })
- connect();
- }
-
- var stompClient = null;
- //gateway网关的地址
- var host="http://localhost:4445/user-service";
- function setConnected(connected) {
- document.getElementById('disconnect').disabled = !connected;
- $('#response').html();
- }
- function connect() {
- //地址+端点路径,构建websocket链接地址
- var socket = new SockJS(host+'/mq');
- var username = document.getElementById("username1").value;
- stompClient = Stomp.over(socket);
- stompClient.connect({}, function(frame) {
- setConnected(true);
- console.log('Connected:' + frame);
- //监听一个具有自己信息的对列(/toAll/id)
- //把用户名作为自己监听的消息队列标识
- stompClient.subscribe('/toAll/' + username, function(response) {
- showResponse(response.body);
- });
- });
- }
- function disconnect() {
- if (stompClient != null) {
- stompClient.disconnect();
- }
- setConnected(false);
- console.log("Disconnected");
- }
- function send() {
- var name = $('#name').val();
- var message = $('#messgae').val();
- //发送消息的路径
- stompClient.send("/chat", {}, JSON.stringify({username:name,message:message}));
- }
- function showResponse(message) {
- var response = $('#response');
- response.html(message);
- }
- </script>
- </head>
- <body onload="disconnect()">
-
- <div class="input-group" id="login" style="margin:200px auto;width:200px">
- <span class="input-group-addon" id="basic-addon1">用户名</span>
- <input type="text" class="form-control" placeholder="请输入用户名" aria-describedby="basic-addon1" id="username1"
- name="username" style="width: 200px">
- </div>
- <div style="position:absolute;left:700px;top:250px" class="btn-group" role="group" aria-label="...">
- <button type="button" class="btn btn-default" id="loginbutton" onclick="loginin()">登录</button>
- <button type="button" class="btn btn-default" id="disconnect" onclick="disconnect();">断开连接</button>
- </div>
- <div style="position:absolute;left:670px;top:60px;height: 100px">
- <h1 id="response"></h1>
- </div>
-
- </body>
- </html>

使用WebSocket给单个用户发送信息:
- /**
- * 用户在前端订阅的消息队列名称为"/toAll/username"
- * 通过teleWebSocketManager发送信息给指定用户
- */
- @ResponseBody
- @RequestMapping(value="/sendToOne", produces = {"application/json; charset=utf-8"},method= RequestMethod.GET)
- public String sendToOne(String username, String message){
- teleWebSocketManager.send("/toAll/" + username, message);
- return "成功";
- }
Spring Cloud Gateway是Spring Cloud下的一个全新项目,它创建了一个在Spring控制下的API网关,它的目的是提供一个简单、高效的api路由方式。
Spring Cloud Gateway的特点:
以上是从Spring官网翻译过来的,英语水平有限,大家可以自己去官网看一下官方文档。
首先是Spring Cloud Gateway的相关依赖:
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-dependencies</artifactId>
- <version>Finchley.RELEASE</version>
- <type>pom</type>
- <scope>import</scope>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
- <dependency>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-starter-gateway</artifactId>
- <version>2.0.1.RELEASE</version>
- <exclusions>
- <exclusion>
- <groupId>org.springframework.cloud</groupId>
- <artifactId>spring-cloud-gateway-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>

Spring Cloud Gateway配置路由有两种方式,一种是在代码中进行配置,还有一种是在配置文件中进行配置。在代码中进行配置扩展性较差,一般都不推荐;使用配置文件进行配置是比较好的一种方式,路由配置文件:
- server:
- port: 4445
-
- spring:
- application:
- name: gateway-service
- profiles:
- active: dev
- cloud:
- consul:
- host: localhost
- port: 8500
- # 服务发现配置
- discovery:
- serviceName: gateway-service
- #路由配置,user-service是我的服务名
- gateway:
- routes:
- #表示websocket的转发
- - id: user-service-websocket
- uri: lb:ws://user-service
- predicates:
- - Path=/user-service/mq/**
- - id: user-service
- uri: lb://user-service
- predicates:
- - Path=/user-service/**
- discovery:
- locator:
- enabled: true
- lower-case-service-id: true

上面是路由的配置,下面说一个很关键的问题,这个问题关系到websocket能否连接成功。我们的前端是通过SockJS来建立WebSocket连接的,什么是SockJS?(这个链接指向的是Spring的官方文档,关于SockJS的这一段在20.3.1节中,大家就不用自己去找了,直接到20.3.1节中去看就好了)。
官方文档中20.3.1节里第一句话就是:"The goal of SockJS is to let applications use a WebSocket API but fall back to non-WebSocket alternatives when necessary at runtime, i.e. without the need to change application code." 翻译过来,大致意思就是说SockJS的作用是让WEB应用使用WebSocket API,但是在必要的运行时能改变成非WebSocket(比如HTTP的长连接)而无需改动代码。SockJS被设计成在浏览器中使用,它也竭尽全力的使用各种技术去支持所有的浏览器版本。有关SockJS传输类型和浏览器的完整列表,可以在文档中查看。SockJS可用的传输类型有三种:WebSocket、HTTP Streaming、HTTP长轮询。
下面这段比较重要:"The SockJS client begins by sending "GET /info"
to obtain basic information from the server. After that it must decide what transport to use. If possible WebSocket is used. If not, in most browsers there is at least one HTTP streaming option and if not then HTTP (long) polling is used.",大致意思就是说SockJS客户端开始时会发送一个GET类型的"/info"请求从服务器去获取基本信息,这个请求之后SockJS必须决定使用哪种传输,可能是WebSocket,如果不是的话,在大部分浏览器中会使用HTTP Streaming或者HTTP长轮询。那么我们应该怎么去处理这个/info请求呢?那么我们可以在网关中配置一个拦截器,当拦截到/info请求时就对它进行处理。
拦截器代码:
-
- import org.springframework.cloud.gateway.filter.GatewayFilterChain;
- import org.springframework.cloud.gateway.filter.GlobalFilter;
- import org.springframework.core.Ordered;
- import org.springframework.stereotype.Component;
- import org.springframework.web.server.ServerWebExchange;
- import org.springframework.web.util.UriComponentsBuilder;
- import reactor.core.publisher.Mono;
-
- import java.net.URI;
-
- import static org.springframework.cloud.gateway.support.ServerWebExchangeUtils.GATEWAY_REQUEST_URL_ATTR;
-
- /**
- * @author: wang.mh
- * 2019/6/19 17:05
- */
- @Component
- public class WebsocketHandler implements GlobalFilter, Ordered {
- private final static String DEFAULT_FILTER_PATH = "/user-service/info";
-
- /**
- *
- * @param exchange ServerWebExchange是一个HTTP请求-响应交互的契约。提供对HTTP请求和响应的访问,
- * 并公开额外的 服务器 端处理相关属性和特性,如请求属性
- * @param chain
- * @return
- */
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- String upgrade = exchange.getRequest().getHeaders().getUpgrade();
-
- URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
-
- String scheme = requestUrl.getScheme();
-
- if (!"ws".equals(scheme) && !"wss".equals(scheme)) {
- return chain.filter(exchange);
- } else if (DEFAULT_FILTER_PATH.equals(requestUrl.getPath())) {
- String wsScheme = convertWsToHttp(scheme);
- URI wsRequestUrl = UriComponentsBuilder.fromUri(requestUrl).scheme(wsScheme).build().toUri();
- exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, wsRequestUrl);
- }
- return chain.filter(exchange);
- }
-
- @Override
- public int getOrder() {
- return Ordered.LOWEST_PRECEDENCE - 2;
- }
-
- static String convertWsToHttp(String scheme) {
- scheme = scheme.toLowerCase();
- return "ws".equals(scheme) ? "http" : "wss".equals(scheme) ? "https" : scheme;
- }
- }

更加确切的来说应该是消息队列的作用,我认为消息中间件是对消息队列的一种包装,让消息队列具有更多功能,使其成为一种可配置可扩展的一种工具。一般提起消息队列,都会说它有以下三种作用:
中间件名称 | 单机吞吐量 | 客户端语言 | 文档完备性 | 社区活跃度 | 商业支持 | 消息丢失 | 所属公司 |
---|---|---|---|---|---|---|---|
ActiveMQ | 万级 | 支持多语言,java优先 | 高 | 高 | 无 | 低 | Apache |
RabbitMQ | 万级 | 多语言 | 高 | 高 | 无 | 低 | Mozilla Public License |
RocketMQ | 十万级 | 只支持java | 高 | 中 | Alibaba | 理论上不会 | 阿里开发,已捐献给Apache |
Kafka | 十万级 | 多语言,java优先 | 高 | 高 | 无 | 理论上不会 | Apache |
RocketMQ架构主要分为四部分:
心跳机制:
Master和Slave:
从字面上理解,master即主人,slave即奴隶。一个master可以配置一个或多个slave,master在线的时候,slave没有起到Broker的作用,但是slave每隔一段时间就会自动从master那里同步数据;一旦Name Server判定某一个matser宕机,那么这个master对应的slave就会升级为master,此前master上连接的消费者都会转至slave上获取消息进行消费(master宕机后,producer可能第一时间不知道,中间存在一个时间差,所以producer仍然会向该master发送消息,这个消息暂时不会被消费,但是也不会丢失,该master的故障排除重新上线后,积压的消息依然会被消费)。
方案中,我将消费者Consumer配在用户服务UserService上,UserService拿到消息后,再通过WebSocket将消息传递给用户。
- /**
- * @author: wang.mh
- * 2019/6/26 13:56
- */
- @Controller
- public class DemoRocketMqConsumerExample
- extends AbstractRocketMqConsumer<DemoRocketMqTopic, DemoRocketMqContent> {
-
- @Autowired
- private CharExchangeUtil charExchangeUtil;
-
- @Autowired
- private RedisUtil redisUtil;
-
- @Resource
- private TeleWebSocketManager teleWebSocketManager;
-
- @Autowired
- private IpUtil ipUtil;
-
- @Autowired
- private DemoRocketMqTopic demoRocketMqTopic;
-
- //获取服务器端口号
- @Value("${server.port}")
- private String serverPort;
-
- //设定消息订阅组,使用 ipadress + port 作为topic名字
- @Override
- public Map<String, Set<String>> subscribeTopicTags() {
- Map<String, Set<String>> topicSetMap = new HashMap<>();
- Set<String> tagSet = new HashSet<>();
- topicSetMap.put(demoRocketMqTopic.getTopic(), tagSet);
- return topicSetMap;
- }
-
- //消费者名称
- @Override
- public String getConsumerGroup() {
- return "spring-boot-test-consumer-group";
- }
-
- //消费者获取消息
- @Override
- public boolean consumeMsg(DemoRocketMqContent content, MessageExt msg) {
- logger.info("receive msg[{}], topic:{}, tag:{}, content:{}", msg, content);
- System.out.println(content.toString());
- sendToOne(content.getUsername(), content.getMessage());
- return false;
- }
-
- /**
- * 跳转到登录页面
- * @return
- */
- @RequestMapping(value = "page")
- public String pagechange(){
- return "h1";
- }
-
- /**
- * 用户登录并获取用户登录信息保存到redis中
- * @param username
- * @return
- */
- @ResponseBody
- @RequestMapping("login")
- public int topagewebsocket(String username) {
-
- String ipAndPort = ipUtil.getIpAndPort();
- //RocketMQ中订阅组命名不允许出现". :"这样的符号,所以进行处理,去掉字符创中的非法符号
- String newstring = charExchangeUtil.newexchange(ipAndPort);
-
- //将用户名和订阅组信息保存到redis中
- redisUtil.set(username, newstring);
- System.out.println("-------------username: " + username + "---ipadress: "
- + newstring + "-------------");
- return 0;
- }
-
-
- //单对单发送
- @ResponseBody
- @RequestMapping(value="/sendToOne", produces = {"application/json; charset=utf-8"},method= RequestMethod.GET)
- public String sendToOne(String username, String message){
- System.out.println("---------username="+username+"-------------message="+message);
- teleWebSocketManager.send("/toAll/" + username, message);
- return "成功";
- }
-
- }

这里的生产者我配置在Dispatch中,其实消息并不是由dispatch产生的,而是专门模拟消息生产的messageproducer产生的。
- /**
- * 对接收的消息进行分发
- *
- * @author: wang.mh
- * 2019/6/13 17:18
- */
- @Component
- @Controller
- public class DemoRocketMqDispatchExample {
-
- @Resource
- private DefaultRocketMqProducer producer;
-
- @Autowired
- private RedisUtil redisUtil;
-
- @ResponseBody
- @RequestMapping(value = "/messagereceive")
- public void messageReceive(String username, String message){
- System.out.println("-------------username:"+username+"--------message:"+message);
- DemoRocketMqContent content = new DemoRocketMqContent();
- content.setUsername(username);
- content.setMessage(message);
-
- String destination = (String) redisUtil.get(content.getUsername());
- Message msg = new Message(destination, "TagA", content.toString().getBytes());
- boolean sendResult = producer.sendMsg(msg);
- System.out.println(msg);
- System.out.println(content.toString());
- System.out.println("发送结果:" + sendResult);
-
- }
-
- }

暂时先写到这里,最近精神状态不是很好,很难集中记忆力,以后想到什么再继续写。
代码已经传到github(其中dispatch和messageproducer是可以合并的)。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。