赞
踩
在项目开发中有时需要使用 WebSocket 客户端,下面分享一个带自动重连的解决方案。
其中 workPoolScheduler.execute 方法可以用 ThreadUtil.execute(hutool 工具包)代替,这样即可省略下方步骤三的线程池配置。
package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.net.URI;
import java.util.Map;
/**
* @Author huasheng
* @Date 2022/11/1 9:49
* @Description
*/
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {
//客户端标识
private String clientName;
//客户端连接状态
private boolean isConnect = false;
//spring包下的线程池类
private ThreadPoolTaskExecutor workPoolScheduler;
public BaseWebsocketClient(URI serverUri, Map<String, String> httpHeaders,
String clientName,
ThreadPoolTaskExecutor workPoolScheduler) {
super(serverUri, new Draft_6455(), httpHeaders, 0);
this.clientName = clientName;
this.workPoolScheduler = workPoolScheduler;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
}
@Override
public void onMessage(String s) {
}
/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/
@Override
public void onClose(int i, String s, boolean b) {
log.info("------ {} onClose ------{}", clientName, b);
setConnectState(false);
recontact();
}
/***检测到错误,更新连接状态***/
@Override
public void onError(Exception e) {
log.info("------ {} onError ------{}", clientName, e);
setConnectState(false);
}
public void setConnectState(boolean isConnect) {
this.isConnect = isConnect;
}
public boolean getConnectState(){
return this.isConnect;
}
public ThreadPoolTaskExecutor getWorkPoolScheduler() {
return workPoolScheduler;
}
/**
* 重连
*/
public void recontact() {
workPoolScheduler.execute(() -> {
Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );
try {
Thread.sleep(10000);
log.info("重连开始");
if (isConnect) {
log.info("{} 重连停止", clientName);
return;
}
this.reconnect();
log.info("重连结束");
} catch (Exception e) {
log.info("{} 重连失败", clientName);
}
});
}
}
package com.enrising.ctsc.park.security.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.net.URI;
import java.util.Map;
/**
*
*/
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {
private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";
private static final String SUBSCRIBE = "subscribe";
private WebSocketReportService deviceService;
/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
// private String sendStr = "{\n" +
// " \"method\": \"subscribe\",\n" +
// " \"params\": \"device\"\n" +
// "}";
private String sendStr = "hello";
/**
* 建立连接
* @param serverUri serverUri
* @param httpHeaders httpHeaders
* @param workPoolScheduler workPoolScheduler
* @param deviceService deviceService
*/
public DeviceWebsocketClient(URI serverUri, Map<String, String> httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {
super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);
this.deviceService = deviceService;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("------ {} onOpen ------", ACS_CTRL_RESULT);
this.send(sendStr);
setConnectState(true);
}
public void sendMessage(String str) {
log.info("------ {} send ------", str);
this.send(str);
setConnectState(true);
}
@Override
public void onMessage(String msg) {
log.info("WebSocketReportService.onMessage()接收消息={}", msg);
ThreadUtil.execAsync(() -> {
MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);
try {
//业务代码
deviceService.saveDeviceReportInfo(msg);
} catch (Exception e) {
log.info("WebSocketReportService.onMessage()上报异常={}", e);
}
//请求完成,从MDC中移除requestId
MDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);
});
log.info("WebSocketReportService.onMessage()推送结束={}", msg);
}
}
配置文件
#线程池配置
settings:
  work-pool:
    core-pool-size: 10
    max-pool-size: 20
    queue-capacity: 200
线程池配置类
package com.enrising.ctsc.park.security.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * Spring configuration that exposes the {@code workPoolScheduler} thread pool.
 * Pool sizes and queue capacity are read from the {@code settings.work-pool.*}
 * properties.
 *
 * @author huasheng
 * @date 2022/11/1
 */
@Configuration
public class WorkPoolConfig {
    /** Value of {@code settings.work-pool.core-pool-size}. */
    @Value("${settings.work-pool.core-pool-size}")
    private Integer workPoolCoreSize;

    /** Value of {@code settings.work-pool.max-pool-size}. */
    @Value("${settings.work-pool.max-pool-size}")
    private Integer workPoolMaxSize;

    /** Value of {@code settings.work-pool.queue-capacity}. */
    @Value("${settings.work-pool.queue-capacity}")
    private Integer queueCapacity;

    /**
     * Builds the executor registered under the bean name {@code workPoolScheduler}.
     *
     * @return the configured executor
     */
    @Bean("workPoolScheduler")
    public ThreadPoolTaskExecutor taskExecutor() {
        final ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();

        // Naming and sizing.
        pool.setThreadNamePrefix("-device-");
        pool.setCorePoolSize(workPoolCoreSize);
        pool.setMaxPoolSize(workPoolMaxSize);
        pool.setQueueCapacity(queueCapacity);
        pool.setKeepAliveSeconds(60);

        // When the pool rejects a task, run it on the submitting thread.
        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        // Shutdown behaviour: wait for submitted tasks, awaiting termination up to 60 seconds.
        pool.setWaitForTasksToCompleteOnShutdown(true);
        pool.setAwaitTerminationSeconds(60);
        return pool;
    }
}
WebSocketReportService.java
package xxx.service;
/**
 * Common parent type for the services that receive messages from a
 * {@code DeviceWebsocketClient}.
 */
public interface WebSocketReportService {
/**
 * Saves the information reported by a device.
 *
 * @param report the reported payload; {@code DeviceWebsocketClient} passes the
 *               raw text message it received
 * @throws Exception if the implementation fails to process the report
 */
void saveDeviceReportInfo(Object report) throws Exception;
}
配置文件
#开关
api:
  initListen:
    sideSlop:
      device:
        open: false
根据实际场景,这里用到两个客户端实例,分别监听并各自处理收到的消息。
package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
 * Starts two {@link DeviceWebsocketClient} instances after bean construction,
 * one for device data and one for device status, each bound to its own report
 * service. The bean is only registered when the
 * {@code api.initListen.sideSlop.device.open} property condition matches.
 *
 * @author huasheng
 * @date 2022/12/3 17:04
 */
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {
    /** Report service that handles messages of the device-data client. */
    @Resource
    private 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;

    /** Report service that handles messages of the device-status client. */
    @Resource
    private 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;

    /** Thread pool handed to both clients. */
    @Resource
    private ThreadPoolTaskExecutor workPoolScheduler;

    static DeviceWebsocketClient deviceWebsocketClient;
    static DeviceWebsocketClient warnWebsocketClient;

    /** Creates the device-data client and starts connecting; failures are logged. */
    @PostConstruct
    public void start() {
        try {
            log.info("start to receive device data");
            deviceWebsocketClient = buildClient("ws://127.0.0.1:8888", afSideSlopeDeviceService);
            deviceWebsocketClient.connect();
        } catch (Exception ex) {
            log.error("start to receive device data failed", ex);
        }
    }

    /** Creates the device-status client and starts connecting; failures are logged. */
    @PostConstruct
    public void startStatus() {
        try {
            log.info("start to receive device status");
            warnWebsocketClient = buildClient("ws://127.0.0.1:8889", afSideSlopeWarnService);
            warnWebsocketClient.connect();
        } catch (Exception ex) {
            log.error("start to receive device status failed", ex);
        }
    }

    /**
     * Sends a text message through the device-data client.
     *
     * @param str text to send
     */
    public void sendMessage(String str) {
        deviceWebsocketClient.sendMessage(str);
    }

    /**
     * Builds an unconnected client for the given endpoint with an empty header map.
     * An {@code Origin} header can be added to the map here if the server requires one.
     *
     * @param endpoint      WebSocket address, e.g. {@code ws://host:port}
     * @param reportService service that will receive the client's messages
     * @return the new, not yet connected client
     * @throws Exception if the endpoint is not a valid URI
     */
    private DeviceWebsocketClient buildClient(String endpoint, WebSocketReportService reportService) throws Exception {
        URI target = new URI(endpoint);
        Map<String, String> headers = new HashMap<>(4);
        return new DeviceWebsocketClient(target, headers, workPoolScheduler, reportService);
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。