赞
踩
实现思路
首先需要知道java里如何创建一个Socket服务器端。
- //创建一个服务器端对象
- ServerSocket server = new ServerSocket();
- //绑定启动的ip和端口号
- server.bind(new InetSocketAddress("127.0.0.1",8082));
- //启动成功后,调用accept()方法阻塞,
- //当有客户端成功连接时会生成一个Socket对象用于通讯
- Socket socket = server.accept();
提示:注意server.accept()方法调用会阻塞,只有新的客户端连接后才返回一个新的socket对象。如果一直未连接那么会一直处于阻塞状态
1、了解了如何创建一个socket服务器端后。那么如何实现给指定的连接客户端发送消息呢?首先我们可以知道只有有客户端连接服务器就会生成一个socket对象,socket对象中获取对应的输入输出流读取和写入就可以实现发送消息;也就是只要咱们把每次连接的socket保存起来,发送和接收数据就直接获取对应的socket就可以实现。
2、总结一下。也就是需要循环调用 server.accept()阻塞方法,对每个连接的socket保存起来。并对每一个socket实现读取写入操作。为了可用性。对于服务器阻塞方法需要单独创建一个线程进行阻塞等待连接操作。并且对于每一个连接的客户端单独创建一个线程进行读写操作。
好了话不多说直接上代码
项目源码
服务器端接口ISocketServer
- /**
- * @description: 启用socket服务
- * @author
- * @date 2024/2/19 13:51
- * @version 1.0
- */
- public interface ISocketServer {
-
- /**
- * @description: 启动服务
- * @author liangxuelong
- * @date 2023/6/19 13:53
- * @version 1.0
- */
- public void startServer();
-
- /**
- * @description: 停止服务
- * @author liangxuelong
- * @date 2023/6/19 13:53
- * @version 1.0
- */
- public void stopServer();
-
- /**
- * @description: 判断是否启动
- * @author liangxuelong
- * @date 2023/6/19 14:01
- * @version 1.0
- */
- public boolean isStart();
-
- }
服务器端实现一个抽象类AbstractSocketServer,定义启动、停止。查看连接状态等方法
- /**
- * @author
- *
- * @version 1.0
- * @description:
- * @date 2023/6/19 14:04
- */
- public abstract class AbstractSocketServer implements ISocketServer{
- //服务器端口
- protected int port;
- //默认ip地址
- protected String ipAddress = "127.0.0.1";
- //默认最大连接数
- protected int maxConnectSize = 1;
- //当前连接状态
- protected boolean isConn = false;
- //java ServerSocket 对象
- private ServerSocket server;
-
- //保存所有连接通讯类
- protected ConcurrentHashMap<String,ICommunication> communications = new ConcurrentHashMap<>();
-
- public AbstractSocketServer(String ipAddress, int port) {
- this.ipAddress = ipAddress;
- this.port = port;
- }
-
- public AbstractSocketServer(int port) {
- this.port = port;
- }
- /**
- * @description: 启动服务
- * @author liangxuelong
- * @date 2023/6/19 16:14
- * @version 1.0
- */
- @Override
- public void startServer() {
- new Thread(() -> {
- try {
- if (isConn) {
- stopServer();
- }
- server = new ServerSocket();
- //绑定数据连接地址端口号
- server.bind(new InetSocketAddress(ipAddress,port));
- //绑定成功设置当前服务器状态为true
- isConn = true;
- //循环等待客户端连接
- while(true){
- //阻塞 等待socket client 连接
- Socket socket = server.accept();
- //生成连接通讯对象
- SocketCommunication socketCommunication = new SocketCommunication(socket,this);
- new Thread(() -> {
- try {
- addCommunication(socketCommunication);
- socketCommunication.handle();
- } catch (Exception e) {
- e.printStackTrace();
- }
- removeCommunication(socket.getInetAddress().getHostAddress());
- }).start();
- }
- } catch (IOException ioException) {
- ioException.printStackTrace();
- }
- }).start();
- }
-
- /**
- * @description: 停止服务
- * @author liangxuelong
- * @date 2023/6/19 16:14
- * @version 1.0
- */
- @Override
- public void stopServer(){
- if (server != null) {
- try {
- server.close();
- } catch (IOException ioException) {
- ioException.printStackTrace();
- }
- isConn = false;
- removeAllCommunication();
- }
- }
-
-
- /**
- * @description: 给所有连接发送
- * @author liangxuelong
- * @date 2023/6/19 16:51
- * @version 1.0
- */
- public void sendToALl(byte[] bytes){
- for (String ipAddress : communications.keySet()) {
- send(ipAddress,bytes);
- }
- }
-
- /**
- * @description: 给某个ip发送
- * @author liangxuelong
- * @date 2023/6/19 16:53
- * @version 1.0
- */
- public boolean send(String ip,byte[] bytes) {
- if (communications.get(ip) == null)
- return false;
- try {
- communications.get(ip).send(bytes);
- return true;
- } catch (Exception e) {
- e.printStackTrace();
- return false;
- }
- }
-
- /**
- * @description: 当前是否启动 socket 服务端
- * @author liangxuelong
- * @date 2023/6/19 16:14
- * @version 1.0
- */
- @Override
- public boolean isStart() {
- return isConn;
- }
-
- /**
- * @description: 设置最大连接数量
- * @author
- * @date 2023/6/19 16:19
- * @version 1.0
- */
- public void setMaxConnectSize(int maxConnectSize) {
- this.maxConnectSize = maxConnectSize;
- }
-
- /**
- * @description: 获取当前连接数据
- * @param:
- * @return: void
- * @author liangxuelong
- * @date: 2023/6/19
- */
- public int getConnectSize(){
- return communications.size();
- }
-
- /**
- * @description: 获取当前所有连接的Ip地址
- * @param:
- * @return: void
- * @author liangxuelong
- * @date: 2023/6/19
- */
- public Set<String> getConnectIpAddress(){
- return communications.keySet();
- }
-
- /**
- * @description: 添加连接对象
- * @author liangxuelong
- * @date 2023/6/19 15:14
- * @version 1.0
- */
- protected void addCommunication(ICommunication communication) throws Exception {
- Socket socket = communication.getSocket();
- //判断是否超出最大连接数量,超出后断开连接
- if (maxConnectSize > communications.size()) {
- ICommunication iCommunication = communications.get(socket.getInetAddress()
- .getHostAddress());
- if (iCommunication != null) {
- try {
- iCommunication.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- }
- communications.remove(socket.getInetAddress()
- .getHostAddress());
- }
- communications.put(socket.getInetAddress()
- .getHostAddress(),communication);
- } else {
- socket.close();
- throw new ConnectException(maxConnectSize);
- }
- }
-
- /**
- * @description: 移除连接对象
- * @author liangxuelong
- * @date 2023/6/19 15:14
- * @version 1.0
- */
- public void removeCommunication(String ip){
- ICommunication iCommunication = communications.get(ip);
- if (iCommunication != null) {
- try {
- iCommunication.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- }
- communications.remove(ip);
- }
- }
-
- /**
- * @description: 移除所有连接对象
- * @author
- * @date 2023/6/19 15:14
- * @version 1.0
- */
- public void removeAllCommunication(){
- for (String ipAddress : communications.keySet()) {
- removeCommunication(ipAddress);
- }
- }
-
- }
服务器端实现抽象类创建SocketServer类
- /**
- * @author
- * @version 1.0
- * @description: socket 服务启动类
- * @date 2023/6/19 15:53
- */
- public class SocketServer extends AbstractSocketServer {
-
- public SocketServer(String ipAddress, int port) {
- super(ipAddress, port);
- }
-
- public SocketServer(int port) {
- super(port);
- }
- }
客户端通讯接口ICommunication主要用于读写socket交互数据
- public interface ICommunication {
-
- /**
- * @description: 获取当前连接的socket对象
- * @author
- * @date 2023/6/19 14:28
- * @version 1.0
- */
- public Socket getSocket();
-
- /**
- * @description: socket创建成功后读取数据
- * @author
- * @date 2023/6/19 14:29
- * @version 1.0
- */
- public void handle() throws Exception;
-
- /**
- * @description: 将读取到的数据统一处理
- * @author
- * @date 2023/6/19 14:29
- * @version 1.0
- */
- public void receive(Socket socket,byte[] data) throws Exception;
-
- /**
- * @description: 发送数据
- * @author
- * @date 2023/6/19 14:41
- * @version 1.0
- */
- public void send(byte[] data) throws Exception;
-
- /**
- * @description: 断开连接
- * @author liangxuelong
- * @date 2023/6/19 14:42
- * @version 1.0
- */
- public void disconnect() throws Exception;
- }
-
客户端实现通讯接口ICommunication创建SocketCommunication对象用于读写处理接收发送和发送数据
- /**
- * @author
- * @version 1.0
- * @description: socket 连接对象
- * 主要用于对已连接的客户端收发消息
- * @date 2023/6/19 14:43
- */
- public class SocketCommunication implements ICommunication{
-
- private Socket socket; //已经连接的客户端对象
-
- private AbstractSocketServer socketServer; //来自哪个服务器,创建的服务器对象
-
- public SocketCommunication(@NotNull Socket socket, AbstractSocketServer socketServer) {
- this.socket = socket;
- this.socketServer = socketServer;
- }
-
- @Override
- public Socket getSocket() {
- return socket;
- }
-
- @Override
- public void handle() throws Exception {
- try {
- System.out.println(socket.getInetAddress()
- .getHostAddress()+" 已连接");
- InputStream in = socket.getInputStream();
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- byte[] b = new byte[1024];
- int len;
- while ( (len = in.read(b)) != -1) {
- output.write(b, 0, len);
- try {
- Thread.sleep(10);
- } catch (InterruptedException e1) {
- e1.printStackTrace();
- }
- if(in.available() == 0) {
- this.receive(socket,output.toByteArray());
- output.reset();
- }
- b = new byte[1024];
- len = 0;
- }
- } catch (IOException ioException) {
- throw ioException;
- }
- }
-
- /**
- * @description: 接收数据,对客户端接收的数据进行统一处理
- * 可以编写相应的处理逻辑,我这里是服务器端收到消息后。回复当前连接数量、
- * @author
- * @date 2023/6/19 17:30
- * @version 1.0
- */
- @Override
- public void receive(Socket socket, byte[] data) throws Exception {
- //服务器接收客户端消息
- System.out.println(socket.getInetAddress().getHostAddress()+" 发送:"+new String(data,"gbk"));
- //服务器回复客户端消息
- String msg = "当前连接数量:"+socketServer.getConnectSize();
- socketServer.send(socket.getInetAddress().getHostAddress(),
- msg.getBytes("gbk"));
- }
-
- /**
- * @description: 发送数据
- * @author
- * @date 2023/6/19 17:30
- * @version 1.0
- */
- @Override
- public void send(byte[] data) throws Exception {
- socket.getOutputStream().write(data);
- }
- /**
- * @description: 断开连接
- * @author
- * @date 2023/6/19 17:30
- * @version 1.0
- */
- @Override
- public void disconnect() throws Exception {
- System.out.println(socket.getInetAddress()
- .getHostAddress()+" 已断开");
- socket.close();
- }
-
- }
-
定义一个自定义异常ConnectException处理连接数超出最大限制
- /**
- * @author
- * @version 1.0
- * @description:
- * @date 2023/6/19 16:25
- */
- public class ConnectException extends Exception{
- public ConnectException(int maxSize) {
- super("连接数量超出最大限制,连接失败! 当前最大连接数:"+maxSize);
- }
- }
-
最后定义main方法进行测试
- /**
- * @author
- * @version 1.0
- * @description: TODO
- * @date 2024/2/9 15:57
- */
- public class Main {
-
- public static void main(String[] args) {
- SocketServer server = new SocketServer("192.168.0.100",5032);
- server.setMaxConnectSize(2); //设置最大连接数量
- server.startServer();
-
- //每5秒钟获取一下当前连接,并发送数据
- new Thread(() -> {
-
- while (true) {
- //获取所有连接
- for (String ip : server.getConnectIpAddress()) {
- try {
- System.out.println("当前已连接ip:"+ip);
- String msg = "向指定客户端发送消息: "+ip+" 你好";
- //向指定ip发送消息
- if (server.send(ip,msg.getBytes("gbk"))) {
- System.out.println("向 "+ip+" 发送了 ["+msg+"]");
- }
-
- } catch (UnsupportedEncodingException e) {
- e.printStackTrace();
- }
- }
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- }).start();
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。