当前位置:   article > 正文

Spring boot 整合websocket 客户端_springboot websocket客户端

springboot websocket客户端

前言

在项目开发中,有时需要在服务端使用 WebSocket 客户端去连接其他服务。下面分享一个基于 Spring Boot 的解决方案。

一、BaseWebsocketClient.java:继承 Java-WebSocket 的 WebSocketClient,作为各业务客户端的父类

其中 workPoolScheduler.execute 方法可以用 hutool 工具包的 ThreadUtil.execute 代替,这样可以省略下方步骤三的线程池配置。

package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.URI;
import java.util.Map;

/**
 * @Author huasheng
 * @Date 2022/11/1 9:49
 * @Description
 */
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {
	//客户端标识
	private String clientName;
	//客户端连接状态
	private boolean isConnect = false;
	//spring包下的线程池类
	private ThreadPoolTaskExecutor workPoolScheduler;

	public BaseWebsocketClient(URI serverUri, Map<String, String> httpHeaders,
							   String clientName,
							   ThreadPoolTaskExecutor workPoolScheduler) {
		super(serverUri, new Draft_6455(), httpHeaders, 0);
		this.clientName = clientName;
		this.workPoolScheduler = workPoolScheduler;
	}

	@Override
	public void onOpen(ServerHandshake serverHandshake) {
	}

	@Override
	public void onMessage(String s) {
	}
	/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/
	@Override
	public void onClose(int i, String s, boolean b) {
		log.info("------ {} onClose ------{}", clientName, b);
		setConnectState(false);
		recontact();
	}
	/***检测到错误,更新连接状态***/
	@Override
	public void onError(Exception e) {
		log.info("------ {} onError ------{}", clientName, e);
		setConnectState(false);
	}

	public void setConnectState(boolean isConnect) {
		this.isConnect = isConnect;
	}

	public boolean getConnectState(){
		return this.isConnect;
	}

	public ThreadPoolTaskExecutor getWorkPoolScheduler() {
		return workPoolScheduler;
	}

	/**
	 * 重连
	 */
	public void recontact() {
		workPoolScheduler.execute(() -> {
			Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );
			try {
				Thread.sleep(10000);
				log.info("重连开始");
				if (isConnect) {
					log.info("{} 重连停止", clientName);
					return;
				}
				this.reconnect();
				log.info("重连结束");
			} catch (Exception e) {
				log.info("{} 重连失败", clientName);
			}
		});
	}
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
二、DeviceWebsocketClient.java 子类根据实际场景重写方法
package com.enrising.ctsc.park.security.service;


import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.URI;
import java.util.Map;

/**
 *
 */
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {

	private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";
	private static final String SUBSCRIBE = "subscribe";
	private WebSocketReportService deviceService;
	/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
//	private String sendStr = "{\n" +
//			"    \"method\": \"subscribe\",\n" +
//			"    \"params\": \"device\"\n" +
//			"}";
	private String sendStr = "hello";
	/**
	 * 建立连接
	 * @param serverUri serverUri
	 * @param httpHeaders httpHeaders
	 * @param workPoolScheduler workPoolScheduler
	 * @param deviceService deviceService
	 */
	public DeviceWebsocketClient(URI serverUri, Map<String, String> httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {
		super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);
		this.deviceService = deviceService;
	}

	@Override
	public void onOpen(ServerHandshake serverHandshake) {
		log.info("------ {} onOpen ------", ACS_CTRL_RESULT);
		this.send(sendStr);
		setConnectState(true);
	}

	public void sendMessage(String str) {
		log.info("------ {} send ------", str);
		this.send(str);
		setConnectState(true);
	}


	@Override
	public void onMessage(String msg) {
		log.info("WebSocketReportService.onMessage()接收消息={}", msg);
			ThreadUtil.execAsync(() -> {
			MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);
			try {
                //业务代码
				deviceService.saveDeviceReportInfo(msg);
			} catch (Exception e) {
				log.info("WebSocketReportService.onMessage()上报异常={}", e);
			}
			//请求完成,从MDC中移除requestId
			MDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);
		});
		log.info("WebSocketReportService.onMessage()推送结束={}", msg);
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
三、WorkPoolConfig.java 线程池

配置文件

#线程池配置
settings: 
  work-pool: 
   core-pool-size: 10
   max-pool-size: 20
   queue-capacity: 200
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

配置类

package com.enrising.ctsc.park.security.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * Spring configuration that registers the worker thread pool under the bean name
 * {@code workPoolScheduler}. Pool sizes come from the {@code settings.work-pool.*} properties.
 *
 * @author huasheng
 * @date 2022/11/1
 */
@Configuration
public class WorkPoolConfig {

	/** Value of {@code settings.work-pool.core-pool-size}. */
	@Value("${settings.work-pool.core-pool-size}")
	private Integer workPoolCoreSize;

	/** Value of {@code settings.work-pool.max-pool-size}. */
	@Value("${settings.work-pool.max-pool-size}")
	private Integer workPoolMaxSize;

	/** Value of {@code settings.work-pool.queue-capacity}. */
	@Value("${settings.work-pool.queue-capacity}")
	private Integer queueCapacity;

	/**
	 * Builds the executor from the injected settings.
	 *
	 * @return the configured, not yet initialized, executor; Spring manages its lifecycle
	 */
	@Bean("workPoolScheduler")
	public ThreadPoolTaskExecutor taskExecutor() {
		final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

		// Naming and shutdown behaviour.
		pool.setThreadNamePrefix("-device-");
		pool.setWaitForTasksToCompleteOnShutdown(true);
		pool.setAwaitTerminationSeconds(60);

		// Sizing, taken from configuration.
		pool.setCorePoolSize(workPoolCoreSize);
		pool.setMaxPoolSize(workPoolMaxSize);
		pool.setQueueCapacity(queueCapacity);
		pool.setKeepAliveSeconds(60);

		// When pool and queue are both full, the submitting thread runs the task itself.
		pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		return pool;
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
三、实际场景封装

WebSocketReportService.java

package xxx.service;


/**
 * Common parent interface for services that persist data reported over a WebSocket connection.
 */
public interface WebSocketReportService {

	/**
	 * Saves the information reported by a device.
	 *
	 * @param report the reported data; NOTE(review): declared as {@code Object}, the
	 *               concrete payload type is up to the implementation — confirm with callers
	 * @throws Exception if the implementation fails to process or store the report
	 */
	void saveDeviceReportInfo(Object report) throws Exception;

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
四、启动客户端

配置文件

#开关
api:
  initListen:
   sideSlop:
    device:
     open: false
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

根据实际场景用到两个实例,监听并各自处理

package com.enrising.ctsc.park.security.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * Creates and connects two {@link DeviceWebsocketClient} instances after the bean is
 * constructed: one for device data and one for device status, each backed by its own
 * {@code WebSocketReportService} implementation.
 *
 * <p>The bean is conditional on the {@code api.initListen.sideSlop.device.open} property.
 * Per the Spring Boot documentation for {@code @ConditionalOnProperty}, without
 * {@code havingValue} the condition matches when the property exists and is not {@code false}.
 *
 * @Author huasheng
 * @Date 2022/12/3 17:04
 */
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {

	/** Endpoint that pushes device data. */
	private static final String DEVICE_DATA_URL = "ws://127.0.0.1:8888";
	/** Endpoint that pushes device status. */
	private static final String DEVICE_STATUS_URL = "ws://127.0.0.1:8889";

	// Placeholder type from the article: replace with the first implementation of
	// WebSocketReportService#saveDeviceReportInfo.
	@Resource
	private 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;
	// Placeholder type from the article: replace with the second implementation of
	// WebSocketReportService#saveDeviceReportInfo.
	@Resource
	private 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;
	@Resource
	private ThreadPoolTaskExecutor workPoolScheduler;

	/** Client for the device-data endpoint; stays {@code null} until {@link #start()} has created it. */
	static DeviceWebsocketClient deviceWebsocketClient;

	/** Client for the device-status endpoint; stays {@code null} until {@link #startStatus()} has created it. */
	static DeviceWebsocketClient warnWebsocketClient;

	/** Creates and connects the device-data client; failures are logged, not rethrown. */
	@PostConstruct
	public void start() {
		try {
			log.info("start to receive device data");
			deviceWebsocketClient = newClient(DEVICE_DATA_URL, afSideSlopeDeviceService);
			deviceWebsocketClient.connect();
		} catch (Exception e) {
			log.error("start to receive device data failed", e);
		}
	}

	/** Creates and connects the device-status client; failures are logged, not rethrown. */
	@PostConstruct
	public void startStatus() {
		try {
			log.info("start to receive device status");
			warnWebsocketClient = newClient(DEVICE_STATUS_URL, afSideSlopeWarnService);
			warnWebsocketClient.connect();
		} catch (Exception e) {
			log.error("start to receive device status failed", e);
		}
	}

	/**
	 * Sends a text message through the device-data client.
	 *
	 * @param str the text to send
	 * @throws IllegalStateException if the device-data client has not been created
	 */
	public void sendMessage(String str) {
		DeviceWebsocketClient client = deviceWebsocketClient;
		if (client == null) {
			// start() may have failed or not run yet; fail with a clear message instead of a bare NPE.
			throw new IllegalStateException("device websocket client has not been started");
		}
		client.sendMessage(str);
	}

	/** Builds an unconnected client for the given endpoint and report service. */
	private DeviceWebsocketClient newClient(String url, WebSocketReportService reportService) {
		URI uri = URI.create(url);
		Map<String, String> httpHeaders = new HashMap<>(4);
		// Optional, if the server checks it: httpHeaders.put("Origin", "http://" + uri.getHost());
		return new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, reportService);
	}

}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/892646
推荐阅读
相关标签
  

闽ICP备14008679号