Spring boot 整合websocket 客户端
在项目开发中会有需要用websocket客户端的情况,下面就来说一下分享的这个解决方案
其中 workPoolScheduler.execute方法 可以用 ThreadUtil.execute(hutool工具包)代替 省略下方步骤三的线程池配置
package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.net.URI;
import java.util.Map;
/**
* @Author huasheng
* @Date 2022/11/1 9:49
* @Description
*/
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {
//客户端标识
private String clientName;
//客户端连接状态
private boolean isConnect = false;
//spring包下的线程池类
private ThreadPoolTaskExecutor workPoolScheduler;
public BaseWebsocketClient(URI serverUri, Map<String, String> httpHeaders,
String clientName,
ThreadPoolTaskExecutor workPoolScheduler) {
super(serverUri, new Draft_6455(), httpHeaders, 0);
this.clientName = clientName;
this.workPoolScheduler = workPoolScheduler;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
}
@Override
public void onMessage(String s) {
}
/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/
@Override
public void onClose(int i, String s, boolean b) {
log.info("------ {} onClose ------{}", clientName, b);
setConnectState(false);
recontact();
}
/***检测到错误,更新连接状态***/
@Override
public void onError(Exception e) {
log.info("------ {} onError ------{}", clientName, e);
setConnectState(false);
}
public void setConnectState(boolean isConnect) {
this.isConnect = isConnect;
}
public boolean getConnectState(){
return this.isConnect;
}
public ThreadPoolTaskExecutor getWorkPoolScheduler() {
return workPoolScheduler;
}
/**
* 重连
*/
public void recontact() {
workPoolScheduler.execute(() -> {
Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );
try {
Thread.sleep(10000);
log.info("重连开始");
if (isConnect) {
log.info("{} 重连停止", clientName);
return;
}
this.reconnect();
log.info("重连结束");
} catch (Exception e) {
log.info("{} 重连失败", clientName);
}
});
}
}
package com.enrising.ctsc.park.security.service;
import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.net.URI;
import java.util.Map;
/**
*
*/
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {
private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";
private static final String SUBSCRIBE = "subscribe";
private WebSocketReportService deviceService;
/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
// private String sendStr = "{\n" +
// " \"method\": \"subscribe\",\n" +
// " \"params\": \"device\"\n" +
// "}";
private String sendStr = "hello";
/**
* 建立连接
* @param serverUri serverUri
* @param httpHeaders httpHeaders
* @param workPoolScheduler workPoolScheduler
* @param deviceService deviceService
*/
public DeviceWebsocketClient(URI serverUri, Map<String, String> httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {
super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);
this.deviceService = deviceService;
}
@Override
public void onOpen(ServerHandshake serverHandshake) {
log.info("------ {} onOpen ------", ACS_CTRL_RESULT);
this.send(sendStr);
setConnectState(true);
}
public void sendMessage(String str) {
log.info("------ {} send ------", str);
this.send(str);
setConnectState(true);
}
@Override
public void onMessage(String msg) {
log.info("WebSocketReportService.onMessage()接收消息={}", msg);
ThreadUtil.execAsync(() -> {
MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);
try {
//业务代码
deviceService.saveDeviceReportInfo(msg);
} catch (Exception e) {
log.info("WebSocketReportService.onMessage()上报异常={}", e);
}
//请求完成,从MDC中移除requestId
MDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);
});
log.info("WebSocketReportService.onMessage()推送结束={}", msg);
}
}
配置文件
#线程池配置
settings:
work-pool:
core-pool-size: 10
max-pool-size: 20
queue-capacity: 200
实体类
package com.enrising.ctsc.park.security.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author huasheng
* @date 2022/11/1
*/
@Configuration
public class WorkPoolConfig {
@Value("${settings.work-pool.core-pool-size}")
private Integer workPoolCoreSize;
@Value("${settings.work-pool.max-pool-size}")
private Integer workPoolMaxSize;
@Value("${settings.work-pool.queue-capacity}")
private Integer queueCapacity;
@Bean("workPoolScheduler")
public ThreadPoolTaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(workPoolCoreSize);
executor.setMaxPoolSize(workPoolMaxSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("-device-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
WebSocketReportService .java
package xxx.service;
/**
* 公共父类
*/
public interface WebSocketReportService {
/**
* @throws Exception
* @throws
* @Title: saveDeviceReportInfo
* @Description: 保存设备上报信息
* @param: @param data 参数说明
* @return: void 返回类型
*/
void saveDeviceReportInfo(Object report) throws Exception;
}
配置文件
#开关
api:
initListen:
sideSlop:
device:
open: false
根据实际场景用到两个实例,监听并各自处理
package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
/**
* @Author huasheng
* @Date 2022/12/3 17:04
* @Description
*/
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {
@Resource
private 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;
@Resource
private 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;
@Resource
private ThreadPoolTaskExecutor workPoolScheduler;
static DeviceWebsocketClient deviceWebsocketClient;
static DeviceWebsocketClient warnWebsocketClient;
@PostConstruct
public void start() {
try {
log.info("start to receive device data");
URI uri = new URI("ws://127.0.0.1:8888");
Map<String, String> httpHeaders = new HashMap<>(4);
// httpHeaders.put("Origin", "http://" + uri.getHost());
deviceWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeDeviceService);
deviceWebsocketClient.connect();
}catch (Exception e){
log.error("start to receive device data failed", e);
}
}
@PostConstruct
public void startStatus() {
try {
log.info("start to receive device status");
URI uri = new URI("ws://127.0.0.1:8889");
Map<String, String> httpHeaders = new HashMap<>(4);
// httpHeaders.put("Origin", "http://" + uri.getHost());
warnWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeWarnService);
warnWebsocketClient.connect();
}catch (Exception e){
log.error("start to receive device status failed", e);
}
}
public void sendMessage(String str) {
deviceWebsocketClient.sendMessage(str);
}
}
我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d
我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!
IntrductionLibwebsocketsisasimple-to-use,MIT-license,pureClibraryprovidingclientandserverforhttp/1,http/2,websockets,MQTTandotherprotocolsinasecurity-minded,lightweight,configurable,scalableandflexibleway.It’seasytobuildandcross-buildviacmakeandissuitablefortasksfromembeddedRTOSthroughmasscloudservi
我有一个模型User,它在创建后的回调中创建了选项#Userhas_one:user_optionsafter_create:create_optionsprivatedefcreate_optionsUserOptions.create(user:self)end我对此有一些简单的Rspec覆盖:describe"newuser"doit"createsuser_optionsaftertheuseriscreated"douser=create(:user)user.user_options.shouldbe_kind_of(UserOptions)endend一切正常,直到我将自
我正在尝试使用RubyEventMachine访问使用SSL证书身份验证的HTTPSWeb服务,但我没有让它工作。我编写了以下简单代码块来对其进行端到端测试:require'rubygems'require'em-http'EventMachine.rundourl='https://foobar.com/'ssl_opts={:private_key_file=>'/tmp/private.key',:cert_chain_file=>'/tmp/ca.pem',:verify_peer=>false}http=EventMachine::HttpRequest.new(url).g
我正在为需要与API建立SSL连接的客户端开发应用程序。我得到了三个文件;一个信任根证书(.cer)文件、一个中间证书(.cer)文件和一个签名的响应文件。我得到的安装说明与IIS或Javakeytool程序有关;我正在用RubyonRails构建应用程序,所以这两种方法都不是一个选项(据我所知)。证书由运行API服务的组织自签名,看来我获得了客户端证书以相互验证https连接。我不确定如何使用我的应用程序中的证书连接和使用API签名响应文件的作用我读过"Usingaself-signedcertificate"和thisarticleonOpenSSLinRuby但两者似乎都不是很到
我正在努力在Ruby中创建启用SSL的服务器,以及与服务器一起使用的相应Ruby客户端。为了进行测试,我使用以下命令创建了自己的根CA证书。$:~/devel/ssl-test/ssl/CA$opensslgenrsa-outTestCA.key2048GeneratingRSAprivatekey,2048bitlongmodulus............+++...........................+++eis65537(0x10001)$:~/devel/ssl-test/ssl/CA$opensslreq-new-keyTestCA.key-outTestCA.
我有带有gemwebsocket-rails0.7的Rails3.2应用程序。在开发机上,一切正常在生产环境中,我使用Nginx/1.6作为代理服务器,Unicorn作为http服务器。Thin用于独立模式(在https://github.com/websocket-rails/websocket-rails/wiki/Standalone-Server-Mode之后)。nginx配置:location/websocket{proxy_passhttp://localhost:3001/websocket;proxy_http_version1.1;proxy_set_headerUp
如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru
软件特点部署后能通过浏览器查看线上日志。支持Linux、Windows服务器。采用随机读取的方式,支持大文件的读取。支持实时打印新增的日志(类终端)。支持日志搜索。使用手册基本页面配置路径配置日志所在的目录,配置后按回车键生效,下拉框选择日志名称。选择日志后点击生效,即可加载日志。windows路径E:\java\project\log-view\logslinux路径/usr/local/XX历史模式历史模式下,不会读取新增的日志。针对历史文件可以分页读取,配置分页大小、跳转。历史模式下,支持根据关键词搜索。目前搜索引擎使用的是jdk自带类库,搜索速度相对较低,优点是比较简单。2G日志全文搜