当前位置:   article > 正文

Java Socket Server TCP服务端向指定客户端发送消息

Java Socket Server TCP服务端向指定客户端发送消息

实现思路
首先需要知道java里如何创建一个Socket服务器端。

  1.     //创建一个服务器端对象
  2.     ServerSocket server = new ServerSocket(); 
  3.     //绑定启动的ip和端口号
  4.     server.bind(new InetSocketAddress("127.0.0.1",8082));
  5.     //启动成功后,调用accept()方法阻塞,
  6.     //当有客户端成功连接时会生成一个Socket对象用于通讯
  7.     Socket socket = server.accept();


提示:注意server.accept()方法调用会阻塞,只有新的客户端连接后才返回一个新的socket对象。如果一直未连接那么会一直处于阻塞状态

1、了解了如何创建一个socket服务器端后。那么如何实现给指定的连接客户端发送消息呢?首先我们可以知道只有有客户端连接服务器就会生成一个socket对象,socket对象中获取对应的输入输出流读取和写入就可以实现发送消息;也就是只要咱们把每次连接的socket保存起来,发送和接收数据就直接获取对应的socket就可以实现。
2、总结一下。也就是需要循环调用 server.accept()阻塞方法,对每个连接的socket保存起来。并对每一个socket实现读取写入操作。为了可用性。对于服务器阻塞方法需要单独创建一个线程进行阻塞等待连接操作。并且对于每一个连接的客户端单独创建一个线程进行读写操作。
好了话不多说直接上代码

项目源码
服务器端接口ISocketServer

  1. /**
  2.  * @description: 启用socket服务
  3.  * @author
  4.  * @date 2024/2/19 13:51
  5.  * @version 1.0
  6.  */
  7. public interface ISocketServer {
  8.     /**
  9.      * @description: 启动服务
  10.      * @author liangxuelong
  11.      * @date 2023/6/19 13:53
  12.      * @version 1.0
  13.      */
  14.     public void startServer();
  15.     /**
  16.      * @description: 停止服务
  17.      * @author liangxuelong
  18.      * @date 2023/6/19 13:53
  19.      * @version 1.0
  20.      */
  21.     public void stopServer();
  22.     /**
  23.      * @description: 判断是否启动
  24.      * @author liangxuelong
  25.      * @date 2023/6/19 14:01
  26.      * @version 1.0
  27.      */
  28.     public boolean isStart();
  29. }


服务器端实现一个抽象类AbstractSocketServer,定义启动、停止。查看连接状态等方法

  1. /**
  2. * @author
  3. *
  4. * @version 1.0
  5. * @description:
  6. * @date 2023/6/19 14:04
  7. */
  8. public abstract class AbstractSocketServer implements ISocketServer{
  9. //服务器端口
  10. protected int port;
  11. //默认ip地址
  12. protected String ipAddress = "127.0.0.1";
  13. //默认最大连接数
  14. protected int maxConnectSize = 1;
  15. //当前连接状态
  16. protected boolean isConn = false;
  17. //java ServerSocket 对象
  18. private ServerSocket server;
  19. //保存所有连接通讯类
  20. protected ConcurrentHashMap<String,ICommunication> communications = new ConcurrentHashMap<>();
  21. public AbstractSocketServer(String ipAddress, int port) {
  22. this.ipAddress = ipAddress;
  23. this.port = port;
  24. }
  25. public AbstractSocketServer(int port) {
  26. this.port = port;
  27. }
  28. /**
  29. * @description: 启动服务
  30. * @author liangxuelong
  31. * @date 2023/6/19 16:14
  32. * @version 1.0
  33. */
  34. @Override
  35. public void startServer() {
  36. new Thread(() -> {
  37. try {
  38. if (isConn) {
  39. stopServer();
  40. }
  41. server = new ServerSocket();
  42. //绑定数据连接地址端口号
  43. server.bind(new InetSocketAddress(ipAddress,port));
  44. //绑定成功设置当前服务器状态为true
  45. isConn = true;
  46. //循环等待客户端连接
  47. while(true){
  48. //阻塞 等待socket client 连接
  49. Socket socket = server.accept();
  50. //生成连接通讯对象
  51. SocketCommunication socketCommunication = new SocketCommunication(socket,this);
  52. new Thread(() -> {
  53. try {
  54. addCommunication(socketCommunication);
  55. socketCommunication.handle();
  56. } catch (Exception e) {
  57. e.printStackTrace();
  58. }
  59. removeCommunication(socket.getInetAddress().getHostAddress());
  60. }).start();
  61. }
  62. } catch (IOException ioException) {
  63. ioException.printStackTrace();
  64. }
  65. }).start();
  66. }
  67. /**
  68. * @description: 停止服务
  69. * @author liangxuelong
  70. * @date 2023/6/19 16:14
  71. * @version 1.0
  72. */
  73. @Override
  74. public void stopServer(){
  75. if (server != null) {
  76. try {
  77. server.close();
  78. } catch (IOException ioException) {
  79. ioException.printStackTrace();
  80. }
  81. isConn = false;
  82. removeAllCommunication();
  83. }
  84. }
  85. /**
  86. * @description: 给所有连接发送
  87. * @author liangxuelong
  88. * @date 2023/6/19 16:51
  89. * @version 1.0
  90. */
  91. public void sendToALl(byte[] bytes){
  92. for (String ipAddress : communications.keySet()) {
  93. send(ipAddress,bytes);
  94. }
  95. }
  96. /**
  97. * @description: 给某个ip发送
  98. * @author liangxuelong
  99. * @date 2023/6/19 16:53
  100. * @version 1.0
  101. */
  102. public boolean send(String ip,byte[] bytes) {
  103. if (communications.get(ip) == null)
  104. return false;
  105. try {
  106. communications.get(ip).send(bytes);
  107. return true;
  108. } catch (Exception e) {
  109. e.printStackTrace();
  110. return false;
  111. }
  112. }
  113. /**
  114. * @description: 当前是否启动 socket 服务端
  115. * @author liangxuelong
  116. * @date 2023/6/19 16:14
  117. * @version 1.0
  118. */
  119. @Override
  120. public boolean isStart() {
  121. return isConn;
  122. }
  123. /**
  124. * @description: 设置最大连接数量
  125. * @author
  126. * @date 2023/6/19 16:19
  127. * @version 1.0
  128. */
  129. public void setMaxConnectSize(int maxConnectSize) {
  130. this.maxConnectSize = maxConnectSize;
  131. }
  132. /**
  133. * @description: 获取当前连接数据
  134. * @param:
  135. * @return: void
  136. * @author liangxuelong
  137. * @date: 2023/6/19
  138. */
  139. public int getConnectSize(){
  140. return communications.size();
  141. }
  142. /**
  143. * @description: 获取当前所有连接的Ip地址
  144. * @param:
  145. * @return: void
  146. * @author liangxuelong
  147. * @date: 2023/6/19
  148. */
  149. public Set<String> getConnectIpAddress(){
  150. return communications.keySet();
  151. }
  152. /**
  153. * @description: 添加连接对象
  154. * @author liangxuelong
  155. * @date 2023/6/19 15:14
  156. * @version 1.0
  157. */
  158. protected void addCommunication(ICommunication communication) throws Exception {
  159. Socket socket = communication.getSocket();
  160. //判断是否超出最大连接数量,超出后断开连接
  161. if (maxConnectSize > communications.size()) {
  162. ICommunication iCommunication = communications.get(socket.getInetAddress()
  163. .getHostAddress());
  164. if (iCommunication != null) {
  165. try {
  166. iCommunication.disconnect();
  167. } catch (Exception e) {
  168. e.printStackTrace();
  169. }
  170. communications.remove(socket.getInetAddress()
  171. .getHostAddress());
  172. }
  173. communications.put(socket.getInetAddress()
  174. .getHostAddress(),communication);
  175. } else {
  176. socket.close();
  177. throw new ConnectException(maxConnectSize);
  178. }
  179. }
  180. /**
  181. * @description: 移除连接对象
  182. * @author liangxuelong
  183. * @date 2023/6/19 15:14
  184. * @version 1.0
  185. */
  186. public void removeCommunication(String ip){
  187. ICommunication iCommunication = communications.get(ip);
  188. if (iCommunication != null) {
  189. try {
  190. iCommunication.disconnect();
  191. } catch (Exception e) {
  192. e.printStackTrace();
  193. }
  194. communications.remove(ip);
  195. }
  196. }
  197. /**
  198. * @description: 移除所有连接对象
  199. * @author
  200. * @date 2023/6/19 15:14
  201. * @version 1.0
  202. */
  203. public void removeAllCommunication(){
  204. for (String ipAddress : communications.keySet()) {
  205. removeCommunication(ipAddress);
  206. }
  207. }
  208. }


服务器端实现抽象类创建SocketServer类

  1. /**
  2. * @author
  3. * @version 1.0
  4. * @description: socket 服务启动类
  5. * @date 2023/6/19 15:53
  6. */
  7. public class SocketServer extends AbstractSocketServer {
  8. public SocketServer(String ipAddress, int port) {
  9. super(ipAddress, port);
  10. }
  11. public SocketServer(int port) {
  12. super(port);
  13. }
  14. }


客户端通讯接口ICommunication主要用于读写socket交互数据

  1. public interface ICommunication {
  2. /**
  3. * @description: 获取当前连接的socket对象
  4. * @author
  5. * @date 2023/6/19 14:28
  6. * @version 1.0
  7. */
  8. public Socket getSocket();
  9. /**
  10. * @description: socket创建成功后读取数据
  11. * @author
  12. * @date 2023/6/19 14:29
  13. * @version 1.0
  14. */
  15. public void handle() throws Exception;
  16. /**
  17. * @description: 将读取到的数据统一处理
  18. * @author
  19. * @date 2023/6/19 14:29
  20. * @version 1.0
  21. */
  22. public void receive(Socket socket,byte[] data) throws Exception;
  23. /**
  24. * @description: 发送数据
  25. * @author
  26. * @date 2023/6/19 14:41
  27. * @version 1.0
  28. */
  29. public void send(byte[] data) throws Exception;
  30. /**
  31. * @description: 断开连接
  32. * @author liangxuelong
  33. * @date 2023/6/19 14:42
  34. * @version 1.0
  35. */
  36. public void disconnect() throws Exception;
  37. }



客户端实现通讯接口ICommunication创建SocketCommunication对象用于读写处理接收发送和发送数据

  1. /**
  2. * @author
  3. * @version 1.0
  4. * @description: socket 连接对象
  5. * 主要用于对已连接的客户端收发消息
  6. * @date 2023/6/19 14:43
  7. */
  8. public class SocketCommunication implements ICommunication{
  9. private Socket socket; //已经连接的客户端对象
  10. private AbstractSocketServer socketServer; //来自哪个服务器,创建的服务器对象
  11. public SocketCommunication(@NotNull Socket socket, AbstractSocketServer socketServer) {
  12. this.socket = socket;
  13. this.socketServer = socketServer;
  14. }
  15. @Override
  16. public Socket getSocket() {
  17. return socket;
  18. }
  19. @Override
  20. public void handle() throws Exception {
  21. try {
  22. System.out.println(socket.getInetAddress()
  23. .getHostAddress()+" 已连接");
  24. InputStream in = socket.getInputStream();
  25. ByteArrayOutputStream output = new ByteArrayOutputStream();
  26. byte[] b = new byte[1024];
  27. int len;
  28. while ( (len = in.read(b)) != -1) {
  29. output.write(b, 0, len);
  30. try {
  31. Thread.sleep(10);
  32. } catch (InterruptedException e1) {
  33. e1.printStackTrace();
  34. }
  35. if(in.available() == 0) {
  36. this.receive(socket,output.toByteArray());
  37. output.reset();
  38. }
  39. b = new byte[1024];
  40. len = 0;
  41. }
  42. } catch (IOException ioException) {
  43. throw ioException;
  44. }
  45. }
  46. /**
  47. * @description: 接收数据,对客户端接收的数据进行统一处理
  48. * 可以编写相应的处理逻辑,我这里是服务器端收到消息后。回复当前连接数量、
  49. * @author
  50. * @date 2023/6/19 17:30
  51. * @version 1.0
  52. */
  53. @Override
  54. public void receive(Socket socket, byte[] data) throws Exception {
  55. //服务器接收客户端消息
  56. System.out.println(socket.getInetAddress().getHostAddress()+" 发送:"+new String(data,"gbk"));
  57. //服务器回复客户端消息
  58. String msg = "当前连接数量:"+socketServer.getConnectSize();
  59. socketServer.send(socket.getInetAddress().getHostAddress(),
  60. msg.getBytes("gbk"));
  61. }
  62. /**
  63. * @description: 发送数据
  64. * @author
  65. * @date 2023/6/19 17:30
  66. * @version 1.0
  67. */
  68. @Override
  69. public void send(byte[] data) throws Exception {
  70. socket.getOutputStream().write(data);
  71. }
  72. /**
  73. * @description: 断开连接
  74. * @author
  75. * @date 2023/6/19 17:30
  76. * @version 1.0
  77. */
  78. @Override
  79. public void disconnect() throws Exception {
  80. System.out.println(socket.getInetAddress()
  81. .getHostAddress()+" 已断开");
  82. socket.close();
  83. }
  84. }



定义一个自定义异常ConnectException处理连接数超出最大限制

  1. /**
  2. * @author
  3. * @version 1.0
  4. * @description:
  5. * @date 2023/6/19 16:25
  6. */
  7. public class ConnectException extends Exception{
  8. public ConnectException(int maxSize) {
  9. super("连接数量超出最大限制,连接失败! 当前最大连接数:"+maxSize);
  10. }
  11. }


最后定义main方法进行测试

  1. /**
  2. * @author
  3. * @version 1.0
  4. * @description: TODO
  5. * @date 2024/2/9 15:57
  6. */
  7. public class Main {
  8. public static void main(String[] args) {
  9. SocketServer server = new SocketServer("192.168.0.100",5032);
  10. server.setMaxConnectSize(2); //设置最大连接数量
  11. server.startServer();
  12. //每5秒钟获取一下当前连接,并发送数据
  13. new Thread(() -> {
  14. while (true) {
  15. //获取所有连接
  16. for (String ip : server.getConnectIpAddress()) {
  17. try {
  18. System.out.println("当前已连接ip:"+ip);
  19. String msg = "向指定客户端发送消息: "+ip+" 你好";
  20. //向指定ip发送消息
  21. if (server.send(ip,msg.getBytes("gbk"))) {
  22. System.out.println("向 "+ip+" 发送了 ["+msg+"]");
  23. }
  24. } catch (UnsupportedEncodingException e) {
  25. e.printStackTrace();
  26. }
  27. }
  28. try {
  29. Thread.sleep(5000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }).start();
  35. }
  36. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/64985
推荐阅读
相关标签
  

闽ICP备14008679号