diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java
index 5c02fd2..4a52207 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java
@@ -1,10 +1,13 @@
package com.zt.plat.module.qms.iot.controller.admin;
+import com.alibaba.fastjson.JSONObject;
import com.zt.plat.framework.common.pojo.CommonResult;
-import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
-import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext;
+import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
+import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.github.xingfudeshi.knife4j.annotations.ApiOperationSupport;
-import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
@@ -13,9 +16,9 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*;
-import com.zt.plat.module.qms.iot.tcpserver.core.ChannelStatInfo;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.stat.ChannelStat;
@@ -27,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
+import static java.lang.Thread.sleep;
+
/**
* IotManagerStatsController
* iot 管理统计
@@ -47,151 +52,255 @@ public class IotConnectManagerStatsController {
@Autowired private TioServerBootstrap tioServerBootstrap;
@Autowired public RedisSessionComponent redisSessionComponent;
-
+ @Autowired private Environment environment;
+ @Autowired private TioServerMessagePublisher tioServerMessagePublisher;
+ @Autowired private IotConnectManagerStatsService iotConnectManagerStatsService;
@GetMapping("/all-connect")
@ApiOperationSupport(order = 1)
@Operation(summary = "获取当前所有连接")
public CommonResult> currentAllConnect(@RequestParam(name = "clientIp", required = false) String clientIp) {
+
+ //获取spring服务端口
+ String serverPort = environment.getProperty("server.port");
+ String serverName = environment.getProperty("spring.application.name");
+ String msg = "当前服务端口:" + serverPort + ",服务名称:" + serverName;
+ log.error(msg);
+ List list = new ArrayList<>();
+ Set deviceKeys = redisSessionComponent.scanKeys(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY + "*");
+ Set clientKeys = redisSessionComponent.scanKeys(IotClientSessionContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "*");
+
+ for(String key : deviceKeys){
+ IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByKey( key);
+ String deviceId = deviceContext.getDeviceId();
+ String bsId = Long.toString(Long.valueOf(deviceId), Character.MAX_RADIX);
+ ChannelStatInfo info = new ChannelStatInfo();
+ info.setId(deviceContext.getChannelId());
+// info.setId(deviceContext.getDeviceCode());
+ info.setBsId(bsId);
+ String controlRealName = ObjectUtils.isEmpty(deviceContext.getControlRealName())? "无" : deviceContext.getControlRealName();
+ String deviceName = deviceContext.getDeviceName();
+ String deviceCode = deviceContext.getDeviceCode();
+// info.setRealName(userRealName);
+// info.setRemarks("操作人:" + userRealName);
+ info.setDeviceId(deviceContext.getDeviceId());
+ info.setDeviceCode(deviceCode);
+ info.setDeviceType(deviceContext.getDeviceType());
+ info.setDeviceName(deviceName);
+ StringBuilder sb = new StringBuilder();
+ if (StringUtils.isNoneBlank(deviceName)) {
+ sb.append("设备名称:").append(deviceName).append(";");
+ } else {
+ sb.append("注册号:").append(deviceCode).append(";");
+ //deviceInfo.append("设备Id:").append(deviceId).append(";");
+ }
+ sb.append("控制人:").append(controlRealName);
+ info.setRemarks(sb.toString());
+ info.setClientIp(deviceContext.getClientIp());
+ info.setClientPort(deviceContext.getClientPort());
+ info.setClientProtocol(deviceContext.getClientProtocol());
+ info.setProxyIp(deviceContext.getProxyIp());
+ info.setProxyPort(deviceContext.getProxyPort());
+ info.setProxyProtocol(deviceContext.getProxyProtocol());
+ info.setConnectType(deviceContext.getConnectType());
+ //info.setReConnCount(reconnCount);
+ info.setDecodeFailCount(deviceContext.getDecodeFailCount());
+ info.setUserId(deviceContext.getUserId());
+// info.setToken(token);
+ info.setGmtCreate(deviceContext.getGmtCreate());
+ info.setReceivedBytes(deviceContext.getReceivedBytes());
+ info.setReceivedPackets(deviceContext.getReceivedPackets());
+ info.setReceivedTcps(deviceContext.getReceivedTcps());
+ info.setBytesPerTcpReceive(deviceContext.getBytesPerTcpReceive());
+ info.setPacketsPerTcpReceive(deviceContext.getPacketsPerTcpReceive());
+ info.setHandledBytes(deviceContext.getHandledBytes());
+ info.setHandledPackets(deviceContext.getHandledPackets());
+ info.setSentBytes(deviceContext.getSentBytes());
+ info.setSentPackets(deviceContext.getSentPackets());
+ if(deviceContext.getLastBalanceDataTime() != null)
+ info.setGmtLatestCommunicate(new Date(deviceContext.getLastBalanceDataTime()));
+ if(deviceContext.getRegTime() != null)
+ info.setRegTime(deviceContext.getRegTime());
+
+ list.add(info);
+
+ }
+
+ for(String key : clientKeys){
+ IotClientSessionForRedisContext clientContext = redisSessionComponent.getClientByKey( key);
+ ChannelStatInfo info = new ChannelStatInfo();
+ info.setId(clientContext.getChannelId());
+ String controlRealName = ObjectUtils.isEmpty(clientContext.getControlRealName())? "无" : clientContext.getControlRealName();
+// info.setRealName(userRealName);
+// info.setRemarks("操作人:" + userRealName);
+ StringBuilder sb = new StringBuilder();
+ sb.append("操作人:").append(controlRealName);
+ info.setRemarks(sb.toString());
+ info.setClientIp(clientContext.getClientIp());
+ info.setClientPort(clientContext.getClientPort());
+ info.setClientProtocol(clientContext.getClientProtocol());
+ info.setProxyIp(clientContext.getProxyIp());
+ info.setProxyPort(clientContext.getProxyPort());
+ info.setProxyProtocol(clientContext.getProxyProtocol());
+ info.setConnectType(clientContext.getConnectType());
+ //info.setReConnCount(reconnCount);
+ info.setDecodeFailCount(clientContext.getDecodeFailCount());
+ info.setUserId(clientContext.getUserId());
+// info.setToken(token);
+ info.setGmtCreate(clientContext.getGmtCreate());
+ info.setReceivedBytes(clientContext.getReceivedBytes());
+ info.setReceivedPackets(clientContext.getReceivedPackets());
+ info.setReceivedTcps(clientContext.getReceivedTcps());
+ info.setBytesPerTcpReceive(clientContext.getBytesPerTcpReceive());
+ info.setPacketsPerTcpReceive(clientContext.getPacketsPerTcpReceive());
+ info.setHandledBytes(clientContext.getHandledBytes());
+ info.setHandledPackets(clientContext.getHandledPackets());
+ info.setSentBytes(clientContext.getSentBytes());
+ info.setSentPackets(clientContext.getSentPackets());
+ info.setGmtLatestCommunicate(clientContext.getLastCommunicateTime());
+ info.setRegTime(clientContext.getRegTime());
+
+ list.add(info);
+
+ }
+
SetWithLock setWithLock = Tio.getAll(tioServerBootstrap.getServerTioConfig());
Set set = null;
ReentrantReadWriteLock.ReadLock readLock = setWithLock.getLock().readLock();
- List list = null;
- try {
- readLock.lock();
- set = setWithLock.getObj();
- list = new ArrayList<>();
- for (ChannelContext channelContext : set) {
-
- ChannelStatInfo info = new ChannelStatInfo();
- ChannelStat stat = channelContext.stat;
-
-
- String id = channelContext.getId();
- String bsId = channelContext.getBsId();
- Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime");
- String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP
- int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口
- String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议
- String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip
- int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口
- String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议
-
- //连接类型
- String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
-
- switch (connectType) {
- case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端
- String userRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
- info.setRealName(userRealName);
- info.setRemarks("操作人:" + userRealName);
- break;
- case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端
- String caaUserRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
- info.setRealName(caaUserRealName);
- info.setRemarks("操作人:" + caaUserRealName);
- break;
- case ChannelContextConstant.CONNECT_DEVICE://设备
- IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
- String deviceCode = deviceSessionContext.getDeviceCode();
- IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode);
- String deviceId = "";
- String deviceType = "";
- String deviceName = "";
- String controlRealName = "";
- if(deviceContext != null){
- deviceId = deviceContext.getDeviceId();
- deviceType = deviceContext.getDeviceType();
- deviceName = deviceContext.getDeviceName();
- controlRealName = deviceContext.getControlRealName();
- }
- info.setDeviceId(deviceId);
- info.setDeviceCode(deviceCode);
- info.setDeviceType(deviceType);
- info.setDeviceName(deviceName);
- info.setControlRealName(controlRealName);
- StringBuilder deviceInfo = new StringBuilder();
- if (StringUtils.isNoneBlank(deviceName)) {
- deviceInfo.append("设备名称:").append(deviceName).append(";");
- } else {
- deviceInfo.append("注册号:").append(deviceCode).append(";");
- //deviceInfo.append("设备Id:").append(deviceId).append(";");
- }
- deviceInfo.append("控制人:").append(controlRealName);
- info.setRemarks(deviceInfo.toString());
- break;
-
- default:
- break;
- }
-
-
- int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数
- String userId = channelContext.userid; // 用户ID
- String token = channelContext.getToken(); // 用户token
- long timeStart = stat.getTimeCreated(); // 统计开始时间
- // 统计时长
- AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节
- AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包
- AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包
-
- double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收的字节数
- double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收的业务包
-
- AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节
- AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包
-
- AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数
- AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包
-
- long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte();
- long timeLatestSentMsg = stat.getLatestTimeOfSentPacket();
- long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间
-
- info.setId(id);
- info.setBsId(bsId);
- info.setClientIp(ip);
- info.setClientPort(port);
- info.setClientProtocol(protocol);
- info.setProxyIp(ipProxy);
- info.setProxyPort(portProxy);
- info.setProxyProtocol(protocolProxy);
- info.setConnectType(connectType);
- //info.setReConnCount(reconnCount);
- info.setDecodeFailCount(decodeFailCount);
- info.setUserId(userId);
- info.setToken(token);
- info.setGmtCreate(new Date(timeStart));
- info.setReceivedBytes(receivedBytes);
- info.setReceivedPackets(receivedPackets);
- info.setReceivedTcps(receivedTcps);
- info.setBytesPerTcpReceive(bytesPerTcpReceive);
- info.setPacketsPerTcpReceive(packetsPerTcpReceive);
- info.setHandledBytes(handledBytes);
- info.setHandledPackets(handledPackets);
- info.setSentBytes(sentBytes);
- info.setSentPackets(sentPackets);
- info.setGmtLatestCommunicate(new Date(latestCommunicateTime));
- info.setRegTime(regTime);
-
- list.add(info);
-
- log.info("Current Client Info : [{}].", info);
- }
- } catch (Throwable e) {
- log.error("获取在线连接失败!", e);
- } finally {
- readLock.unlock();
- }
+// try {
+// readLock.lock();
+// set = setWithLock.getObj();
+// for (ChannelContext channelContext : set) {
+//
+// ChannelStatInfo info = new ChannelStatInfo();
+// ChannelStat stat = channelContext.stat;
+//
+//
+// String id = channelContext.getId();
+// String bsId = channelContext.getBsId();
+// Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime");
+// String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP
+// int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口
+// String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议
+// String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip
+// int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口
+// String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议
+//
+// //连接类型
+// String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
+// if(connectType.equals(ChannelContextConstant.CONNECT_DEVICE)) //设备
+// continue;
+// switch (connectType) {
+// case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端
+// String userRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
+// info.setRealName(userRealName);
+// info.setRemarks("操作人:" + userRealName);
+// break;
+// case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端
+// String caaUserRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
+// info.setRealName(caaUserRealName);
+// info.setRemarks("操作人:" + caaUserRealName);
+// break;
+// case ChannelContextConstant.CONNECT_DEVICE://设备
+// IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
+// String deviceCode = deviceSessionContext.getDeviceCode();
+// IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode);
+// String deviceId = "";
+// String deviceType = "";
+// String deviceName = "";
+// String controlRealName = "";
+// if(deviceContext != null){
+// deviceId = deviceContext.getDeviceId();
+// deviceType = deviceContext.getDeviceType();
+// deviceName = deviceContext.getDeviceName();
+// controlRealName = deviceContext.getControlRealName();
+// }
+// info.setDeviceId(deviceId);
+// info.setDeviceCode(deviceCode);
+// info.setDeviceType(deviceType);
+// info.setDeviceName(deviceName);
+// info.setControlRealName(controlRealName);
+// StringBuilder deviceInfo = new StringBuilder();
+// if (StringUtils.isNoneBlank(deviceName)) {
+// deviceInfo.append("设备名称:").append(deviceName).append(";");
+// } else {
+// deviceInfo.append("注册号:").append(deviceCode).append(";");
+// //deviceInfo.append("设备Id:").append(deviceId).append(";");
+// }
+// deviceInfo.append("控制人:").append(controlRealName);
+// info.setRemarks(deviceInfo.toString());
+// break;
+//
+// default:
+// break;
+// }
+//
+//
+// int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数
+// String userId = channelContext.userid; // 用户ID
+// String token = channelContext.getToken(); // 用户token
+// long timeStart = stat.getTimeCreated(); // 统计开始时间
+// // 统计时长
+// AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节
+// AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包
+// AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包
+//
+// double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收的字节数
+// double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收的业务包
+//
+// AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节
+// AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包
+//
+// AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数
+// AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包
+//
+// long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte();
+// long timeLatestSentMsg = stat.getLatestTimeOfSentPacket();
+// long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间
+//
+// info.setId(id);
+// info.setBsId(bsId);
+// info.setClientIp(ip);
+// info.setClientPort(port);
+// info.setClientProtocol(protocol);
+// info.setProxyIp(ipProxy);
+// info.setProxyPort(portProxy);
+// info.setProxyProtocol(protocolProxy);
+// info.setConnectType(connectType);
+// //info.setReConnCount(reconnCount);
+// info.setDecodeFailCount(decodeFailCount);
+// info.setUserId(userId);
+// info.setToken(token);
+// info.setGmtCreate(new Date(timeStart));
+// info.setReceivedBytes(receivedBytes);
+// info.setReceivedPackets(receivedPackets);
+// info.setReceivedTcps(receivedTcps);
+// info.setBytesPerTcpReceive(bytesPerTcpReceive);
+// info.setPacketsPerTcpReceive(packetsPerTcpReceive);
+// info.setHandledBytes(handledBytes);
+// info.setHandledPackets(handledPackets);
+// info.setSentBytes(sentBytes);
+// info.setSentPackets(sentPackets);
+// info.setGmtLatestCommunicate(new Date(latestCommunicateTime));
+// info.setRegTime(regTime);
+//
+// list.add(info);
+//
+//// log.info("Current Client Info : [{}].", info);
+// }
+// } catch (Throwable e) {
+// log.error("获取在线连接失败!", e);
+// } finally {
+// readLock.unlock();
+// }
- List resultList = list.stream().filter(f -> {
- if (StringUtils.isNotBlank(clientIp)) {
- return clientIp.equals(f.getClientIp());
- }
- return true;
- }).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort))
- .collect(Collectors.toList());
- return CommonResult.success(resultList);
+// List resultList = list.stream().filter(f -> {
+// if (StringUtils.isNotBlank(clientIp)) {
+// return clientIp.equals(f.getClientIp());
+// }
+// return true;
+// }).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort)).collect(Collectors.toList());
+ return CommonResult.success(list);
}
/**
@@ -202,17 +311,8 @@ public class IotConnectManagerStatsController {
@PostMapping("/close")
@ApiOperationSupport(order = 6)
@Operation(summary = "关闭连接")
- public CommonResult> close(@RequestParam(name="id",required=true) String id) {
- ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
- IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
- if(sessionContext != null){
- String deviceId = sessionContext.getDeviceId();
- if(!ObjectUtils.isEmpty(deviceId))
- redisSessionComponent.deleteByDeviceId(deviceId);
- }
- Tio.IpBlacklist.clear();//清空全局黑名单
- Tio.close(channelContext, "控制端主动关闭连接");
- return CommonResult.success("ok");
+ public CommonResult> close(@RequestParam(name="id",required=true) String id) throws InterruptedException {
+ return iotConnectManagerStatsService.close(id, "0");
}
/**
@@ -223,20 +323,17 @@ public class IotConnectManagerStatsController {
@PostMapping("/clearDeviceControl")
@ApiOperationSupport(order = 7)
@Operation(summary = "清除设备控制")
- public CommonResult> clearDeviceControl(@RequestParam(name="id",required=true) String id) {
- ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
- Tio.IpBlacklist.clear();//清空全局黑名单
- if(channelContext != null) {
- IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
- String deviceId = deviceSessionContext.getDeviceId();
- if(!ObjectUtils.isEmpty(deviceId)){
- redisSessionComponent.clearControlByDeviceId(deviceId, channelContext);
- }
- //设置控制点的channelContext
-// deviceSessionContext.setControlChannelContext(null);
-// deviceSessionContext.setControlRealName("");
-// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, "");
- }
+ public CommonResult> clearDeviceControl(@RequestParam(name="id") String id) throws InterruptedException {
+ return iotConnectManagerStatsService.clearDeviceControl(id, "0");
+ }
+
+ @RequestMapping("/testMsg")
+ public CommonResult> testMsg() {
+
+ TioServerMessageVo vo = new TioServerMessageVo();
+ vo.setServerClusterMsgType("type1");
+ vo.setGroup("aaaa");
+ tioServerMessagePublisher.publish( vo);
return CommonResult.success("ok");
}
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java
new file mode 100644
index 0000000..7075b07
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java
@@ -0,0 +1,80 @@
+package com.zt.plat.module.qms.iot.service;
+
+import com.zt.plat.framework.common.pojo.CommonResult;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
+import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext;
+import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.env.Environment;
+import org.springframework.stereotype.Service;
+import org.springframework.util.ObjectUtils;
+import org.springframework.web.bind.annotation.RequestParam;
+import tech.zzjc.tio.core.ChannelContext;
+import tech.zzjc.tio.core.Tio;
+import tech.zzjc.tio.starter.TioServerBootstrap;
+
+import static java.lang.Thread.sleep;
+
+@Service
+public class IotConnectManagerStatsService {
+
+ @Autowired
+ private TioServerBootstrap tioServerBootstrap;
+ @Autowired public RedisSessionComponent redisSessionComponent;
+ @Autowired private TioServerMessagePublisher tioServerMessagePublisher;
+
+ public CommonResult> close(String id, String fromSubscribe) throws InterruptedException {
+ ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
+ if(channelContext == null){
+ if("1".equals(fromSubscribe))
+ return CommonResult.success("ok");
+ //广播消息处理
+ TioServerMessageVo message = new TioServerMessageVo();
+ message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL);
+ message.setChannelId(id);
+ tioServerMessagePublisher.publish(message);
+ //延迟0.5秒返回
+ sleep(500);
+ return CommonResult.success("ok");
+ }
+ IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
+ String deviceId = sessionContext.getDeviceId();
+ if(!ObjectUtils.isEmpty(deviceId))
+ redisSessionComponent.deleteByDeviceId(deviceId);
+ Tio.IpBlacklist.clear();//清空全局黑名单
+ Tio.close(channelContext, "控制端主动关闭连接");
+ return CommonResult.success("ok");
+ }
+
+ public CommonResult> clearDeviceControl(String id, String fromSubscribe) throws InterruptedException {
+ ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
+ Tio.IpBlacklist.clear();//清空全局黑名单
+ if(channelContext == null){
+ if("1".equals(fromSubscribe))
+ return CommonResult.success("ok");
+ //广播消息处理
+ TioServerMessageVo message = new TioServerMessageVo();
+ message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL);
+ message.setChannelId(id);
+ tioServerMessagePublisher.publish(message);
+ //延迟0.5秒返回
+ sleep(500);
+ return CommonResult.success("ok");
+ }
+ IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
+ String deviceId = deviceSessionContext.getDeviceId();
+ if(!ObjectUtils.isEmpty(deviceId)){
+ redisSessionComponent.clearControlByDeviceId(deviceId, channelContext);
+ }
+// if(channelContext != null) {
+ //设置控制点的channelContext
+// deviceSessionContext.setControlChannelContext(null);
+// deviceSessionContext.setControlRealName("");
+// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, "");
+// }
+ return CommonResult.success("ok");
+ }
+
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java
index 5c13bd1..38e39b5 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java
@@ -86,7 +86,6 @@ public class IotServerAioListener implements ServerAioListener {
if(!ObjectUtils.isEmpty(deviceCode)){
redisSessionComponent.clearControlByDeviceCode(deviceCode.toString(), channelContext);
}
-
//解绑业务ID
Tio.unbindBsId(channelContext);
//解绑群组
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java
index c6a35a7..b9d509b 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java
@@ -1,6 +1,9 @@
package com.zt.plat.module.qms.iot.tcpserver.caaclient;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
+import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
@@ -12,6 +15,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.caaclient.bean.ControlDevice;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
+import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
@@ -22,8 +26,10 @@ import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.ChannelContextFilter;
import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.TioConfig;
+import tech.zzjc.tio.core.stat.ChannelStat;
import java.nio.charset.Charset;
+import java.util.Date;
import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT;
@@ -42,17 +48,24 @@ import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CL
public class IotCaaClientControlDeviceHander implements IotDataHander {
@Autowired public RedisSessionComponent redisSessionComponent;
+
+ @Autowired
+ private TioServerMessagePublisher tioServerMessagePublisher;
+
@Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
+ ControlDevice controlDeviceOriginal = JSON.parseObject(data, ControlDevice.class);
ControlDevice controlDevice = JSON.parseObject(data, ControlDevice.class);
//获取客户端连接信息
+ ChannelStat stat = channelContext.stat;
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
+ String deviceId = controlDevice.getDeviceId();
String realClient = realClientIp + ":" + realClientPort;
log.info("{},, APP客户端控制设备 :{}", realClient, controlDevice);
- String deviceId = controlDevice.getDeviceId();
+ JSONObject json = null;
+ IotPacket iotPacket = new IotPacket();
if (StringUtils.isBlank(deviceId)) {
- JSONObject json = null;
json = new JSONObject();
json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
@@ -60,17 +73,16 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
controlDevice.setSuccess(false);
controlDevice.setMsg("设备解析为空!");
json.put("data", controlDevice);
- IotPacket iotPacket = new IotPacket();
+ iotPacket = new IotPacket();
iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
Tio.send(channelContext, iotPacket);
return null;
}
- String deviceCode = IotUtils.transDeviceIdToCode(deviceId);
- ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
-
- IotPacket iotPacket = new IotPacket();
- JSONObject json = null;
- if (deviceChannelContext == null) {
+ IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId);
+ if (deviceContext == null) {
+ //如果是订阅消息,不再广播
+ if("1".equals(controlDevice.getFromSubscribe()))
+ return null;
json = new JSONObject();
json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
@@ -82,11 +94,31 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
Tio.send(channelContext, iotPacket);
return null;
}
+ String deviceControlUserId = deviceContext.getUserId();
+ String curUserId = controlDevice.getControlUserId();
+
+ String deviceCode = IotUtils.transDeviceIdToCode(deviceId);
+ ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
+ //如果设备不在本服务实例,发送广播到其他服务处理
+ if(deviceChannelContext == null){
+ String channelId = deviceContext.getChannelId();
+ TioServerMessageVo message = new TioServerMessageVo();
+ message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CONTROL);
+ message.setChannelId(channelId);
+// message.setChannelContext(channelContext);
+ controlDeviceOriginal.setFromSubscribe("1");
+ iotPacket.setBody((JSONObject.toJSONString(controlDeviceOriginal) + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
+ message.setPacket(iotPacket);
+ tioServerMessagePublisher.publish(message);
+ return null;
+ }
+
+ TioConfig tioConfig = deviceChannelContext.getTioConfig();
if (controlDevice.getIsControl()) {
//控制
// deviceContext.setControlChannelContext(channelContext);
- IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId);
deviceContext.setControlRealName(controlDevice.getControlRealName());
+ deviceContext.setUserId(curUserId);
channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, deviceCode);
controlDevice.setMsg("设备被“" + controlDevice.getControlRealName() + "”远程控制!");
redisSessionComponent.update(deviceContext);
@@ -106,18 +138,18 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
json.put("clientType", ClientType.CAA_CLIENT.getName());
- controlDevice.setSuccess(true);
- json.put("data", controlDevice);
+// controlDevice.setSuccess(true);
+// json.put("data", controlDevice);
+ json.put("data", JSON.parseObject(data, ControlDevice.class));
iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
- //发送通知到其他服务实例
- TioConfig tioConfig = deviceChannelContext.getTioConfig();
+ //发送通知到其他服务实例(其他实例通知各自客户端)
TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig();
TioClusterVo iotClusterVo = new TioClusterVo(iotPacket);
iotClusterVo.setGroup(CAA_ALL_CLIENT);
tioClusterConfig.publish(iotClusterVo);
//发送消息到本服务实例连接的客户端
- Tio.sendToGroup(channelContext.getTioConfig(), CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() {
+ Tio.sendToGroup(tioConfig, CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() {
@Override
public boolean filter(ChannelContext cc) {
if (channelContext.getId().equals(cc.getId())) {
@@ -126,6 +158,9 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
return true;
}}
);
+
+
+
return null;
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java
index bb53f5a..f75486e 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java
@@ -1,16 +1,19 @@
package com.zt.plat.module.qms.iot.tcpserver.caaclient;
import com.alibaba.fastjson.JSON;
+import com.fhs.common.spring.SpringContextUtil;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.consoleclient.bean.Register;
import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
import com.zt.plat.module.qms.iot.tcpserver.core.Command;
import com.zt.plat.module.qms.iot.tcpserver.core.ReplyResult;
+import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
+import tech.zzjc.tio.utils.SystemTimer;
import java.nio.charset.Charset;
import java.util.Date;
@@ -34,11 +37,7 @@ public class IotCaaClientRegisterHander implements IotDataHander {
@Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
Register register = JSON.parseObject(data, Register.class);
- //获取客户端连接信息
- String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
- String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
- String realClient = realClientIp + ":" + realClientPort;
- log.info("{},, APP客户端注册租户ID:{},用户ID:{}", realClient, register.getTenantId(), register.getUserId());
+
Tio.bindUser(channelContext, register.getUserId());
Tio.bindToken(channelContext, register.getUserId());
channelContext.set("regTime", new Date());
@@ -49,6 +48,15 @@ public class IotCaaClientRegisterHander implements IotDataHander {
Tio.send(channelContext, resppacket);
//绑定群组
Tio.bindGroup(channelContext, CAA_ALL_CLIENT);
+
+ //在redis记录客户端信息
+// RedisSessionComponent redisSessionComponent = (RedisSessionComponent) SpringContextUtil.getBean("redisSessionComponent");
+// redisSessionComponent.regClient(channelContext);
+
+
+ //
+
+
return null;
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java
index 8adb3e8..ba8cfc3 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java
@@ -24,7 +24,8 @@ public class ControlDevice implements Serializable {
/** 控制者姓名 **/
private String controlRealName;
-
+ private String controlUserId;
+
/** 是否控制 **/
private Boolean isControl;
@@ -33,4 +34,7 @@ public class ControlDevice implements Serializable {
/** 回复app时使用 消息 **/
private String msg;
+
+ /** 来自消息订阅 **/
+ private String fromSubscribe;
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java
new file mode 100644
index 0000000..5c6fc9a
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java
@@ -0,0 +1,20 @@
+package com.zt.plat.module.qms.iot.tcpserver.cluster;
+
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
+
+@Component
+public class InstanceIdProvider {
+
+ private final String instanceId;
+
+ public InstanceIdProvider() {
+ //使用随机 UUID(每次重启会变)
+ this.instanceId = UUID.randomUUID().toString();
+ }
+
+ public String getInstanceId() {
+ return instanceId;
+ }
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java
new file mode 100644
index 0000000..3d7209d
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java
@@ -0,0 +1,16 @@
+package com.zt.plat.module.qms.iot.tcpserver.cluster;
+
+public class TioServerMessageConfig {
+
+ public static final String SERVER_CLUSTER_TOPIC_CHANNEL = "zzjc-cluster-tio-server";
+
+
+ public static final String MESSAGE_TYPE_CONTROL = "control"; //设备控制消息
+ public static final String MESSAGE_TYPE_CLEAR_MANUAL = "clearControlManual"; //手动清除控制
+ public static final String MESSAGE_TYPE_CLOSE_MANUAL = "closeManual"; //手动断开连接
+ public static final String MESSAGE_TYPE_NOTICE = "notice"; //通知消息。应该用不到。
+
+
+
+
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java
new file mode 100644
index 0000000..c71643f
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java
@@ -0,0 +1,33 @@
+package com.zt.plat.module.qms.iot.tcpserver.cluster;
+
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+import tech.zzjc.tio.server.ServerTioConfig;
+
+import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL;
+
+@Service
+@Slf4j
+public class TioServerMessagePublisher {
+
+ @Autowired
+ private RedissonClient redissonClient;
+ @Autowired
+ private InstanceIdProvider instanceIdProvider; // 注入实例ID提供者
+
+
+
+ public void publish(TioServerMessageVo message) {
+ message.setSenderInstanceId(instanceIdProvider.getInstanceId());
+ RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL);
+ topic.publish(message);
+ log.error("[Publisher] 已发布消息: " + message);
+
+
+ }
+
+
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java
new file mode 100644
index 0000000..2105abd
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java
@@ -0,0 +1,91 @@
+package com.zt.plat.module.qms.iot.tcpserver.cluster;
+
+import com.alibaba.fastjson.JSONObject;
+import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService;
+import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
+import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
+import jakarta.annotation.PostConstruct;
+import lombok.extern.slf4j.Slf4j;
+import org.redisson.api.RTopic;
+import org.redisson.api.RedissonClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import tech.zzjc.tio.core.ChannelContext;
+import tech.zzjc.tio.core.Tio;
+import tech.zzjc.tio.core.TioConfig;
+import tech.zzjc.tio.server.ServerTioConfig;
+import tech.zzjc.tio.server.TioServer;
+import tech.zzjc.tio.starter.TioServerBootstrap;
+import tech.zzjc.tio.utils.lock.SetWithLock;
+
+import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL;
+
+@Component
+@Slf4j
+public class TioServerMessageSubscriber {
+
+ @Autowired
+ private RedissonClient redissonClient;
+ @Autowired
+ private InstanceIdProvider instanceIdProvider;
+
+ @Autowired
+ private IotDataHander iotCaaClientControlDeviceHander;
+
+ @Autowired private TioServerBootstrap tioServerBootstrap;
+ @Autowired private IotConnectManagerStatsService iotConnectManagerStatsService;
+
+ @PostConstruct
+ public void subscribe() {
+ RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL);
+ topic.addListener(TioServerMessageVo.class, (channel, msg) -> {
+ String senderId = msg.getSenderInstanceId();
+ String channelId = msg.getChannelId();
+ String msgType = msg.getServerClusterMsgType();
+ String currentId = instanceIdProvider.getInstanceId();
+ if (currentId.equals(senderId)) {
+ log.debug("[Subscriber] 跳过自己发送的消息");
+ return;
+ }
+ log.debug("[Subscriber] 收到消息频道 '" + channel + "' 的内容: " + msg);
+
+ if(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL.equals(msgType)){
+ try {
+ iotConnectManagerStatsService.clearDeviceControl(channelId,"1");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return;
+ }
+ if(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL.equals(msgType)){
+ try {
+ iotConnectManagerStatsService.close(channelId,"1");
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return;
+ }
+
+ ServerTioConfig serverTioConfig = tioServerBootstrap.getServerTioConfig();
+ SetWithLock all = Tio.getAll(serverTioConfig);
+ ChannelContext targetContext = null;
+ for(ChannelContext context : all.getObj()){
+ targetContext = context;
+ String id = context.getId();
+ if(id.equals(channelId)){
+ break;
+ }
+ }
+ IotPacket packet = (IotPacket) msg.getPacket();
+ byte[] body = packet.getBody();
+ String bodyStr = new String(body);
+ JSONObject json = JSONObject.parseObject(bodyStr);
+ try {
+ iotCaaClientControlDeviceHander.hander(json.getString("msgId"), bodyStr, targetContext);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java
new file mode 100644
index 0000000..aab434c
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java
@@ -0,0 +1,18 @@
+package com.zt.plat.module.qms.iot.tcpserver.cluster;
+
+import lombok.Data;
+import tech.zzjc.tio.cluster.TioClusterVo;
+import tech.zzjc.tio.core.ChannelContext;
+
+@Data
+public class TioServerMessageVo extends TioClusterVo {
+
+ private String senderInstanceId; //发布者id
+
+
+ private String serverClusterMsgType; //消息类型: 设备
+
+
+ private ChannelContext channelContext;
+
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java
index 06f1d80..5c8cfc8 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java
@@ -31,7 +31,7 @@ public class IotConsoleClientCheckControlHander implements IotDataHander {
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String realClient = realClientIp + ":" + realClientPort;
log.info("{},检查控制点由谁控制", realClient);
- String userId = channelContext.getToken();
+ String token = channelContext.getToken();
IotPacket resppacket = null;
// List userMeasurePointList = userMeasurePointService.list(Wrappers.query().lambda().eq(UserMeasurePoint::getUserId, Func.toLong(userId)));
// for (UserMeasurePoint userMeasurePoint : userMeasurePointList) {
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java
index 0a55bf1..dd5c833 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java
@@ -1,5 +1,6 @@
package com.zt.plat.module.qms.iot.tcpserver.core;
+import lombok.Data;
import tech.zzjc.tio.core.ChannelContext;
import java.io.Serializable;
@@ -15,6 +16,7 @@ import java.io.Serializable;
* @version V1.0
* @since 2021-6-28
*/
+@Data
public class IotClientSessionContext implements Serializable {
private static final long serialVersionUID = 8738456547503650108L;
@@ -31,29 +33,5 @@ public class IotClientSessionContext implements Serializable {
/** 当前控制计量点的ChannelContext **/
private ChannelContext controlConsoleClientChannelContext;
- public String getClientType() {
- return clientType;
- }
-
- public void setClientType(String clientType) {
- this.clientType = clientType;
- }
-
- public MeasurePoint getMeasurePoint() {
- return measurePoint;
- }
-
- public void setMeasurePoint(MeasurePoint measurePoint) {
- this.measurePoint = measurePoint;
- }
-
- public ChannelContext getControlConsoleClientChannelContext() {
- return controlConsoleClientChannelContext;
- }
-
- public void setControlConsoleClientChannelContext(ChannelContext controlConsoleClientChannelContext) {
- this.controlConsoleClientChannelContext = controlConsoleClientChannelContext;
- }
-
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java
new file mode 100644
index 0000000..6187bce
--- /dev/null
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java
@@ -0,0 +1,120 @@
+package com.zt.plat.module.qms.iot.tcpserver.core;
+
+import lombok.Data;
+
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Data
+public class IotClientSessionForRedisContext extends IotClientSessionContext{
+ private static final long serialVersionUID = 1L;
+
+ private String channelId;
+
+ private Date regTime; //注册时间
+ private Date expireTime; //过期时间
+ private Date lastCommunicateTime; //最近一次通信时间
+ private String tenantId; //租户ID
+ private String controlRealName;
+
+ /**
+ * 客户端IP
+ */
+ private String clientIp;
+
+ /**
+ * 端口
+ */
+ private int clientPort;
+
+ /**
+ * 协议
+ */
+ private String clientProtocol;
+
+ /**
+ * 代理ip
+ */
+ private String proxyIp;
+
+ /**
+ * 代理端口
+ */
+ private int proxyPort;
+
+ /**
+ * 代理协议
+ */
+ private String proxyProtocol;
+
+ /**
+ * 连接类型
+ */
+ private String connectType;
+
+ /**
+ * 解码失败数
+ */
+ private int decodeFailCount;
+
+ /**
+ * 用户ID
+ */
+ private String userId;
+
+ /**
+ * 用户token
+ */
+ private String token;
+
+
+ /**
+ * 统计开始时间
+ */
+ private Date gmtCreate;
+
+ /**
+ * 已接收字节数
+ */
+ private AtomicLong receivedBytes;
+
+ /**
+ * 已接收业务包
+ */
+ private AtomicLong receivedPackets;
+
+ /**
+ * 已接收TCP包
+ */
+ private AtomicLong receivedTcps;
+
+ /**
+ * 平均每个TCP包接收的字节数
+ */
+ private double bytesPerTcpReceive;
+
+ /**
+ * 平均每个TCP包接收的业务包
+ */
+ private double packetsPerTcpReceive;
+
+ /**
+ * 已处理的字节数
+ */
+ private AtomicLong handledBytes;
+
+ /**
+ * 已处理的业务包
+ */
+ private AtomicLong handledPackets;
+
+ /**
+ * 已发送的字节数
+ */
+ private AtomicLong sentBytes;
+
+ /**
+ * 已发送的业务包
+ */
+ private AtomicLong sentPackets;
+}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java
index d02d7cc..7b3676d 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java
@@ -1,5 +1,6 @@
package com.zt.plat.module.qms.iot.tcpserver.core;
+import lombok.Data;
import tech.zzjc.tio.core.ChannelContext;
import java.io.Serializable;
@@ -16,6 +17,7 @@ import java.util.List;
* @version V1.0
* @since 2021-6-28
*/
+@Data
public class IotDeviceSessionContext implements Serializable {
private static final long serialVersionUID = 9035659677475646771L;
@@ -44,61 +46,6 @@ public class IotDeviceSessionContext implements Serializable {
/** 控制的ChannelContext **/
private ChannelContext controlChannelContext;
- public String getDeviceId() {
- return deviceId;
- }
-
- public void setDeviceId(String deviceId) {
- this.deviceId = deviceId;
- }
-
- public String getDeviceCode() {
- return deviceCode;
- }
-
- public void setDeviceCode(String deviceCode) {
- this.deviceCode = deviceCode;
- }
-
- public String getDeviceType() {
- return deviceType;
- }
-
- public void setDeviceType(String deviceType) {
- this.deviceType = deviceType;
- }
-
- public String getDeviceName() {
- return deviceName;
- }
-
- public void setDeviceName(String deviceName) {
- this.deviceName = deviceName;
- }
-
- public List getWeightValueList() {
- return weightValueList;
- }
-
- public void setWeightValueList(List weightValueList) {
- this.weightValueList = weightValueList;
- }
-
- public String getControlRealName() {
- return controlRealName;
- }
-
- public void setControlRealName(String controlRealName) {
- this.controlRealName = controlRealName;
- }
-
- public ChannelContext getControlChannelContext() {
- return controlChannelContext;
- }
-
- public void setControlChannelContext(ChannelContext controlChannelContext) {
- this.controlChannelContext = controlChannelContext;
- }
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java
index 13acd6d..62fd3c4 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java
@@ -1,55 +1,123 @@
package com.zt.plat.module.qms.iot.tcpserver.core;
-import java.util.Date;
+import lombok.Data;
+import tech.zzjc.tio.core.ChannelContext;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+@Data
public class IotDeviceSessionForRedisContext extends IotDeviceSessionContext{
private static final long serialVersionUID = 1L;
+ private String channelId;
+
private Date regTime; //注册时间
private Date expireTime; //过期时间
private String balanceWeightUnit; //天平重量单位
private Long lastBalanceDataTime; //天平最后数据时间
-
private String tenantId; //租户ID
- public Date getRegTime() {
- return regTime;
- }
+ /**
+ * 客户端IP
+ */
+ private String clientIp;
- public void setRegTime(Date regTime) {
- this.regTime = regTime;
- }
+ /**
+ * 端口
+ */
+ private int clientPort;
- public Date getExpireTime() {
- return expireTime;
- }
+ /**
+ * 协议
+ */
+ private String clientProtocol;
- public void setExpireTime(Date expireTime) {
- this.expireTime = expireTime;
- }
+ /**
+ * 代理ip
+ */
+ private String proxyIp;
- public String getBalanceWeightUnit() {
- return balanceWeightUnit;
- }
+ /**
+ * 代理端口
+ */
+ private int proxyPort;
- public void setBalanceWeightUnit(String balanceWeightUnit) {
- this.balanceWeightUnit = balanceWeightUnit;
- }
+ /**
+ * 代理协议
+ */
+ private String proxyProtocol;
- public Long getLastBalanceDataTime() {
- return lastBalanceDataTime;
- }
+ /**
+ * 连接类型
+ */
+ private String connectType;
- public void setLastBalanceDataTime(Long lastBalanceDataTime) {
- this.lastBalanceDataTime = lastBalanceDataTime;
- }
+ /**
+ * 解码失败数
+ */
+ private int decodeFailCount;
- public String getTenantId() {
- return tenantId;
- }
+ /**
+ * 用户ID
+ */
+ private String userId;
+
+ /**
+ * 用户token
+ */
+ private String token;
+
+
+ /**
+ * 统计开始时间
+ */
+ private Date gmtCreate;
+
+ /**
+ * 已接收字节数
+ */
+ private AtomicLong receivedBytes;
+
+ /**
+ * 已接收业务包
+ */
+ private AtomicLong receivedPackets;
+
+ /**
+ * 已接收TCP包
+ */
+ private AtomicLong receivedTcps;
+
+ /**
+ * 平均每个TCP包接收的字节数
+ */
+ private double bytesPerTcpReceive;
+
+ /**
+ * 平均每个TCP包接收的业务包
+ */
+ private double packetsPerTcpReceive;
+
+ /**
+ * 已处理的字节数
+ */
+ private AtomicLong handledBytes;
+
+ /**
+ * 已处理的业务包
+ */
+ private AtomicLong handledPackets;
+
+ /**
+ * 已发送的字节数
+ */
+ private AtomicLong sentBytes;
+
+ /**
+ * 已发送的业务包
+ */
+ private AtomicLong sentPackets;
- public void setTenantId(String tenantId) {
- this.tenantId = tenantId;
- }
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java
index 190b27e..89a0fc4 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java
@@ -3,19 +3,27 @@ package com.zt.plat.module.qms.iot.tcpserver.device;
import com.zt.plat.framework.tenant.core.context.TenantContextHolder;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
+import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.publisher.IotDeviceRegisterPublisher;
import com.zt.plat.module.qms.resource.device.dal.dataobject.DeviceInfomationDO;
import com.zt.plat.module.qms.resource.device.service.DeviceInfomationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.redis.core.Cursor;
+import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
+import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Service;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
+import tech.zzjc.tio.core.stat.ChannelStat;
+import tech.zzjc.tio.utils.SystemTimer;
import java.time.LocalDateTime;
import java.util.Date;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
@Service
@@ -51,16 +59,25 @@ public class RedisSessionComponent {
}
public void deleteByDeviceId(String deviceId){
- redisTemplate.delete(getRedisKeyByDeviceCode(deviceId));
+ redisTemplate.delete(getRedisKeyByDeviceId(deviceId));
}
public IotDeviceSessionForRedisContext getByDeviceCode(String deviceCode){
- String deviceId = IotUtils.transDeviceCodeToId(deviceCode);
- return getByDeviceId(deviceId);
+ String redisKey = getRedisKeyByDeviceCode(deviceCode);
+ return getByKey(redisKey);
}
public IotDeviceSessionForRedisContext getByDeviceId(String deviceId){
- return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId));
+ String redisKey = getRedisKeyByDeviceId(deviceId);
+ return getByKey(redisKey);
+// return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId));
+ }
+ public IotDeviceSessionForRedisContext getByKey(String key){
+ return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(key);
+ }
+
+ public IotClientSessionForRedisContext getClientByKey(String key){
+ return (IotClientSessionForRedisContext) redisTemplate.opsForValue().get(key);
}
public String getRedisKeyByDeviceCode(String deviceCode){
@@ -73,8 +90,8 @@ public class RedisSessionComponent {
}
public void regDevice(ChannelContext channelContext, String deviceCode, String deviceType){
- IotDeviceSessionForRedisContext deviceContext = getByDeviceCode(deviceCode);
- if(deviceContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){
+ IotDeviceSessionForRedisContext clientContext = getByDeviceCode(deviceCode);
+ if(clientContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){
return;
}
@@ -83,6 +100,7 @@ public class RedisSessionComponent {
String deviceId = IotUtils.transDeviceCodeToId(deviceCode);
DeviceInfomationDO deviceInfo = deviceInfomationService.getDeviceInfomation(Long.valueOf(deviceId));
IotDeviceSessionForRedisContext context = new IotDeviceSessionForRedisContext();
+ context.setChannelId(channelContext.getId());
context.setDeviceId(deviceId);
context.setDeviceType(deviceType);
context.setDeviceCode(deviceCode);
@@ -111,5 +129,109 @@ public class RedisSessionComponent {
}
+ public void regClient(ChannelContext channelContext){
+ String channelId = channelContext.getId();
+ IotClientSessionForRedisContext context = getByClientChannelId(channelId);
+ if(context != null){
+ return;
+ }
+
+ TenantContextHolder.setIgnore(true);
+ try{
+ context = new IotClientSessionForRedisContext();
+
+ String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
+ String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
+ String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
+ String userName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
+// ChannelContext clientChanelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
+// if(clientChanelContext != null){
+// context.setClientProtocol(clientChanelContext.getClientNode().getProtocol());
+// context.setProxyIp(clientChanelContext.getClientNode().getIp());
+// context.setProxyPort(clientChanelContext.getClientNode().getPort());
+// context.setProxyProtocol(clientChanelContext.getClientNode().getProtocol());
+// }
+ context.setChannelId(channelId);
+ context.setControlRealName(userName);
+ context.setClientIp(realClientIp);
+ context.setClientPort(Integer.parseInt(realClientPort));
+ context.setConnectType(connectType);
+
+ updateClientCommonAttrs(context, channelContext);
+
+ String redisKey = getClientRedisKeyByChannelId(channelId);
+ redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS);
+ //todo 客户端注册事件
+// IotDeviceRegisterPublisher.publishEvent(deviceCode, deviceType);
+ }catch (Exception e){
+ e.printStackTrace();
+ log.error("设备注册异常:{}", e.getMessage());
+ }finally {
+ TenantContextHolder.clear();
+ }
+
+ }
+
+
+ public void updateClient(IotClientSessionForRedisContext context, ChannelContext channelContext){
+ String channelId = channelContext.getId();
+ updateClientCommonAttrs(context, channelContext);
+ String redisKey = getClientRedisKeyByChannelId(channelId);
+ redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS);
+ }
+
+ private void updateClientCommonAttrs(IotClientSessionForRedisContext context, ChannelContext channelContext){
+ ChannelStat stat = channelContext.stat;
+ context.setGmtCreate(new Date(stat.getTimeCreated()));
+ context.setDecodeFailCount(stat.getDecodeFailCount());
+ context.setReceivedBytes(stat.getReceivedBytes());
+ context.setReceivedPackets(stat.getReceivedPackets());
+ context.setReceivedTcps(stat.getReceivedTcps());
+ context.setBytesPerTcpReceive(stat.getBytesPerTcpReceive());
+ context.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive());
+ context.setHandledBytes(stat.getHandledBytes());
+ context.setHandledPackets(stat.getHandledPackets());
+ context.setSentBytes(stat.getSentBytes());
+ context.setSentPackets(stat.getSentPackets());
+ context.setLastCommunicateTime(new Date());
+ }
+
+ public String getClientRedisKeyByChannelId(String channelId){
+ String redisKey = IotClientSessionForRedisContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "-" + channelId;
+ return redisKey;
+ }
+
+
+ public IotClientSessionForRedisContext getByClientChannelId(String channelId){
+ String redisKey = getClientRedisKeyByChannelId(channelId);
+ return getClientByKey(redisKey);
+ }
+
+
+ /**
+ * 使用Scan方式查询key(推荐用于大数据量)
+ * 避免使用KEYS命令导致Redis阻塞
+ */
+ public Set scanKeys(String matchPattern) {
+ Set keys = new HashSet<>();
+ try {
+ redisTemplate.execute((RedisCallback) connection -> {
+ try (Cursor cursor = connection.scan(
+ ScanOptions.scanOptions()
+ .match(matchPattern)
+ .count(1000) // 每次扫描的数量
+ .build())) {
+ while (cursor.hasNext()) {
+ keys.add(new String(cursor.next()));
+ }
+ }
+ return null;
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ return keys;
+ }
+
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java
index 75c8bc9..b4da687 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java
@@ -1,14 +1,11 @@
package com.zt.plat.module.qms.iot.tcpserver.device.handler;
-import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext;
+import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import com.alibaba.fastjson.JSONObject;
import com.zt.plat.module.qms.iot.tcpserver.IotDeviceType;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
-import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
-import com.zt.plat.module.qms.iot.tcpserver.core.Command;
-import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import com.zt.plat.module.qms.iot.tcpserver.publisher.BalanceDataPublisher;
import com.fhs.common.spring.SpringContextUtil;
@@ -21,12 +18,14 @@ import tech.zzjc.tio.cluster.TioClusterVo;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.TioConfig;
+import tech.zzjc.tio.core.stat.ChannelStat;
import tech.zzjc.tio.utils.SystemTimer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Date;
import java.util.List;
import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT;
@@ -52,8 +51,10 @@ public class IotDeviceBalanceHandler implements IotDataHander {
@Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
//获取客户端连接信息
+ ChannelStat stat = channelContext.stat;
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
+ String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
String realClient = realClientIp + ":" + realClientPort;
//通过sessionContext传入设备ID
IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
@@ -111,7 +112,30 @@ public class IotDeviceBalanceHandler implements IotDataHander {
weightUnit = deviceContext.getBalanceWeightUnit() == null ? "" :deviceContext.getBalanceWeightUnit();
}
- //设置末次天平正确数据时间
+ //更新deviceContext信息
+ ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
+ if(deviceChannelContext != null){
+ deviceContext.setClientProtocol(deviceChannelContext.getClientNode().getProtocol());
+ deviceContext.setProxyIp(deviceChannelContext.getClientNode().getIp());
+ deviceContext.setProxyPort(deviceChannelContext.getClientNode().getPort());
+ deviceContext.setProxyProtocol(deviceChannelContext.getClientNode().getProtocol());
+ }
+ deviceContext.setClientIp(realClientIp);
+ deviceContext.setClientPort(Integer.parseInt(realClientPort));
+ deviceContext.setConnectType(connectType);
+ deviceContext.setDecodeFailCount(stat.getDecodeFailCount());
+ deviceContext.setGmtCreate(new Date(stat.getTimeCreated()));
+ deviceContext.setReceivedBytes(stat.getReceivedBytes());
+ deviceContext.setReceivedPackets(stat.getReceivedPackets());
+ deviceContext.setReceivedTcps(stat.getReceivedTcps());
+ deviceContext.setBytesPerTcpReceive(stat.getBytesPerTcpReceive());
+ deviceContext.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive());
+ deviceContext.setHandledBytes(stat.getHandledBytes());
+ deviceContext.setHandledPackets(stat.getHandledPackets());
+ deviceContext.setSentBytes(stat.getSentBytes());
+ deviceContext.setSentPackets(stat.getSentPackets());
+
+
deviceContext.setLastBalanceDataTime(SystemTimer.currTime);
redisSessionComponent.update(deviceContext);
//租户ID
@@ -140,7 +164,8 @@ public class IotDeviceBalanceHandler implements IotDataHander {
TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig();
TioClusterVo iotClusterVo = new TioClusterVo(packet);
iotClusterVo.setGroup(CAA_ALL_CLIENT);
- tioClusterConfig.publish(iotClusterVo);
+// if(!"0.00".equals(weightData))
+ tioClusterConfig.publish(iotClusterVo);
return null;
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java
index c6d0786..dbbf0d4 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java
@@ -10,6 +10,8 @@ import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientCheckC
import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientRegisterHander;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
import com.zt.plat.module.qms.iot.tcpserver.core.Command;
+import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext;
+import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@@ -42,6 +44,7 @@ public class IotClientHandler implements IotHander {
@Autowired
private IotCaaClientControlDeviceHander iotCaaClientControlDeviceHander;
+ @Autowired public RedisSessionComponent redisSessionComponent;
public IotClientHandler() {
}
@@ -59,6 +62,8 @@ public class IotClientHandler implements IotHander {
String cmd = jsonObject.getString("cmd");
Command command = Command.getByName(cmd);
IotDataHander iotDataHander = null;
+ String channelId = channelContext.getId();
+ IotClientSessionForRedisContext iotClientSessionForRedisContext = redisSessionComponent.getByClientChannelId(channelId);
if ("consoleClient".equals(clientType)) {
channelContext.set(ChannelContextConstant.CONNECT_TYPE, ChannelContextConstant.CONNECT_CONSOLE_CLIENT);
switch (command) {
@@ -89,6 +94,13 @@ public class IotClientHandler implements IotHander {
log.error("{}, 找不到处理类, 客户端消息ID:{},客户端类型:{},命令:{}", realClient, msgId, clientType, cmd);
return null;
}
+
+ if(iotClientSessionForRedisContext == null){
+ redisSessionComponent.regClient(channelContext);
+ }else{
+ redisSessionComponent.updateClient(iotClientSessionForRedisContext, channelContext);
+ }
+
iotDataHander.hander(msgId, jsonObject.getString("data"), channelContext);
return null;
}
diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java
index 8ffcd43..fae14e1 100644
--- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java
+++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java
@@ -53,7 +53,8 @@ public class IotDeviceHandler implements IotHander {
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String realClient = realClientIp + ":" + realClientPort + ", bsId=" + channelContext.getBsId();
String text = new String(iotPacket.getBody(), IotPacket.CHARSET);
-// log.info("{}, 接收到数据:{}", realClient, text);
+// if(!text.contains("0.00"))
+// log.info("{}, 接收到数据:{}", realClient, text);
String[] infos = text.split("\\$");
String reginfo = infos[0];
String[] regs = reginfo.split("-");
@@ -102,7 +103,6 @@ public class IotDeviceHandler implements IotHander {
sessionContext.setDeviceId(IotUtils.transDeviceCodeToId(deviceCode));
return null;
}
-
if(deviceContext == null || Tio.getByBsId(channelContext.getTioConfig(), deviceCode) == null){
redisSessionComponent.regDevice(channelContext, deviceCode, deviceType);
}else{