当前位置:   article > 正文

java基于Redis及WebSocket实现聊天室(二)_java redis+websocket 实现多人聊天 把群聊信息放到redis

java redis+websocket 实现多人聊天 把群聊信息放到redis

------------------2019-4-17---------------
优化界面后 移动端运行如图:聊天室
网页端运行如图,主要分辨率调成手机端适应的了,网页的不缩放的话有点大。
在这里插入图片描述
------------------2019-4-17---------------
P:我之前测试阶段把整个springCloud的配置一起打包了…其实只用打包一个service的jar包即可。下面的附件有点大个…实际上里面有用的就一个service-hi,其他都是springCloud的配置

在(一)中提到在本地运行成功但是在Linux下运行失败的原因已经找到了主要是跨域的问题导致websocket连接失败。

简单解决方法:

wsUrl = "ws://localhost:8763/websocket/" + channel + "/" + name;
  • 1

改成获取本地路径的,这种方式修改不适用反向代理等方式,因为他获取的是地址,反向代理后地址不一样还是会跨域

  wsUrl = "ws://" + document.domain + ":8763/websocket/" + channel + "/" + name;
  • 1

WebSocket实现

前端

 wsUrl = "ws://" + document.domain + ":8763/websocket/" + channel + "/" + name;
  • 1

然后用几个方法来接收后端的消息

		 ws.onclose = function () {
            console.log('链接关闭');
            reconnect();
        };
        ws.onerror = function () {
            console.log('发生异常了');
            reconnect();
        };
        ws.onopen = function () {
            console.log('链接开启');
            //心跳检测重置
            heartCheck.start();
        };
        ws.onmessage = function (event) {
            if (event.data != '心跳检测') {
                setMessageInnerHTML(event.data);
            }
            //拿到任何消息都说明当前连接是正常的 后台收到消息后要发送消息确保收到消息
            heartCheck.start();
        }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

这里的错误及失败需要通过重连机制来保证重连
打开连接时开启心跳检测机制,然后在收到消息后重置心跳检测机制的事件。
重启及心跳机制如下:

    function reconnect() {
        if (lockReconnect) {
            return;
        }
        lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多
        tt && clearTimeout(tt);
        tt = setTimeout(function () {
            createWebSocket(channel, name);
            lockReconnect = false;
        }, 4000);
    }
    //用来保持一致连接的状态
    var heartCheck = {
        timeout: 30000,  //  心跳检测时长
        timeoutObj: null, // 定时变量
        serverTimeoutObj: null,
        start: function () {
            var self = this;
            this.timeoutObj && clearTimeout(this.timeoutObj);
            this.serverTimeoutObj && clearTimeout(this.serverTimeoutObj);
            this.timeoutObj = setTimeout(function () {
                ws.send("HeartBeat");
                self.serverTimeoutObj = setTimeout(function () {
                    ws.close();//如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                }, self.timeout)
            }, this.timeout)
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

心跳检测是因为我在使用的时候发现websocket一段时间不用就会自动断开,网上也有很多心跳的资料参考,但是资料中都没有提一个关键的地方onmessage 里面一定要重置心跳检测的时间,并且后台在收到心跳的消息后要发回消息,这样才能成功。
心跳机制在实际应用上我觉得还是比较重要的。

后台

后台需要建立一个专有的控制器来接收websocket的连接请求,并需要注入一个Config的Bean。

package com.cjdjyf.servicehi.websocket;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

/**
 * @author :cjd
 * @description: 
 * @create 2019-04-12 11:57
 **/
@Configuration
public class WebSocketConfig   {
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
package com.cjdjyf.servicehi.websocket;

import com.cjdjyf.servicehi.redis.Subscriber;
import org.springframework.boot.autoconfigure.web.ServerProperties;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author :cjd
 * @description: websocket连接类
 * @create 2019-04-12 11:57
 **/
@ServerEndpoint(value = "/websocket/{channel}/{name}")
@Component
public class MyWebSocket {
    private Session session;
    private Thread thread;
    private Subscriber subscriber;
    private static ConcurrentHashMap<String, Subscriber> map = new ConcurrentHashMap<>();

    /**
     * 连接建立成功调用的方法*/
    @OnOpen
    public void onOpen(@PathParam(value = "channel") String channel, @PathParam(value = "name") String name,
                       Session session) {
        this.session = session;
        Subscriber before = map.get(name);
        //如果第一次连接
        if (before == null) {
            subscriber = new Subscriber(channel, name, this);
            thread = new Thread(this.subscriber);
            thread.start();
            map.put(name, subscriber);
        } else {
            //多次连接 如果频道和之前的不一样则订阅新的频道并取消之前订阅的频道
            if (!before.getChannel().equals(channel)) {
                before.unsubscribe();
                subscriber = new Subscriber(channel, name, this);
                thread = new Thread(this.subscriber);
                thread.start();
                map.put(name, subscriber);
            }
        }
      /*  if (set.add(name)) {
            this.subscriber = new Subscriber(channel, name, this);
            thread = new Thread(this.subscriber);
            thread.start();
        }*/
    }

    /**
     * 连接关闭调用的方法
     */
    @OnClose
    public void onClose() {
        //退出后要取消之前订阅的频道并从map中移除
        if (thread != null && thread.isAlive()) {
            map.remove(subscriber.getName());
            subscriber.unsubscribe();
            thread.interrupt();
        }
    }

    @OnMessage
    public void onMessage(String message) {
        try {
            sendMessage("心跳检测");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @OnError
    public void onError(Session session, Throwable error) {
    }


    public void sendMessage(String message) throws IOException {
        session.getBasicRemote().sendText(message);
        //this.session.getAsyncRemote().sendText(message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88

对(一)中的订阅者也作出了略微的修改

package com.cjdjyf.servicehi.redis;

import com.cjdjyf.servicehi.websocket.MyWebSocket;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

import javax.websocket.Session;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * @author :cjd
 * @description: 订阅者
 * @create 2019-04-11 16:08
 **/
public class Subscriber extends JedisPubSub implements Runnable {
    private String channel;
    private MyWebSocket webSocket;
    private String name;
    private SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

    public Subscriber(String channel, String name, MyWebSocket webSocket) {
        this.channel = channel;
        this.name = name;
        this.webSocket = webSocket;
    }

    @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        try {
            webSocket.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void onSubscribe(String channel, int subscribedChannels) {    //订阅了频道会调用
        RedisUtil.publish(channel, format.format(new Date()) + " " + name + "加入聊天室" + channel);
    }

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) {   //取消订阅 会调用
        RedisUtil.publish(channel, format.format(new Date()) + " " + name + "退出聊天室" + channel);
    }

    public void run() {
        //开一个线程去订阅,阻塞
        Jedis jedis = null;
        try {
            jedis = RedisUtil.getJedis();
            jedis.subscribe(this, channel);
        } catch (Exception e) {
        } finally {
            if (jedis != null) {
                jedis.close();
            }
        }

    }

    public String getChannel() {
        return channel;
    }

    public void setChannel(String channel) {
        this.channel = channel;
    }

    public MyWebSocket getWebSocket() {
        return webSocket;
    }

    public void setWebSocket(MyWebSocket webSocket) {
        this.webSocket = webSocket;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87

整体上,我的做法和网上的一些资料有些不同。

实现上主要依赖了Redis的发布/订阅。

本实现是由Redis精准地将消息发送给订阅了频道的WebSocket,再由各自的WebSocket发送消息给各自的前端接收。
还有一种只用WebSocket来实现,是直接将消息发送给自己的WebSocket,再由自己的消息处理中调用别的WebSocket的推送消息方法。
看代码应该可以发现一些不同

我的发送消息给前端代码是集中在订阅者,由Redis的publish触发订阅了该频道的订阅者的事件onMessage然后发送给各自保存下来的webSocket(或session)发送给各自的前端。

 @Override
    public void onMessage(String channel, String message) {       //收到消息会调用
        try {
            webSocket.sendMessage(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

而网上的一般都是用Map存下webSocket然后通过迭代器循环发送给各自连接的前端。

项目运行

项目求简单被我放在了之前的SpringBoot下,然后用maven打包成jar包,拖到Linux后

nohup java -jar xxx.jar  >xxx.txt &
  • 1

来进行后台运行

建立脚本

start.sh
#!/bin/bash
nohup java -jar eureka-server-exec.jar >eureka.txt &
nohup java -jar config-server-exec.jar >config.txt &
nohup java -jar service-hi-exec.jar >hi.txt &
  • 1
  • 2
  • 3
  • 4
  • 5
stop.sh

#!/bin/bash
PID=$(ps -ef | grep eureka-server-exec.jar | grep -v grep | awk '{ print $2 }')
if [ -z "$PID" ]
then
    echo Application is already stopped
else
    echo kill $PID
    kill $PID
fi

PID=$(ps -ef | grep config-server-exec.jar | grep -v grep | awk '{ print $2 }')
if [ -z "$PID" ]
then
    echo Application is already stopped
else
    echo kill $PID
    kill $PID
fi

PID=$(ps -ef | grep service-hi-exec.jar | grep -v grep | awk '{ print $2 }')
if [ -z "$PID" ]
then
    echo Application is already stopped
else
    echo kill $PID
    kill $PID
fi
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

授予文件读写权,然后防火墙开端口,就可以在外网访问了。
后续准备优化一下界面及用redis存一下聊天记录。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/325505
推荐阅读
相关标签
  

闽ICP备14008679号