jjzjj

Spring boot 整合websocket 客户端

花生大帝 2023-04-22 原文

Spring boot 整合websocket 客户端

前言

在项目开发中会有需要用websocket客户端的情况,下面就来说一下分享的这个解决方案

一、BaseWebsocketClient.java 继承WebsocketClient.java 作为父类

其中 workPoolScheduler.execute方法 可以用 ThreadUtil.execute(hutool工具包)代替 省略下方步骤三的线程池配置

package com.enrising.ctsc.park.security.service;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.drafts.Draft_6455;
import org.java_websocket.handshake.ServerHandshake;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.URI;
import java.util.Map;

/**
 * @Author huasheng
 * @Date 2022/11/1 9:49
 * @Description
 */
@Slf4j
public class BaseWebsocketClient extends WebSocketClient {
	//客户端标识
	private String clientName;
	//客户端连接状态
	private boolean isConnect = false;
	//spring包下的线程池类
	private ThreadPoolTaskExecutor workPoolScheduler;

	public BaseWebsocketClient(URI serverUri, Map<String, String> httpHeaders,
							   String clientName,
							   ThreadPoolTaskExecutor workPoolScheduler) {
		super(serverUri, new Draft_6455(), httpHeaders, 0);
		this.clientName = clientName;
		this.workPoolScheduler = workPoolScheduler;
	}

	@Override
	public void onOpen(ServerHandshake serverHandshake) {
	}

	@Override
	public void onMessage(String s) {
	}
	/***检测到连接关闭之后,会更新连接状态以及尝试重新连接***/
	@Override
	public void onClose(int i, String s, boolean b) {
		log.info("------ {} onClose ------{}", clientName, b);
		setConnectState(false);
		recontact();
	}
	/***检测到错误,更新连接状态***/
	@Override
	public void onError(Exception e) {
		log.info("------ {} onError ------{}", clientName, e);
		setConnectState(false);
	}

	public void setConnectState(boolean isConnect) {
		this.isConnect = isConnect;
	}

	public boolean getConnectState(){
		return this.isConnect;
	}

	public ThreadPoolTaskExecutor getWorkPoolScheduler() {
		return workPoolScheduler;
	}

	/**
	 * 重连
	 */
	public void recontact() {
		workPoolScheduler.execute(() -> {
			Thread.currentThread().setName( "ReconnectThread-" + Thread.currentThread().getId() );
			try {
				Thread.sleep(10000);
				log.info("重连开始");
				if (isConnect) {
					log.info("{} 重连停止", clientName);
					return;
				}
				this.reconnect();
				log.info("重连结束");
			} catch (Exception e) {
				log.info("{} 重连失败", clientName);
			}
		});
	}
}


二、DeviceWebsocketClient.java 子类根据实际场景重写方法

package com.enrising.ctsc.park.security.service;


import cn.hutool.core.thread.ThreadUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.nacos.common.utils.UuidUtils;
import lombok.extern.slf4j.Slf4j;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.MDC;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.net.URI;
import java.util.Map;

/**
 *
 */
@Slf4j
public class DeviceWebsocketClient extends BaseWebsocketClient {

	private static final String ACS_CTRL_RESULT = "deviceWebsocketClient";
	private static final String SUBSCRIBE = "subscribe";
	private WebSocketReportService deviceService;
	/*这个订阅格式是实现约定好的,可以具体情况具体分析*/
//	private String sendStr = "{\n" +
//			"    \"method\": \"subscribe\",\n" +
//			"    \"params\": \"device\"\n" +
//			"}";
	private String sendStr = "hello";
	/**
	 * 建立连接
	 * @param serverUri serverUri
	 * @param httpHeaders httpHeaders
	 * @param workPoolScheduler workPoolScheduler
	 * @param deviceService deviceService
	 */
	public DeviceWebsocketClient(URI serverUri, Map<String, String> httpHeaders, ThreadPoolTaskExecutor workPoolScheduler, WebSocketReportService deviceService) {
		super(serverUri, httpHeaders, ACS_CTRL_RESULT, workPoolScheduler);
		this.deviceService = deviceService;
	}

	@Override
	public void onOpen(ServerHandshake serverHandshake) {
		log.info("------ {} onOpen ------", ACS_CTRL_RESULT);
		this.send(sendStr);
		setConnectState(true);
	}

	public void sendMessage(String str) {
		log.info("------ {} send ------", str);
		this.send(str);
		setConnectState(true);
	}


	@Override
	public void onMessage(String msg) {
		log.info("WebSocketReportService.onMessage()接收消息={}", msg);
			ThreadUtil.execAsync(() -> {
			MDC.put(RequestIdTraceInterceptor.REQUEST_ID_KEY, requestId);
			try {
                //业务代码
				deviceService.saveDeviceReportInfo(msg);
			} catch (Exception e) {
				log.info("WebSocketReportService.onMessage()上报异常={}", e);
			}
			//请求完成,从MDC中移除requestId
			MDC.remove(RequestIdTraceInterceptor.REQUEST_ID_KEY);
		});
		log.info("WebSocketReportService.onMessage()推送结束={}", msg);
	}
}

三、WorkPoolConfig.java 线程池

配置文件

#线程池配置
settings: 
  work-pool: 
   core-pool-size: 10
   max-pool-size: 20
   queue-capacity: 200

实体类

package com.enrising.ctsc.park.security.service;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author huasheng
 * @date 2022/11/1
 */
@Configuration
public class WorkPoolConfig {

	@Value("${settings.work-pool.core-pool-size}")
	private Integer workPoolCoreSize;

	@Value("${settings.work-pool.max-pool-size}")
	private Integer workPoolMaxSize;

	@Value("${settings.work-pool.queue-capacity}")
	private Integer queueCapacity;

	@Bean("workPoolScheduler")
	public ThreadPoolTaskExecutor taskExecutor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		executor.setCorePoolSize(workPoolCoreSize);
		executor.setMaxPoolSize(workPoolMaxSize);
		executor.setQueueCapacity(queueCapacity);
		executor.setKeepAliveSeconds(60);
		executor.setThreadNamePrefix("-device-");
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		executor.setWaitForTasksToCompleteOnShutdown(true);
		executor.setAwaitTerminationSeconds(60);
		return executor;
	}
}

三、实际场景封装

WebSocketReportService .java

package xxx.service;


/**
 * 公共父类
 */
public interface WebSocketReportService {

	/**
	 * @throws Exception
	 * @throws
	 * @Title: saveDeviceReportInfo
	 * @Description: 保存设备上报信息
	 * @param: @param data 参数说明
	 * @return: void 返回类型
	 */
	void saveDeviceReportInfo(Object report) throws Exception;

}

四、启动客户端

配置文件

#开关
api:
  initListen:
   sideSlop:
    device:
     open: false

根据实际场景用到两个实例,监听并各自处理

package com.enrising.ctsc.park.security.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.core.annotation.Order;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;

/**
 * @Author huasheng
 * @Date 2022/12/3 17:04
 * @Description
 */
@Component
@Order(value = 6)
@Slf4j
@ConditionalOnProperty(prefix = "api", name = "initListen.sideSlop.device.open")
public class DeviceWebsocketClientService {

	@Resource
	private 子类1(实现父类saveDeviceReportInfo方法) afSideSlopeDeviceService;
	@Resource
	private 子类2(实现父类saveDeviceReportInfo方法) afSideSlopeWarnService;
	@Resource
	private ThreadPoolTaskExecutor workPoolScheduler;

    static DeviceWebsocketClient deviceWebsocketClient;

	static DeviceWebsocketClient warnWebsocketClient;

    @PostConstruct
	public void start() {
		try {
			log.info("start to receive device data");
			URI uri = new URI("ws://127.0.0.1:8888");
			Map<String, String> httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());
			deviceWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeDeviceService);
			deviceWebsocketClient.connect();
		}catch (Exception e){
			log.error("start to receive device data failed", e);
		}
	}
	@PostConstruct
	public void startStatus() {
		try {
			log.info("start to receive device status");
			URI uri = new URI("ws://127.0.0.1:8889");
			Map<String, String> httpHeaders = new HashMap<>(4);
//			httpHeaders.put("Origin", "http://" + uri.getHost());
			warnWebsocketClient = new DeviceWebsocketClient(uri, httpHeaders, workPoolScheduler, afSideSlopeWarnService);
			warnWebsocketClient.connect();
		}catch (Exception e){
			log.error("start to receive device status failed", e);
		}
	}
	public void sendMessage(String str) {
		deviceWebsocketClient.sendMessage(str);
	}


}


有关Spring boot 整合websocket 客户端的更多相关文章

  1. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  2. ruby - 在 TCPServer (Ruby) 中,我如何从客户端获取 IP/MAC? - 2

    我想在Ruby的TCPServer中获取客户端的IP地址。以及(如果可能的话)MAC地址。例如,Ruby中的时间服务器,请参阅评论。tcpserver=TCPServer.new("",80)iftcpserverputs"Listening"loopdosocket=tcpserver.acceptifsocketThread.newdoputs"Connectedfrom"+#HERE!HowcanigettheIPAddressfromtheclient?socket.write(Time.now.to_s)socket.closeendendendend非常感谢!

  3. C/C++好用的websocket库 - 2

    IntrductionLibwebsocketsisasimple-to-use,MIT-license,pureClibraryprovidingclientandserverforhttp/1,http/2,websockets,MQTTandotherprotocolsinasecurity-minded,lightweight,configurable,scalableandflexibleway.It’seasytobuildandcross-buildviacmakeandissuitablefortasksfromembeddedRTOSthroughmasscloudservi

  4. ruby-on-rails - 为什么我必须在使用客户验证器后重新加载 rspec 中的记录? - 2

    我有一个模型User,它在创建后的回调中创建了选项#Userhas_one:user_optionsafter_create:create_optionsprivatedefcreate_optionsUserOptions.create(user:self)end我对此有一些简单的Rspec覆盖:describe"newuser"doit"createsuser_optionsaftertheuseriscreated"douser=create(:user)user.user_options.shouldbe_kind_of(UserOptions)endend一切正常,直到我将自

  5. ruby - 如何获得带有 SSL 客户端证书的 HTTPS 请求以与 Ruby EventMachine 一起使用? - 2

    我正在尝试使用RubyEventMachine访问使用SSL证书身份验证的HTTPSWeb服务,但我没有让它工作。我编写了以下简单代码块来对其进行端到端测试:require'rubygems'require'em-http'EventMachine.rundourl='https://foobar.com/'ssl_opts={:private_key_file=>'/tmp/private.key',:cert_chain_file=>'/tmp/ca.pem',:verify_peer=>false}http=EventMachine::HttpRequest.new(url).g

  6. ruby-on-rails - 在 Ruby on Rails 应用程序中使用客户端 SSL - 2

    我正在为需要与API建立SSL连接的客户端开发应用程序。我得到了三个文件;一个信任根证书(.cer)文件、一个中间证书(.cer)文件和一个签名的响应文件。我得到的安装说明与IIS或Javakeytool程序有关;我正在用RubyonRails构建应用程序,所以这两种方法都不是一个选项(据我所知)。证书由运行API服务的组织自签名,看来我获得了客户端证书以相互验证https连接。我不确定如何使用我的应用程序中的证书连接和使用API签名响应文件的作用我读过"Usingaself-signedcertificate"和thisarticleonOpenSSLinRuby但两者似乎都不是很到

  7. ruby - 为什么这个启用 SSL 的 Ruby 服务器/客户端测试有效? - 2

    我正在努力在Ruby中创建启用SSL的服务器,以及与服务器一起使用的相应Ruby客户端。为了进行测试,我使用以下命令创建了自己的根CA证书。$:~/devel/ssl-test/ssl/CA$opensslgenrsa-outTestCA.key2048GeneratingRSAprivatekey,2048bitlongmodulus............+++...........................+++eis65537(0x10001)$:~/devel/ssl-test/ssl/CA$opensslreq-new-keyTestCA.key-outTestCA.

  8. ruby-on-rails - Websocket-rails 不适用于 Nginx 和 Unicorn 的生产环境 - 2

    我有带有gemwebsocket-rails0.7的Rails3.2应用程序。在开发机上,一切正常在生产环境中,我使用Nginx/1.6作为代理服务器,Unicorn作为http服务器。Thin用于独立模式(在https://github.com/websocket-rails/websocket-rails/wiki/Standalone-Server-Mode之后)。nginx配置:location/websocket{proxy_passhttp://localhost:3001/websocket;proxy_http_version1.1;proxy_set_headerUp

  9. springboot定时任务 - 2

    如果您希望在Spring中启用定时任务功能,则需要在主类上添加 @EnableScheduling 注解。这样Spring才会扫描 @Scheduled 注解并执行定时任务。在大多数情况下,只需要在主类上添加 @EnableScheduling 注解即可,不需要在Service层或其他类中再次添加。以下是一个示例,演示如何在SpringBoot中启用定时任务功能:@SpringBootApplication@EnableSchedulingpublicclassApplication{publicstaticvoidmain(String[]args){SpringApplication.ru

  10. 基于SpringBoot的线上日志阅读器 - 2

    软件特点部署后能通过浏览器查看线上日志。支持Linux、Windows服务器。采用随机读取的方式,支持大文件的读取。支持实时打印新增的日志(类终端)。支持日志搜索。使用手册基本页面配置路径配置日志所在的目录,配置后按回车键生效,下拉框选择日志名称。选择日志后点击生效,即可加载日志。windows路径E:\java\project\log-view\logslinux路径/usr/local/XX历史模式历史模式下,不会读取新增的日志。针对历史文件可以分页读取,配置分页大小、跳转。历史模式下,支持根据关键词搜索。目前搜索引擎使用的是jdk自带类库,搜索速度相对较低,优点是比较简单。2G日志全文搜

随机推荐