赞
踩
目录
要实现单聊和群聊需要考虑哪些方面?
1、需要有客户端和服务端,客户端给用户提供发送消息、接收消息的途径。服务端用于接受客户端的连接,并进行消息接收、消息处理和消息转发。
2、需要统一消息的格式,我这里仅仅是定义一个Message对象,通过type字段区分消息的类型。
如果做的精细点,可以考虑自己设计一个基于tcp的通讯协议,具体思路可以看我上一篇博客的方案二。
3、单聊:服务器把收到的消息转发给目标用户。群聊:服务器收到消息后,转发给指定小组的所有人。
源码地址在最下方,需要的自取。
单聊
群聊
Message对象
- /**
- * 消息
- * @author dayrain
- */
- @Data
- public class Message implements Serializable {
-
- private static final long serialVersionUID = 7753923870637862141L;
-
- /**
- * 发送人
- */
- private String sendFrom;
-
- /**
- * 接收人
- */
- private String sendTo;
-
- /**
- * 消息内容
- */
- private String content;
-
- /**
- * 消息类型
- */
- private Integer type;
-
- /**
- * 发送时间
- */
- private String sendTime;
-
- /**
- * 创建登录消息
- * @param sendFrom 登录人
- * @return 消息体
- */
- public static Message createLoginMessage(String sendFrom) {
- Message message = new Message();
- message.setSendFrom(sendFrom);
- message.setType(Constants.MessageType.LOGIN);
- message.setSendTime(TimeUtils.getNowTime());
- return message;
- }
-
- /**
- * 单聊时创建的消息
- * @param userId 发送人
- * @param sendTo 接收人
- * @param msg 消息
- * @return 消息体
- */
- public static Message createPrivateMessage(String userId, String sendTo, String msg) {
- Message message = new Message();
- message.setSendFrom(userId);
- message.setSendTo(sendTo);
- message.setContent(msg);
- message.setType(Constants.MessageType.PRIVATE_MESSAGE);
- message.setSendTime(TimeUtils.getNowTime());
- return message;
- }
-
- /**
- * 创建退出消息
- * @param userId 需要退出的用户名
- * @return 消息体
- */
- public static Message createExitMessage(String userId) {
- Message message = new Message();
- message.setType(Constants.MessageType.EXIT);
- message.setSendFrom(userId);
- message.setSendTime(TimeUtils.getNowTime());
- return message;
- }
-
- /**
- * 创建群组消息
- * @param userId 发送人
- * @param groupId 组号
- * @param content 内容
- * @return 消息体
- */
- public static Message createGroupMessage(String userId, String groupId, String content) {
- Message message = new Message();
- message.setType(Constants.MessageType.GROUP_MESSAGE);
- message.setSendFrom(userId);
- message.setSendTo(groupId);
- message.setContent(content);
- message.setSendTime(TimeUtils.getNowTime());
- return message;
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
消息具体的类型大概有下面的几种
- public interface Constants {
- /**
- * 消息类型
- */
- interface MessageType{
- /**
- * 单聊
- */
- Integer PRIVATE_MESSAGE = 1;
- /**
- * 群聊
- */
- Integer GROUP_MESSAGE = 2;
- /**
- * 系统消息,系统推送时用到
- */
- Integer SYSTEM_MESSAGE = 3;
- /**
- * 用户登录
- */
- Integer LOGIN = 4;
- /**
- * 用户退出
- */
- Integer EXIT = 5;
- }
-
- /**
- * 用户循环输入消息时的结束标志
- */
- String EXIT_FLAG = "exit";
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
Group对象,群发的时候用到
- /**
- * 群组
- * @author dayrain
- */
- @Data
- @AllArgsConstructor
- @NoArgsConstructor
- public class Group {
- /**
- * 组号
- */
- private String groupId;
-
- /**
- * 成员
- */
- private List<String> userIds;
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
首先服务端肯定是要常驻内存,随时接受来自客户端的连接。
- /**
- * tcp服务端
- * @author dayrain
- */
- public class TcpServer {
-
- public TcpServer() throws IOException, ClassNotFoundException {
- ServerSocket serverSocket = new ServerSocket(8081);
- while (true) {
- Socket socket = serverSocket.accept();
- TcpServerThread tcpServerThread = new TcpServerThread(socket);
- tcpServerThread.start();
- }
- }
-
- public static void main(String[] args) throws IOException, ClassNotFoundException {
- new TcpServer();
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
Socket socket = serverSocket.accept(); 系统用启动后会阻塞在这里,直到有客户端连接上。
当有客户端连接时,会把socket注入到一个线程里,并启动了这个线程。下面是这个线程的源码:
- /**
- * 服务线程(有一个客户端连接,就有一条线程)
- * @author dayrain
- */
- public class TcpServerThread extends Thread{
-
- private final Socket socket;
-
- private InputStream inputStream;
-
- private OutputStream outputStream;
-
- public TcpServerThread(Socket socket) {
- this.socket = socket;
- try {
- inputStream = socket.getInputStream();
- outputStream = socket.getOutputStream();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
-
- @Override
- public void run() {
- while (true) {
- try {
- Message message = MessageUtils.readMessageFromInputStream(inputStream);
- if(Constants.MessageType.LOGIN.equals(message.getType())) {
- //登录
- TcpServerThreadGroup.addServerThread(message.getSendFrom(), this);
- System.out.println(message.getSendFrom() + "登录成功!");
- continue;
- }
-
- if(Constants.MessageType.PRIVATE_MESSAGE.equals(message.getType())) {
-
- //私聊, 发送给指定的用户
- String sendTo = message.getSendTo();
- TcpServerThreadGroup.getServerThread(sendTo).sendMessageToThisOne(message);
- }else if(Constants.MessageType.GROUP_MESSAGE.equals(message.getType())) {
-
- //群组消息,转发给同组的所有人(除了自己)
- String groupId = message.getSendTo();
- Group group = new GroupService().getGroupByGroupId(groupId);
- for (String userId : group.getUserIds()) {
- if(!userId.equals(message.getSendFrom())) {
- TcpServerThread serverThread = TcpServerThreadGroup.getServerThread(userId);
- if(serverThread != null) {
- TcpServerThreadGroup.getServerThread(userId).sendMessageToThisOne(message);
- }
- }
- }
-
- }else if(Constants.MessageType.EXIT.equals(message.getType())) {
- //退出
- ResourceCloseUtils.closeSocketResource(null, socket);
- ResourceCloseUtils.closeStreamResource(inputStream, outputStream);
- break;
- }
- } catch (IOException | ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
- }
-
- /**
- * 把消息发送给当前用户
- * @param message 消息
- * @throws IOException 异常
- */
- public void sendMessageToThisOne(Message message) throws IOException {
- MessageUtils.writeMessageFromOutputStream(this.outputStream, message);
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
正如”思路“中描述的,服务端充当一个中介的作用,只负责转发。
run方法里面是个死循环,保持和客户端的连接,循环处理客户端发过来的消息。
登录成功后,我们把当前线程加入到一个threadGroup中,threadGroup是一个主控,实际上就是一个map,以键值对的形式存储与客户端的连接。
当服务端需要把消息转发给某个人时,就去这个group中拿。具体实现如下:
- /**
- * 线程组,服务器管理所有连接的线程
- * @author dayrain
- */
- public class TcpServerThreadGroup {
-
- private static final ConcurrentHashMap<String, TcpServerThread> tcpServerGroup = new ConcurrentHashMap<>();
-
- public static void addServerThread(String userId, TcpServerThread tcpServerThread) {
- tcpServerGroup.put(userId, tcpServerThread);
- }
-
- public static TcpServerThread getServerThread(String userId) {
- return tcpServerGroup.get(userId);
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
群聊的时候,需要获取群组信息,我这里是写死的,也可以从数据库中取。
- /**
- * 组服务
- * @author dayrain
- */
- public class GroupService {
-
- private static HashMap <String, Group> groupDao = null;
- static {
- groupDao = new HashMap<>();
- groupDao.put("1", new Group("1", Arrays.asList("张三", "李四", "王二")));
- groupDao.put("2", new Group("2", Arrays.asList("李四", "王二", "赵六")));
- }
-
- Group getGroupByGroupId(String groupId) {
- return groupDao.get(groupId);
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
消息处理工具类
- /**
- * 消息发送、接收的工具类
- * @author dayrain
- */
- public class MessageUtils {
- /**
- * 从输入流中获取对方发送的消息
- * @param inputStream 输入流
- * @return 消息
- * @throws IOException io异常
- * @throws ClassNotFoundException 消息格式不正确
- */
- public static Message readMessageFromInputStream(InputStream inputStream) throws IOException, ClassNotFoundException {
- if(inputStream == null) {
- return null;
- }
- ObjectInputStream objectInputStream = new ObjectInputStream(inputStream);
- return (Message) objectInputStream.readObject();
- }
-
- /**
- * 向输出流中写信息
- * @param outputStream 输出流
- * @param message 消息
- * @throws IOException 异常
- */
- public static void writeMessageFromOutputStream(OutputStream outputStream, Message message) throws IOException {
- if(outputStream == null) {
- return;
- }
- ObjectOutputStream objectOutputStream = new ObjectOutputStream(outputStream);
- objectOutputStream.writeObject(message);
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
客户端和服务器建立连接后,会得到一个Socket对象。下面是客户端的一段代码:
- /**
- * 客户端
- * @author dayrain
- */
- public class TcpClient {
-
- private static String userId = null;
-
- public static void main(String[] args) throws IOException, InterruptedException {
- Socket socket = new Socket(InetAddress.getLocalHost(), 8081);
- System.out.println("========请先登录========");
- userId = login(socket);
- ReadThread readThread = new ReadThread(socket);
- readThread.start();
- while (true) {
- System.out.println("========1.单聊========");
- System.out.println("========2.群聊========");
- System.out.println("========3.退出========");
- String choice = ScannerUtils.readMessage();
- switch (choice){
- case "1":
- //私聊
- privateChat(socket);
- break;
- case "2":
- //群聊
- groupChat(socket);
- break;
- default:
- //退出系统
- exitChat(socket);
- System.out.println("已退出登录");
- System.exit(0);
- return;
- }
- }
- }
-
- /**
- * 退出聊天
- * @param socket
- * @throws IOException
- */
- private static void exitChat(Socket socket) throws IOException {
- Message message = Message.createExitMessage(userId);
- MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), message);
- }
-
- /**
- * 群聊
- * @param socket
- * @throws IOException
- */
- private static void groupChat(Socket socket) throws IOException {
- System.out.println("----------输入群组号----------");
- String groupId = ScannerUtils.readMessage();
- while (true) {
- System.out.println("输入你要发送的消息(exit 退出)");
- String content = ScannerUtils.readMessage();
- if(Constants.EXIT_FLAG.equals(content)) {
- break;
- }
- Message groupMessage = Message.createGroupMessage(userId, groupId, content);
- MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), groupMessage);
- }
-
- }
-
- /**
- * 单聊
- * @param socket
- * @throws IOException
- */
- private static void privateChat(Socket socket) throws IOException {
- System.out.println("----------输入对方用户名----------");
- String sendTo = ScannerUtils.readMessage();
- while (true) {
- System.out.println("输入你要发送的消息(exit 退出)");
- String message = ScannerUtils.readMessage();
- if(Constants.EXIT_FLAG.equals(message)) {
- break;
- }
- Message privateMessage = Message.createPrivateMessage(userId, sendTo, message);
- MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), privateMessage);
- }
- }
-
- /**
- * 登录
- * @param socket
- * @return 当前用户的id
- * @throws IOException
- */
- private static String login(Socket socket) throws IOException {
- System.out.println("输入登录名");
- String userId = ScannerUtils.readMessage();
- Message loginMessage = Message.createLoginMessage(userId);
- MessageUtils.writeMessageFromOutputStream(socket.getOutputStream(), loginMessage);
- return userId;
- }
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
上面的代码可以把用户的各种需求推送给服务器,但却没有接收消息相关的代码?那是因为我把输入和输出分开了。
Java BIO中,有两个阻塞的地方:
serverSocket.accept() //服务端等待客户端连接。
socket.getInputStream().read() //服务器或者客户端,阻塞等待对方发送的消息。
如果输入和输出不分开,放在一个线程里,会出现阻塞的情况。就算处理好,聊天的时候也会出现只能一问一答的尴尬情况。
为了不影响用户体验,我再给read操作单独开一个线程。
ReadThread类
- /**
- * 每一个客户端都另开一个线程,循环读取客户端发过来的消息
- * @author dayrain
- */
- public class ReadThread extends Thread {
-
- private final Socket socket;
-
- public ReadThread(Socket socket) {
- this.socket = socket;
- }
-
- @Override
- public void run() {
- try {
- while (true) {
- if(!socket.isClosed()) {
- Message message = MessageUtils.readMessageFromInputStream(socket.getInputStream());
- if(Constants.MessageType.PRIVATE_MESSAGE.equals(message.getType())) {
- System.out.println("【悄悄话】【"+message.getSendTime()+"】收到了来自【" + message.getSendFrom() + "】的一条消息,内容为: 【" + message.getContent() + "】");
- }
-
- if(Constants.MessageType.GROUP_MESSAGE.equals(message.getType())) {
- System.out.println("【群消息】【"+message.getSendTime()+"】收到了来自群【" + message.getSendTo() + "】,用户名为【" + message.getSendFrom() + "】发来了一条消息,内容为: 【" + message.getContent() + "】");
- }
- }
- }
- } catch (IOException | ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
- }
data:image/s3,"s3://crabby-images/deb9d/deb9d52e6c78f73fbfaadc6e519fd00d286664e1" alt=""
当然上面的代码只是简单的demo,还有很多问题需要解决。
比如:
1、用户认证问题。上述代码未实现登录功能,包括消息体中的sendFrom依赖客户端,容易被篡改。
其实我们可以仿造传统的cookie+session模式,登录成功后返回token,客户端每次访问接口,都带上token。
2、持久化问题。如果想实现消息存储的功能,服务端接收到消息后,在转发的同时,也要进行持久化操作。
3、序列化问题。这个demo中,因为只有一个Message类型,客户端和服务端都是Java写的,所以就用自带的ObjectOutputStream、ObjectInputStream来序列化和反序列化。
之前做的一个项目用websocket实现后台推送的,前后端交互的是json字符串,那样的话就需要自己处理了。
其他没贴出来的代码,都是简化操作的工具类。
需要完整代码的小伙伴,可自取:
https://gitee.com/DayCloud/dayrain-demo/tree/master/java-net/tcp-chatroom
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。