From 536f971e8e772b3fc76fbe834b6e80378ef76519 Mon Sep 17 00:00:00 2001 From: FCL Date: Tue, 4 Nov 2025 18:05:11 +0800 Subject: [PATCH] =?UTF-8?q?feat:tio=E9=9B=86=E7=BE=A4=E9=80=BB=E8=BE=91?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IotConnectManagerStatsController.java | 427 +++++++++++------- .../IotConnectManagerStatsService.java | 80 ++++ .../iot/tcpserver/IotServerAioListener.java | 1 - .../IotCaaClientControlDeviceHander.java | 65 ++- .../caaclient/IotCaaClientRegisterHander.java | 18 +- .../caaclient/bean/ControlDevice.java | 6 +- .../tcpserver/cluster/InstanceIdProvider.java | 20 + .../cluster/TioServerMessageConfig.java | 16 + .../cluster/TioServerMessagePublisher.java | 33 ++ .../cluster/TioServerMessageSubscriber.java | 91 ++++ .../tcpserver/cluster/TioServerMessageVo.java | 18 + .../IotConsoleClientCheckControlHander.java | 2 +- .../core/IotClientSessionContext.java | 26 +- .../core/IotClientSessionForRedisContext.java | 120 +++++ .../core/IotDeviceSessionContext.java | 57 +-- .../core/IotDeviceSessionForRedisContext.java | 132 ++++-- .../device/RedisSessionComponent.java | 134 +++++- .../handler/IotDeviceBalanceHandler.java | 37 +- .../tcpserver/handler/IotClientHandler.java | 12 + .../tcpserver/handler/IotDeviceHandler.java | 4 +- 20 files changed, 986 insertions(+), 313 deletions(-) create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java create mode 100644 zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java index 5c02fd2..4a52207 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/controller/admin/IotConnectManagerStatsController.java @@ -1,10 +1,13 @@ package com.zt.plat.module.qms.iot.controller.admin; +import com.alibaba.fastjson.JSONObject; import com.zt.plat.framework.common.pojo.CommonResult; -import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; -import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext; +import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo; +import com.zt.plat.module.qms.iot.tcpserver.core.*; import com.github.xingfudeshi.knife4j.annotations.ApiOperationSupport; -import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext; import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.tags.Tag; @@ -13,9 +16,9 @@ import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; import org.springframework.util.ObjectUtils; import org.springframework.web.bind.annotation.*; -import com.zt.plat.module.qms.iot.tcpserver.core.ChannelStatInfo; import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.stat.ChannelStat; @@ -27,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; +import static java.lang.Thread.sleep; + /** * IotManagerStatsController * iot 管理统计 @@ -47,151 +52,255 @@ public class IotConnectManagerStatsController { @Autowired private TioServerBootstrap tioServerBootstrap; @Autowired public RedisSessionComponent redisSessionComponent; - + @Autowired private Environment environment; + @Autowired private TioServerMessagePublisher tioServerMessagePublisher; + @Autowired private IotConnectManagerStatsService iotConnectManagerStatsService; @GetMapping("/all-connect") @ApiOperationSupport(order = 1) @Operation(summary = "获取当前所有连接") public CommonResult currentAllConnect(@RequestParam(name = "clientIp", required = false) String clientIp) { + + //获取spring服务端口 + String serverPort = environment.getProperty("server.port"); + String serverName = environment.getProperty("spring.application.name"); + String msg = "当前服务端口:" + serverPort + ",服务名称:" + serverName; + log.error(msg); + List list = new ArrayList<>(); + Set deviceKeys = redisSessionComponent.scanKeys(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY + "*"); + Set clientKeys = redisSessionComponent.scanKeys(IotClientSessionContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "*"); + + for(String key : deviceKeys){ + IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByKey( key); + String deviceId = deviceContext.getDeviceId(); + String bsId = Long.toString(Long.valueOf(deviceId), Character.MAX_RADIX); + ChannelStatInfo info = new ChannelStatInfo(); + info.setId(deviceContext.getChannelId()); +// info.setId(deviceContext.getDeviceCode()); + info.setBsId(bsId); + String controlRealName = ObjectUtils.isEmpty(deviceContext.getControlRealName())? "无" : deviceContext.getControlRealName(); + String deviceName = deviceContext.getDeviceName(); + String deviceCode = deviceContext.getDeviceCode(); +// info.setRealName(userRealName); +// info.setRemarks("操作人:" + userRealName); + info.setDeviceId(deviceContext.getDeviceId()); + info.setDeviceCode(deviceCode); + info.setDeviceType(deviceContext.getDeviceType()); + info.setDeviceName(deviceName); + StringBuilder sb = new StringBuilder(); + if (StringUtils.isNoneBlank(deviceName)) { + sb.append("设备名称:").append(deviceName).append(";"); + } else { + sb.append("注册号:").append(deviceCode).append(";"); + //deviceInfo.append("设备Id:").append(deviceId).append(";"); + } + sb.append("控制人:").append(controlRealName); + info.setRemarks(sb.toString()); + info.setClientIp(deviceContext.getClientIp()); + info.setClientPort(deviceContext.getClientPort()); + info.setClientProtocol(deviceContext.getClientProtocol()); + info.setProxyIp(deviceContext.getProxyIp()); + info.setProxyPort(deviceContext.getProxyPort()); + info.setProxyProtocol(deviceContext.getProxyProtocol()); + info.setConnectType(deviceContext.getConnectType()); + //info.setReConnCount(reconnCount); + info.setDecodeFailCount(deviceContext.getDecodeFailCount()); + info.setUserId(deviceContext.getUserId()); +// info.setToken(token); + info.setGmtCreate(deviceContext.getGmtCreate()); + info.setReceivedBytes(deviceContext.getReceivedBytes()); + info.setReceivedPackets(deviceContext.getReceivedPackets()); + info.setReceivedTcps(deviceContext.getReceivedTcps()); + info.setBytesPerTcpReceive(deviceContext.getBytesPerTcpReceive()); + info.setPacketsPerTcpReceive(deviceContext.getPacketsPerTcpReceive()); + info.setHandledBytes(deviceContext.getHandledBytes()); + info.setHandledPackets(deviceContext.getHandledPackets()); + info.setSentBytes(deviceContext.getSentBytes()); + info.setSentPackets(deviceContext.getSentPackets()); + if(deviceContext.getLastBalanceDataTime() != null) + info.setGmtLatestCommunicate(new Date(deviceContext.getLastBalanceDataTime())); + if(deviceContext.getRegTime() != null) + info.setRegTime(deviceContext.getRegTime()); + + list.add(info); + + } + + for(String key : clientKeys){ + IotClientSessionForRedisContext clientContext = redisSessionComponent.getClientByKey( key); + ChannelStatInfo info = new ChannelStatInfo(); + info.setId(clientContext.getChannelId()); + String controlRealName = ObjectUtils.isEmpty(clientContext.getControlRealName())? "无" : clientContext.getControlRealName(); +// info.setRealName(userRealName); +// info.setRemarks("操作人:" + userRealName); + StringBuilder sb = new StringBuilder(); + sb.append("操作人:").append(controlRealName); + info.setRemarks(sb.toString()); + info.setClientIp(clientContext.getClientIp()); + info.setClientPort(clientContext.getClientPort()); + info.setClientProtocol(clientContext.getClientProtocol()); + info.setProxyIp(clientContext.getProxyIp()); + info.setProxyPort(clientContext.getProxyPort()); + info.setProxyProtocol(clientContext.getProxyProtocol()); + info.setConnectType(clientContext.getConnectType()); + //info.setReConnCount(reconnCount); + info.setDecodeFailCount(clientContext.getDecodeFailCount()); + info.setUserId(clientContext.getUserId()); +// info.setToken(token); + info.setGmtCreate(clientContext.getGmtCreate()); + info.setReceivedBytes(clientContext.getReceivedBytes()); + info.setReceivedPackets(clientContext.getReceivedPackets()); + info.setReceivedTcps(clientContext.getReceivedTcps()); + info.setBytesPerTcpReceive(clientContext.getBytesPerTcpReceive()); + info.setPacketsPerTcpReceive(clientContext.getPacketsPerTcpReceive()); + info.setHandledBytes(clientContext.getHandledBytes()); + info.setHandledPackets(clientContext.getHandledPackets()); + info.setSentBytes(clientContext.getSentBytes()); + info.setSentPackets(clientContext.getSentPackets()); + info.setGmtLatestCommunicate(clientContext.getLastCommunicateTime()); + info.setRegTime(clientContext.getRegTime()); + + list.add(info); + + } + SetWithLock setWithLock = Tio.getAll(tioServerBootstrap.getServerTioConfig()); Set set = null; ReentrantReadWriteLock.ReadLock readLock = setWithLock.getLock().readLock(); - List list = null; - try { - readLock.lock(); - set = setWithLock.getObj(); - list = new ArrayList<>(); - for (ChannelContext channelContext : set) { - - ChannelStatInfo info = new ChannelStatInfo(); - ChannelStat stat = channelContext.stat; - - - String id = channelContext.getId(); - String bsId = channelContext.getBsId(); - Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime"); - String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP - int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口 - String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议 - String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip - int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口 - String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议 - - //连接类型 - String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString(); - - switch (connectType) { - case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端 - String userRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString(); - info.setRealName(userRealName); - info.setRemarks("操作人:" + userRealName); - break; - case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端 - String caaUserRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString(); - info.setRealName(caaUserRealName); - info.setRemarks("操作人:" + caaUserRealName); - break; - case ChannelContextConstant.CONNECT_DEVICE://设备 - IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); - String deviceCode = deviceSessionContext.getDeviceCode(); - IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode); - String deviceId = ""; - String deviceType = ""; - String deviceName = ""; - String controlRealName = ""; - if(deviceContext != null){ - deviceId = deviceContext.getDeviceId(); - deviceType = deviceContext.getDeviceType(); - deviceName = deviceContext.getDeviceName(); - controlRealName = deviceContext.getControlRealName(); - } - info.setDeviceId(deviceId); - info.setDeviceCode(deviceCode); - info.setDeviceType(deviceType); - info.setDeviceName(deviceName); - info.setControlRealName(controlRealName); - StringBuilder deviceInfo = new StringBuilder(); - if (StringUtils.isNoneBlank(deviceName)) { - deviceInfo.append("设备名称:").append(deviceName).append(";"); - } else { - deviceInfo.append("注册号:").append(deviceCode).append(";"); - //deviceInfo.append("设备Id:").append(deviceId).append(";"); - } - deviceInfo.append("控制人:").append(controlRealName); - info.setRemarks(deviceInfo.toString()); - break; - - default: - break; - } - - - int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数 - String userId = channelContext.userid; // 用户ID - String token = channelContext.getToken(); // 用户token - long timeStart = stat.getTimeCreated(); // 统计开始时间 - // 统计时长 - AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节 - AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包 - AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包 - - double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收的字节数 - double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收的业务包 - - AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节 - AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包 - - AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数 - AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包 - - long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte(); - long timeLatestSentMsg = stat.getLatestTimeOfSentPacket(); - long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间 - - info.setId(id); - info.setBsId(bsId); - info.setClientIp(ip); - info.setClientPort(port); - info.setClientProtocol(protocol); - info.setProxyIp(ipProxy); - info.setProxyPort(portProxy); - info.setProxyProtocol(protocolProxy); - info.setConnectType(connectType); - //info.setReConnCount(reconnCount); - info.setDecodeFailCount(decodeFailCount); - info.setUserId(userId); - info.setToken(token); - info.setGmtCreate(new Date(timeStart)); - info.setReceivedBytes(receivedBytes); - info.setReceivedPackets(receivedPackets); - info.setReceivedTcps(receivedTcps); - info.setBytesPerTcpReceive(bytesPerTcpReceive); - info.setPacketsPerTcpReceive(packetsPerTcpReceive); - info.setHandledBytes(handledBytes); - info.setHandledPackets(handledPackets); - info.setSentBytes(sentBytes); - info.setSentPackets(sentPackets); - info.setGmtLatestCommunicate(new Date(latestCommunicateTime)); - info.setRegTime(regTime); - - list.add(info); - - log.info("Current Client Info : [{}].", info); - } - } catch (Throwable e) { - log.error("获取在线连接失败!", e); - } finally { - readLock.unlock(); - } +// try { +// readLock.lock(); +// set = setWithLock.getObj(); +// for (ChannelContext channelContext : set) { +// +// ChannelStatInfo info = new ChannelStatInfo(); +// ChannelStat stat = channelContext.stat; +// +// +// String id = channelContext.getId(); +// String bsId = channelContext.getBsId(); +// Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime"); +// String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP +// int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口 +// String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议 +// String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip +// int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口 +// String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议 +// +// //连接类型 +// String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString(); +// if(connectType.equals(ChannelContextConstant.CONNECT_DEVICE)) //设备 +// continue; +// switch (connectType) { +// case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端 +// String userRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString(); +// info.setRealName(userRealName); +// info.setRemarks("操作人:" + userRealName); +// break; +// case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端 +// String caaUserRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString(); +// info.setRealName(caaUserRealName); +// info.setRemarks("操作人:" + caaUserRealName); +// break; +// case ChannelContextConstant.CONNECT_DEVICE://设备 +// IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); +// String deviceCode = deviceSessionContext.getDeviceCode(); +// IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode); +// String deviceId = ""; +// String deviceType = ""; +// String deviceName = ""; +// String controlRealName = ""; +// if(deviceContext != null){ +// deviceId = deviceContext.getDeviceId(); +// deviceType = deviceContext.getDeviceType(); +// deviceName = deviceContext.getDeviceName(); +// controlRealName = deviceContext.getControlRealName(); +// } +// info.setDeviceId(deviceId); +// info.setDeviceCode(deviceCode); +// info.setDeviceType(deviceType); +// info.setDeviceName(deviceName); +// info.setControlRealName(controlRealName); +// StringBuilder deviceInfo = new StringBuilder(); +// if (StringUtils.isNoneBlank(deviceName)) { +// deviceInfo.append("设备名称:").append(deviceName).append(";"); +// } else { +// deviceInfo.append("注册号:").append(deviceCode).append(";"); +// //deviceInfo.append("设备Id:").append(deviceId).append(";"); +// } +// deviceInfo.append("控制人:").append(controlRealName); +// info.setRemarks(deviceInfo.toString()); +// break; +// +// default: +// break; +// } +// +// +// int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数 +// String userId = channelContext.userid; // 用户ID +// String token = channelContext.getToken(); // 用户token +// long timeStart = stat.getTimeCreated(); // 统计开始时间 +// // 统计时长 +// AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节 +// AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包 +// AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包 +// +// double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收的字节数 +// double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收的业务包 +// +// AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节 +// AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包 +// +// AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数 +// AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包 +// +// long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte(); +// long timeLatestSentMsg = stat.getLatestTimeOfSentPacket(); +// long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间 +// +// info.setId(id); +// info.setBsId(bsId); +// info.setClientIp(ip); +// info.setClientPort(port); +// info.setClientProtocol(protocol); +// info.setProxyIp(ipProxy); +// info.setProxyPort(portProxy); +// info.setProxyProtocol(protocolProxy); +// info.setConnectType(connectType); +// //info.setReConnCount(reconnCount); +// info.setDecodeFailCount(decodeFailCount); +// info.setUserId(userId); +// info.setToken(token); +// info.setGmtCreate(new Date(timeStart)); +// info.setReceivedBytes(receivedBytes); +// info.setReceivedPackets(receivedPackets); +// info.setReceivedTcps(receivedTcps); +// info.setBytesPerTcpReceive(bytesPerTcpReceive); +// info.setPacketsPerTcpReceive(packetsPerTcpReceive); +// info.setHandledBytes(handledBytes); +// info.setHandledPackets(handledPackets); +// info.setSentBytes(sentBytes); +// info.setSentPackets(sentPackets); +// info.setGmtLatestCommunicate(new Date(latestCommunicateTime)); +// info.setRegTime(regTime); +// +// list.add(info); +// +//// log.info("Current Client Info : [{}].", info); +// } +// } catch (Throwable e) { +// log.error("获取在线连接失败!", e); +// } finally { +// readLock.unlock(); +// } - List resultList = list.stream().filter(f -> { - if (StringUtils.isNotBlank(clientIp)) { - return clientIp.equals(f.getClientIp()); - } - return true; - }).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort)) - .collect(Collectors.toList()); - return CommonResult.success(resultList); +// List resultList = list.stream().filter(f -> { +// if (StringUtils.isNotBlank(clientIp)) { +// return clientIp.equals(f.getClientIp()); +// } +// return true; +// }).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort)).collect(Collectors.toList()); + return CommonResult.success(list); } /** @@ -202,17 +311,8 @@ public class IotConnectManagerStatsController { @PostMapping("/close") @ApiOperationSupport(order = 6) @Operation(summary = "关闭连接") - public CommonResult close(@RequestParam(name="id",required=true) String id) { - ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); - IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); - if(sessionContext != null){ - String deviceId = sessionContext.getDeviceId(); - if(!ObjectUtils.isEmpty(deviceId)) - redisSessionComponent.deleteByDeviceId(deviceId); - } - Tio.IpBlacklist.clear();//清空全局黑名单 - Tio.close(channelContext, "控制端主动关闭连接"); - return CommonResult.success("ok"); + public CommonResult close(@RequestParam(name="id",required=true) String id) throws InterruptedException { + return iotConnectManagerStatsService.close(id, "0"); } /** @@ -223,20 +323,17 @@ public class IotConnectManagerStatsController { @PostMapping("/clearDeviceControl") @ApiOperationSupport(order = 7) @Operation(summary = "清除设备控制") - public CommonResult clearDeviceControl(@RequestParam(name="id",required=true) String id) { - ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); - Tio.IpBlacklist.clear();//清空全局黑名单 - if(channelContext != null) { - IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); - String deviceId = deviceSessionContext.getDeviceId(); - if(!ObjectUtils.isEmpty(deviceId)){ - redisSessionComponent.clearControlByDeviceId(deviceId, channelContext); - } - //设置控制点的channelContext -// deviceSessionContext.setControlChannelContext(null); -// deviceSessionContext.setControlRealName(""); -// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, ""); - } + public CommonResult clearDeviceControl(@RequestParam(name="id") String id) throws InterruptedException { + return iotConnectManagerStatsService.clearDeviceControl(id, "0"); + } + + @RequestMapping("/testMsg") + public CommonResult testMsg() { + + TioServerMessageVo vo = new TioServerMessageVo(); + vo.setServerClusterMsgType("type1"); + vo.setGroup("aaaa"); + tioServerMessagePublisher.publish( vo); return CommonResult.success("ok"); } } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java new file mode 100644 index 0000000..7075b07 --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/service/IotConnectManagerStatsService.java @@ -0,0 +1,80 @@ +package com.zt.plat.module.qms.iot.service; + +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo; +import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext; +import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.env.Environment; +import org.springframework.stereotype.Service; +import org.springframework.util.ObjectUtils; +import org.springframework.web.bind.annotation.RequestParam; +import tech.zzjc.tio.core.ChannelContext; +import tech.zzjc.tio.core.Tio; +import tech.zzjc.tio.starter.TioServerBootstrap; + +import static java.lang.Thread.sleep; + +@Service +public class IotConnectManagerStatsService { + + @Autowired + private TioServerBootstrap tioServerBootstrap; + @Autowired public RedisSessionComponent redisSessionComponent; + @Autowired private TioServerMessagePublisher tioServerMessagePublisher; + + public CommonResult close(String id, String fromSubscribe) throws InterruptedException { + ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); + if(channelContext == null){ + if("1".equals(fromSubscribe)) + return CommonResult.success("ok"); + //广播消息处理 + TioServerMessageVo message = new TioServerMessageVo(); + message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL); + message.setChannelId(id); + tioServerMessagePublisher.publish(message); + //延迟0.5秒返回 + sleep(500); + return CommonResult.success("ok"); + } + IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); + String deviceId = sessionContext.getDeviceId(); + if(!ObjectUtils.isEmpty(deviceId)) + redisSessionComponent.deleteByDeviceId(deviceId); + Tio.IpBlacklist.clear();//清空全局黑名单 + Tio.close(channelContext, "控制端主动关闭连接"); + return CommonResult.success("ok"); + } + + public CommonResult clearDeviceControl(String id, String fromSubscribe) throws InterruptedException { + ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); + Tio.IpBlacklist.clear();//清空全局黑名单 + if(channelContext == null){ + if("1".equals(fromSubscribe)) + return CommonResult.success("ok"); + //广播消息处理 + TioServerMessageVo message = new TioServerMessageVo(); + message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL); + message.setChannelId(id); + tioServerMessagePublisher.publish(message); + //延迟0.5秒返回 + sleep(500); + return CommonResult.success("ok"); + } + IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); + String deviceId = deviceSessionContext.getDeviceId(); + if(!ObjectUtils.isEmpty(deviceId)){ + redisSessionComponent.clearControlByDeviceId(deviceId, channelContext); + } +// if(channelContext != null) { + //设置控制点的channelContext +// deviceSessionContext.setControlChannelContext(null); +// deviceSessionContext.setControlRealName(""); +// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, ""); +// } + return CommonResult.success("ok"); + } + +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java index 5c13bd1..38e39b5 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/IotServerAioListener.java @@ -86,7 +86,6 @@ public class IotServerAioListener implements ServerAioListener { if(!ObjectUtils.isEmpty(deviceCode)){ redisSessionComponent.clearControlByDeviceCode(deviceCode.toString(), channelContext); } - //解绑业务ID Tio.unbindBsId(channelContext); //解绑群组 diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java index c6a35a7..b9d509b 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientControlDeviceHander.java @@ -1,6 +1,9 @@ package com.zt.plat.module.qms.iot.tcpserver.caaclient; import com.zt.plat.module.qms.iot.tcpserver.IotUtils; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher; +import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo; import com.zt.plat.module.qms.iot.tcpserver.core.*; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.tcpserver.core.ClientType; @@ -12,6 +15,7 @@ import com.alibaba.fastjson.JSONObject; import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.caaclient.bean.ControlDevice; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; +import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -22,8 +26,10 @@ import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContextFilter; import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.TioConfig; +import tech.zzjc.tio.core.stat.ChannelStat; import java.nio.charset.Charset; +import java.util.Date; import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT; @@ -42,17 +48,24 @@ import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CL public class IotCaaClientControlDeviceHander implements IotDataHander { @Autowired public RedisSessionComponent redisSessionComponent; + + @Autowired + private TioServerMessagePublisher tioServerMessagePublisher; + @Override public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { + ControlDevice controlDeviceOriginal = JSON.parseObject(data, ControlDevice.class); ControlDevice controlDevice = JSON.parseObject(data, ControlDevice.class); //获取客户端连接信息 + ChannelStat stat = channelContext.stat; String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); + String deviceId = controlDevice.getDeviceId(); String realClient = realClientIp + ":" + realClientPort; log.info("{},, APP客户端控制设备 :{}", realClient, controlDevice); - String deviceId = controlDevice.getDeviceId(); + JSONObject json = null; + IotPacket iotPacket = new IotPacket(); if (StringUtils.isBlank(deviceId)) { - JSONObject json = null; json = new JSONObject(); json.put("msgId", msgId); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); @@ -60,17 +73,16 @@ public class IotCaaClientControlDeviceHander implements IotDataHander { controlDevice.setSuccess(false); controlDevice.setMsg("设备解析为空!"); json.put("data", controlDevice); - IotPacket iotPacket = new IotPacket(); + iotPacket = new IotPacket(); iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET))); Tio.send(channelContext, iotPacket); return null; } - String deviceCode = IotUtils.transDeviceIdToCode(deviceId); - ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode); - - IotPacket iotPacket = new IotPacket(); - JSONObject json = null; - if (deviceChannelContext == null) { + IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId); + if (deviceContext == null) { + //如果是订阅消息,不再广播 + if("1".equals(controlDevice.getFromSubscribe())) + return null; json = new JSONObject(); json.put("msgId", msgId); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); @@ -82,11 +94,31 @@ public class IotCaaClientControlDeviceHander implements IotDataHander { Tio.send(channelContext, iotPacket); return null; } + String deviceControlUserId = deviceContext.getUserId(); + String curUserId = controlDevice.getControlUserId(); + + String deviceCode = IotUtils.transDeviceIdToCode(deviceId); + ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode); + //如果设备不在本服务实例,发送广播到其他服务处理 + if(deviceChannelContext == null){ + String channelId = deviceContext.getChannelId(); + TioServerMessageVo message = new TioServerMessageVo(); + message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CONTROL); + message.setChannelId(channelId); +// message.setChannelContext(channelContext); + controlDeviceOriginal.setFromSubscribe("1"); + iotPacket.setBody((JSONObject.toJSONString(controlDeviceOriginal) + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET))); + message.setPacket(iotPacket); + tioServerMessagePublisher.publish(message); + return null; + } + + TioConfig tioConfig = deviceChannelContext.getTioConfig(); if (controlDevice.getIsControl()) { //控制 // deviceContext.setControlChannelContext(channelContext); - IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId); deviceContext.setControlRealName(controlDevice.getControlRealName()); + deviceContext.setUserId(curUserId); channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, deviceCode); controlDevice.setMsg("设备被“" + controlDevice.getControlRealName() + "”远程控制!"); redisSessionComponent.update(deviceContext); @@ -106,18 +138,18 @@ public class IotCaaClientControlDeviceHander implements IotDataHander { json.put("msgId", msgId); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); json.put("clientType", ClientType.CAA_CLIENT.getName()); - controlDevice.setSuccess(true); - json.put("data", controlDevice); +// controlDevice.setSuccess(true); +// json.put("data", controlDevice); + json.put("data", JSON.parseObject(data, ControlDevice.class)); iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET))); - //发送通知到其他服务实例 - TioConfig tioConfig = deviceChannelContext.getTioConfig(); + //发送通知到其他服务实例(其他实例通知各自客户端) TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig(); TioClusterVo iotClusterVo = new TioClusterVo(iotPacket); iotClusterVo.setGroup(CAA_ALL_CLIENT); tioClusterConfig.publish(iotClusterVo); //发送消息到本服务实例连接的客户端 - Tio.sendToGroup(channelContext.getTioConfig(), CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() { + Tio.sendToGroup(tioConfig, CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() { @Override public boolean filter(ChannelContext cc) { if (channelContext.getId().equals(cc.getId())) { @@ -126,6 +158,9 @@ public class IotCaaClientControlDeviceHander implements IotDataHander { return true; }} ); + + + return null; } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java index bb53f5a..f75486e 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/IotCaaClientRegisterHander.java @@ -1,16 +1,19 @@ package com.zt.plat.module.qms.iot.tcpserver.caaclient; import com.alibaba.fastjson.JSON; +import com.fhs.common.spring.SpringContextUtil; import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.consoleclient.bean.Register; import com.zt.plat.module.qms.iot.tcpserver.core.ClientType; import com.zt.plat.module.qms.iot.tcpserver.core.Command; import com.zt.plat.module.qms.iot.tcpserver.core.ReplyResult; +import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.Tio; +import tech.zzjc.tio.utils.SystemTimer; import java.nio.charset.Charset; import java.util.Date; @@ -34,11 +37,7 @@ public class IotCaaClientRegisterHander implements IotDataHander { @Override public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { Register register = JSON.parseObject(data, Register.class); - //获取客户端连接信息 - String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); - String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); - String realClient = realClientIp + ":" + realClientPort; - log.info("{},, APP客户端注册租户ID:{},用户ID:{}", realClient, register.getTenantId(), register.getUserId()); + Tio.bindUser(channelContext, register.getUserId()); Tio.bindToken(channelContext, register.getUserId()); channelContext.set("regTime", new Date()); @@ -49,6 +48,15 @@ public class IotCaaClientRegisterHander implements IotDataHander { Tio.send(channelContext, resppacket); //绑定群组 Tio.bindGroup(channelContext, CAA_ALL_CLIENT); + + //在redis记录客户端信息 +// RedisSessionComponent redisSessionComponent = (RedisSessionComponent) SpringContextUtil.getBean("redisSessionComponent"); +// redisSessionComponent.regClient(channelContext); + + + // + + return null; } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java index 8adb3e8..ba8cfc3 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/caaclient/bean/ControlDevice.java @@ -24,7 +24,8 @@ public class ControlDevice implements Serializable { /** 控制者姓名 **/ private String controlRealName; - + private String controlUserId; + /** 是否控制 **/ private Boolean isControl; @@ -33,4 +34,7 @@ public class ControlDevice implements Serializable { /** 回复app时使用 消息 **/ private String msg; + + /** 来自消息订阅 **/ + private String fromSubscribe; } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java new file mode 100644 index 0000000..5c6fc9a --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/InstanceIdProvider.java @@ -0,0 +1,20 @@ +package com.zt.plat.module.qms.iot.tcpserver.cluster; + +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +public class InstanceIdProvider { + + private final String instanceId; + + public InstanceIdProvider() { + //使用随机 UUID(每次重启会变) + this.instanceId = UUID.randomUUID().toString(); + } + + public String getInstanceId() { + return instanceId; + } +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java new file mode 100644 index 0000000..3d7209d --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageConfig.java @@ -0,0 +1,16 @@ +package com.zt.plat.module.qms.iot.tcpserver.cluster; + +public class TioServerMessageConfig { + + public static final String SERVER_CLUSTER_TOPIC_CHANNEL = "zzjc-cluster-tio-server"; + + + public static final String MESSAGE_TYPE_CONTROL = "control"; //设备控制消息 + public static final String MESSAGE_TYPE_CLEAR_MANUAL = "clearControlManual"; //手动清除控制 + public static final String MESSAGE_TYPE_CLOSE_MANUAL = "closeManual"; //手动断开连接 + public static final String MESSAGE_TYPE_NOTICE = "notice"; //通知消息。应该用不到。 + + + + +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java new file mode 100644 index 0000000..c71643f --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessagePublisher.java @@ -0,0 +1,33 @@ +package com.zt.plat.module.qms.iot.tcpserver.cluster; + +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; +import tech.zzjc.tio.server.ServerTioConfig; + +import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL; + +@Service +@Slf4j +public class TioServerMessagePublisher { + + @Autowired + private RedissonClient redissonClient; + @Autowired + private InstanceIdProvider instanceIdProvider; // 注入实例ID提供者 + + + + public void publish(TioServerMessageVo message) { + message.setSenderInstanceId(instanceIdProvider.getInstanceId()); + RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL); + topic.publish(message); + log.error("[Publisher] 已发布消息: " + message); + + + } + + +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java new file mode 100644 index 0000000..2105abd --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageSubscriber.java @@ -0,0 +1,91 @@ +package com.zt.plat.module.qms.iot.tcpserver.cluster; + +import com.alibaba.fastjson.JSONObject; +import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService; +import com.zt.plat.module.qms.iot.tcpserver.IotPacket; +import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.redisson.api.RTopic; +import org.redisson.api.RedissonClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; +import tech.zzjc.tio.core.ChannelContext; +import tech.zzjc.tio.core.Tio; +import tech.zzjc.tio.core.TioConfig; +import tech.zzjc.tio.server.ServerTioConfig; +import tech.zzjc.tio.server.TioServer; +import tech.zzjc.tio.starter.TioServerBootstrap; +import tech.zzjc.tio.utils.lock.SetWithLock; + +import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL; + +@Component +@Slf4j +public class TioServerMessageSubscriber { + + @Autowired + private RedissonClient redissonClient; + @Autowired + private InstanceIdProvider instanceIdProvider; + + @Autowired + private IotDataHander iotCaaClientControlDeviceHander; + + @Autowired private TioServerBootstrap tioServerBootstrap; + @Autowired private IotConnectManagerStatsService iotConnectManagerStatsService; + + @PostConstruct + public void subscribe() { + RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL); + topic.addListener(TioServerMessageVo.class, (channel, msg) -> { + String senderId = msg.getSenderInstanceId(); + String channelId = msg.getChannelId(); + String msgType = msg.getServerClusterMsgType(); + String currentId = instanceIdProvider.getInstanceId(); + if (currentId.equals(senderId)) { + log.debug("[Subscriber] 跳过自己发送的消息"); + return; + } + log.debug("[Subscriber] 收到消息频道 '" + channel + "' 的内容: " + msg); + + if(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL.equals(msgType)){ + try { + iotConnectManagerStatsService.clearDeviceControl(channelId,"1"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return; + } + if(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL.equals(msgType)){ + try { + iotConnectManagerStatsService.close(channelId,"1"); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return; + } + + ServerTioConfig serverTioConfig = tioServerBootstrap.getServerTioConfig(); + SetWithLock all = Tio.getAll(serverTioConfig); + ChannelContext targetContext = null; + for(ChannelContext context : all.getObj()){ + targetContext = context; + String id = context.getId(); + if(id.equals(channelId)){ + break; + } + } + IotPacket packet = (IotPacket) msg.getPacket(); + byte[] body = packet.getBody(); + String bodyStr = new String(body); + JSONObject json = JSONObject.parseObject(bodyStr); + try { + iotCaaClientControlDeviceHander.hander(json.getString("msgId"), bodyStr, targetContext); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } + +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java new file mode 100644 index 0000000..aab434c --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/cluster/TioServerMessageVo.java @@ -0,0 +1,18 @@ +package com.zt.plat.module.qms.iot.tcpserver.cluster; + +import lombok.Data; +import tech.zzjc.tio.cluster.TioClusterVo; +import tech.zzjc.tio.core.ChannelContext; + +@Data +public class TioServerMessageVo extends TioClusterVo { + + private String senderInstanceId; //发布者id + + + private String serverClusterMsgType; //消息类型: 设备 + + + private ChannelContext channelContext; + +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java index 06f1d80..5c8cfc8 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/consoleclient/IotConsoleClientCheckControlHander.java @@ -31,7 +31,7 @@ public class IotConsoleClientCheckControlHander implements IotDataHander { String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClient = realClientIp + ":" + realClientPort; log.info("{},检查控制点由谁控制", realClient); - String userId = channelContext.getToken(); + String token = channelContext.getToken(); IotPacket resppacket = null; // List userMeasurePointList = userMeasurePointService.list(Wrappers.query().lambda().eq(UserMeasurePoint::getUserId, Func.toLong(userId))); // for (UserMeasurePoint userMeasurePoint : userMeasurePointList) { diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java index 0a55bf1..dd5c833 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionContext.java @@ -1,5 +1,6 @@ package com.zt.plat.module.qms.iot.tcpserver.core; +import lombok.Data; import tech.zzjc.tio.core.ChannelContext; import java.io.Serializable; @@ -15,6 +16,7 @@ import java.io.Serializable; * @version V1.0 * @since 2021-6-28 */ +@Data public class IotClientSessionContext implements Serializable { private static final long serialVersionUID = 8738456547503650108L; @@ -31,29 +33,5 @@ public class IotClientSessionContext implements Serializable { /** 当前控制计量点的ChannelContext **/ private ChannelContext controlConsoleClientChannelContext; - public String getClientType() { - return clientType; - } - - public void setClientType(String clientType) { - this.clientType = clientType; - } - - public MeasurePoint getMeasurePoint() { - return measurePoint; - } - - public void setMeasurePoint(MeasurePoint measurePoint) { - this.measurePoint = measurePoint; - } - - public ChannelContext getControlConsoleClientChannelContext() { - return controlConsoleClientChannelContext; - } - - public void setControlConsoleClientChannelContext(ChannelContext controlConsoleClientChannelContext) { - this.controlConsoleClientChannelContext = controlConsoleClientChannelContext; - } - } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java new file mode 100644 index 0000000..6187bce --- /dev/null +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotClientSessionForRedisContext.java @@ -0,0 +1,120 @@ +package com.zt.plat.module.qms.iot.tcpserver.core; + +import lombok.Data; + +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +@Data +public class IotClientSessionForRedisContext extends IotClientSessionContext{ + private static final long serialVersionUID = 1L; + + private String channelId; + + private Date regTime; //注册时间 + private Date expireTime; //过期时间 + private Date lastCommunicateTime; //最近一次通信时间 + private String tenantId; //租户ID + private String controlRealName; + + /** + * 客户端IP + */ + private String clientIp; + + /** + * 端口 + */ + private int clientPort; + + /** + * 协议 + */ + private String clientProtocol; + + /** + * 代理ip + */ + private String proxyIp; + + /** + * 代理端口 + */ + private int proxyPort; + + /** + * 代理协议 + */ + private String proxyProtocol; + + /** + * 连接类型 + */ + private String connectType; + + /** + * 解码失败数 + */ + private int decodeFailCount; + + /** + * 用户ID + */ + private String userId; + + /** + * 用户token + */ + private String token; + + + /** + * 统计开始时间 + */ + private Date gmtCreate; + + /** + * 已接收字节数 + */ + private AtomicLong receivedBytes; + + /** + * 已接收业务包 + */ + private AtomicLong receivedPackets; + + /** + * 已接收TCP包 + */ + private AtomicLong receivedTcps; + + /** + * 平均每个TCP包接收的字节数 + */ + private double bytesPerTcpReceive; + + /** + * 平均每个TCP包接收的业务包 + */ + private double packetsPerTcpReceive; + + /** + * 已处理的字节数 + */ + private AtomicLong handledBytes; + + /** + * 已处理的业务包 + */ + private AtomicLong handledPackets; + + /** + * 已发送的字节数 + */ + private AtomicLong sentBytes; + + /** + * 已发送的业务包 + */ + private AtomicLong sentPackets; +} diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java index d02d7cc..7b3676d 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionContext.java @@ -1,5 +1,6 @@ package com.zt.plat.module.qms.iot.tcpserver.core; +import lombok.Data; import tech.zzjc.tio.core.ChannelContext; import java.io.Serializable; @@ -16,6 +17,7 @@ import java.util.List; * @version V1.0 * @since 2021-6-28 */ +@Data public class IotDeviceSessionContext implements Serializable { private static final long serialVersionUID = 9035659677475646771L; @@ -44,61 +46,6 @@ public class IotDeviceSessionContext implements Serializable { /** 控制的ChannelContext **/ private ChannelContext controlChannelContext; - public String getDeviceId() { - return deviceId; - } - - public void setDeviceId(String deviceId) { - this.deviceId = deviceId; - } - - public String getDeviceCode() { - return deviceCode; - } - - public void setDeviceCode(String deviceCode) { - this.deviceCode = deviceCode; - } - - public String getDeviceType() { - return deviceType; - } - - public void setDeviceType(String deviceType) { - this.deviceType = deviceType; - } - - public String getDeviceName() { - return deviceName; - } - - public void setDeviceName(String deviceName) { - this.deviceName = deviceName; - } - - public List getWeightValueList() { - return weightValueList; - } - - public void setWeightValueList(List weightValueList) { - this.weightValueList = weightValueList; - } - - public String getControlRealName() { - return controlRealName; - } - - public void setControlRealName(String controlRealName) { - this.controlRealName = controlRealName; - } - - public ChannelContext getControlChannelContext() { - return controlChannelContext; - } - - public void setControlChannelContext(ChannelContext controlChannelContext) { - this.controlChannelContext = controlChannelContext; - } } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java index 13acd6d..62fd3c4 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/core/IotDeviceSessionForRedisContext.java @@ -1,55 +1,123 @@ package com.zt.plat.module.qms.iot.tcpserver.core; -import java.util.Date; +import lombok.Data; +import tech.zzjc.tio.core.ChannelContext; +import java.util.Date; +import java.util.concurrent.atomic.AtomicLong; + +@Data public class IotDeviceSessionForRedisContext extends IotDeviceSessionContext{ private static final long serialVersionUID = 1L; + private String channelId; + private Date regTime; //注册时间 private Date expireTime; //过期时间 private String balanceWeightUnit; //天平重量单位 private Long lastBalanceDataTime; //天平最后数据时间 - private String tenantId; //租户ID - public Date getRegTime() { - return regTime; - } + /** + * 客户端IP + */ + private String clientIp; - public void setRegTime(Date regTime) { - this.regTime = regTime; - } + /** + * 端口 + */ + private int clientPort; - public Date getExpireTime() { - return expireTime; - } + /** + * 协议 + */ + private String clientProtocol; - public void setExpireTime(Date expireTime) { - this.expireTime = expireTime; - } + /** + * 代理ip + */ + private String proxyIp; - public String getBalanceWeightUnit() { - return balanceWeightUnit; - } + /** + * 代理端口 + */ + private int proxyPort; - public void setBalanceWeightUnit(String balanceWeightUnit) { - this.balanceWeightUnit = balanceWeightUnit; - } + /** + * 代理协议 + */ + private String proxyProtocol; - public Long getLastBalanceDataTime() { - return lastBalanceDataTime; - } + /** + * 连接类型 + */ + private String connectType; - public void setLastBalanceDataTime(Long lastBalanceDataTime) { - this.lastBalanceDataTime = lastBalanceDataTime; - } + /** + * 解码失败数 + */ + private int decodeFailCount; - public String getTenantId() { - return tenantId; - } + /** + * 用户ID + */ + private String userId; + + /** + * 用户token + */ + private String token; + + + /** + * 统计开始时间 + */ + private Date gmtCreate; + + /** + * 已接收字节数 + */ + private AtomicLong receivedBytes; + + /** + * 已接收业务包 + */ + private AtomicLong receivedPackets; + + /** + * 已接收TCP包 + */ + private AtomicLong receivedTcps; + + /** + * 平均每个TCP包接收的字节数 + */ + private double bytesPerTcpReceive; + + /** + * 平均每个TCP包接收的业务包 + */ + private double packetsPerTcpReceive; + + /** + * 已处理的字节数 + */ + private AtomicLong handledBytes; + + /** + * 已处理的业务包 + */ + private AtomicLong handledPackets; + + /** + * 已发送的字节数 + */ + private AtomicLong sentBytes; + + /** + * 已发送的业务包 + */ + private AtomicLong sentPackets; - public void setTenantId(String tenantId) { - this.tenantId = tenantId; - } } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java index 190b27e..89a0fc4 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/RedisSessionComponent.java @@ -3,19 +3,27 @@ package com.zt.plat.module.qms.iot.tcpserver.device; import com.zt.plat.framework.tenant.core.context.TenantContextHolder; import com.zt.plat.module.qms.iot.tcpserver.IotUtils; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; +import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext; import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext; import com.zt.plat.module.qms.iot.tcpserver.publisher.IotDeviceRegisterPublisher; import com.zt.plat.module.qms.resource.device.dal.dataobject.DeviceInfomationDO; import com.zt.plat.module.qms.resource.device.service.DeviceInfomationService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.redis.core.Cursor; +import org.springframework.data.redis.core.RedisCallback; import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.ScanOptions; import org.springframework.stereotype.Service; import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.Tio; +import tech.zzjc.tio.core.stat.ChannelStat; +import tech.zzjc.tio.utils.SystemTimer; import java.time.LocalDateTime; import java.util.Date; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.TimeUnit; @Service @@ -51,16 +59,25 @@ public class RedisSessionComponent { } public void deleteByDeviceId(String deviceId){ - redisTemplate.delete(getRedisKeyByDeviceCode(deviceId)); + redisTemplate.delete(getRedisKeyByDeviceId(deviceId)); } public IotDeviceSessionForRedisContext getByDeviceCode(String deviceCode){ - String deviceId = IotUtils.transDeviceCodeToId(deviceCode); - return getByDeviceId(deviceId); + String redisKey = getRedisKeyByDeviceCode(deviceCode); + return getByKey(redisKey); } public IotDeviceSessionForRedisContext getByDeviceId(String deviceId){ - return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId)); + String redisKey = getRedisKeyByDeviceId(deviceId); + return getByKey(redisKey); +// return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId)); + } + public IotDeviceSessionForRedisContext getByKey(String key){ + return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(key); + } + + public IotClientSessionForRedisContext getClientByKey(String key){ + return (IotClientSessionForRedisContext) redisTemplate.opsForValue().get(key); } public String getRedisKeyByDeviceCode(String deviceCode){ @@ -73,8 +90,8 @@ public class RedisSessionComponent { } public void regDevice(ChannelContext channelContext, String deviceCode, String deviceType){ - IotDeviceSessionForRedisContext deviceContext = getByDeviceCode(deviceCode); - if(deviceContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){ + IotDeviceSessionForRedisContext clientContext = getByDeviceCode(deviceCode); + if(clientContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){ return; } @@ -83,6 +100,7 @@ public class RedisSessionComponent { String deviceId = IotUtils.transDeviceCodeToId(deviceCode); DeviceInfomationDO deviceInfo = deviceInfomationService.getDeviceInfomation(Long.valueOf(deviceId)); IotDeviceSessionForRedisContext context = new IotDeviceSessionForRedisContext(); + context.setChannelId(channelContext.getId()); context.setDeviceId(deviceId); context.setDeviceType(deviceType); context.setDeviceCode(deviceCode); @@ -111,5 +129,109 @@ public class RedisSessionComponent { } + public void regClient(ChannelContext channelContext){ + String channelId = channelContext.getId(); + IotClientSessionForRedisContext context = getByClientChannelId(channelId); + if(context != null){ + return; + } + + TenantContextHolder.setIgnore(true); + try{ + context = new IotClientSessionForRedisContext(); + + String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); + String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); + String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString(); + String userName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString(); +// ChannelContext clientChanelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode); +// if(clientChanelContext != null){ +// context.setClientProtocol(clientChanelContext.getClientNode().getProtocol()); +// context.setProxyIp(clientChanelContext.getClientNode().getIp()); +// context.setProxyPort(clientChanelContext.getClientNode().getPort()); +// context.setProxyProtocol(clientChanelContext.getClientNode().getProtocol()); +// } + context.setChannelId(channelId); + context.setControlRealName(userName); + context.setClientIp(realClientIp); + context.setClientPort(Integer.parseInt(realClientPort)); + context.setConnectType(connectType); + + updateClientCommonAttrs(context, channelContext); + + String redisKey = getClientRedisKeyByChannelId(channelId); + redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS); + //todo 客户端注册事件 +// IotDeviceRegisterPublisher.publishEvent(deviceCode, deviceType); + }catch (Exception e){ + e.printStackTrace(); + log.error("设备注册异常:{}", e.getMessage()); + }finally { + TenantContextHolder.clear(); + } + + } + + + public void updateClient(IotClientSessionForRedisContext context, ChannelContext channelContext){ + String channelId = channelContext.getId(); + updateClientCommonAttrs(context, channelContext); + String redisKey = getClientRedisKeyByChannelId(channelId); + redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS); + } + + private void updateClientCommonAttrs(IotClientSessionForRedisContext context, ChannelContext channelContext){ + ChannelStat stat = channelContext.stat; + context.setGmtCreate(new Date(stat.getTimeCreated())); + context.setDecodeFailCount(stat.getDecodeFailCount()); + context.setReceivedBytes(stat.getReceivedBytes()); + context.setReceivedPackets(stat.getReceivedPackets()); + context.setReceivedTcps(stat.getReceivedTcps()); + context.setBytesPerTcpReceive(stat.getBytesPerTcpReceive()); + context.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive()); + context.setHandledBytes(stat.getHandledBytes()); + context.setHandledPackets(stat.getHandledPackets()); + context.setSentBytes(stat.getSentBytes()); + context.setSentPackets(stat.getSentPackets()); + context.setLastCommunicateTime(new Date()); + } + + public String getClientRedisKeyByChannelId(String channelId){ + String redisKey = IotClientSessionForRedisContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "-" + channelId; + return redisKey; + } + + + public IotClientSessionForRedisContext getByClientChannelId(String channelId){ + String redisKey = getClientRedisKeyByChannelId(channelId); + return getClientByKey(redisKey); + } + + + /** + * 使用Scan方式查询key(推荐用于大数据量) + * 避免使用KEYS命令导致Redis阻塞 + */ + public Set scanKeys(String matchPattern) { + Set keys = new HashSet<>(); + try { + redisTemplate.execute((RedisCallback) connection -> { + try (Cursor cursor = connection.scan( + ScanOptions.scanOptions() + .match(matchPattern) + .count(1000) // 每次扫描的数量 + .build())) { + while (cursor.hasNext()) { + keys.add(new String(cursor.next())); + } + } + return null; + }); + } catch (Exception e) { + e.printStackTrace(); + } + return keys; + } + } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java index 75c8bc9..b4da687 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/device/handler/IotDeviceBalanceHandler.java @@ -1,14 +1,11 @@ package com.zt.plat.module.qms.iot.tcpserver.device.handler; -import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext; +import com.zt.plat.module.qms.iot.tcpserver.core.*; import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import com.alibaba.fastjson.JSONObject; import com.zt.plat.module.qms.iot.tcpserver.IotDeviceType; import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.IotUtils; -import com.zt.plat.module.qms.iot.tcpserver.core.ClientType; -import com.zt.plat.module.qms.iot.tcpserver.core.Command; -import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; import com.zt.plat.module.qms.iot.tcpserver.publisher.BalanceDataPublisher; import com.fhs.common.spring.SpringContextUtil; @@ -21,12 +18,14 @@ import tech.zzjc.tio.cluster.TioClusterVo; import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.TioConfig; +import tech.zzjc.tio.core.stat.ChannelStat; import tech.zzjc.tio.utils.SystemTimer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collections; +import java.util.Date; import java.util.List; import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT; @@ -52,8 +51,10 @@ public class IotDeviceBalanceHandler implements IotDataHander { @Override public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { //获取客户端连接信息 + ChannelStat stat = channelContext.stat; String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); + String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString(); String realClient = realClientIp + ":" + realClientPort; //通过sessionContext传入设备ID IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); @@ -111,7 +112,30 @@ public class IotDeviceBalanceHandler implements IotDataHander { weightUnit = deviceContext.getBalanceWeightUnit() == null ? "" :deviceContext.getBalanceWeightUnit(); } - //设置末次天平正确数据时间 + //更新deviceContext信息 + ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode); + if(deviceChannelContext != null){ + deviceContext.setClientProtocol(deviceChannelContext.getClientNode().getProtocol()); + deviceContext.setProxyIp(deviceChannelContext.getClientNode().getIp()); + deviceContext.setProxyPort(deviceChannelContext.getClientNode().getPort()); + deviceContext.setProxyProtocol(deviceChannelContext.getClientNode().getProtocol()); + } + deviceContext.setClientIp(realClientIp); + deviceContext.setClientPort(Integer.parseInt(realClientPort)); + deviceContext.setConnectType(connectType); + deviceContext.setDecodeFailCount(stat.getDecodeFailCount()); + deviceContext.setGmtCreate(new Date(stat.getTimeCreated())); + deviceContext.setReceivedBytes(stat.getReceivedBytes()); + deviceContext.setReceivedPackets(stat.getReceivedPackets()); + deviceContext.setReceivedTcps(stat.getReceivedTcps()); + deviceContext.setBytesPerTcpReceive(stat.getBytesPerTcpReceive()); + deviceContext.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive()); + deviceContext.setHandledBytes(stat.getHandledBytes()); + deviceContext.setHandledPackets(stat.getHandledPackets()); + deviceContext.setSentBytes(stat.getSentBytes()); + deviceContext.setSentPackets(stat.getSentPackets()); + + deviceContext.setLastBalanceDataTime(SystemTimer.currTime); redisSessionComponent.update(deviceContext); //租户ID @@ -140,7 +164,8 @@ public class IotDeviceBalanceHandler implements IotDataHander { TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig(); TioClusterVo iotClusterVo = new TioClusterVo(packet); iotClusterVo.setGroup(CAA_ALL_CLIENT); - tioClusterConfig.publish(iotClusterVo); +// if(!"0.00".equals(weightData)) + tioClusterConfig.publish(iotClusterVo); return null; } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java index c6d0786..dbbf0d4 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotClientHandler.java @@ -10,6 +10,8 @@ import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientCheckC import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientRegisterHander; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.tcpserver.core.Command; +import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext; +import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -42,6 +44,7 @@ public class IotClientHandler implements IotHander { @Autowired private IotCaaClientControlDeviceHander iotCaaClientControlDeviceHander; + @Autowired public RedisSessionComponent redisSessionComponent; public IotClientHandler() { } @@ -59,6 +62,8 @@ public class IotClientHandler implements IotHander { String cmd = jsonObject.getString("cmd"); Command command = Command.getByName(cmd); IotDataHander iotDataHander = null; + String channelId = channelContext.getId(); + IotClientSessionForRedisContext iotClientSessionForRedisContext = redisSessionComponent.getByClientChannelId(channelId); if ("consoleClient".equals(clientType)) { channelContext.set(ChannelContextConstant.CONNECT_TYPE, ChannelContextConstant.CONNECT_CONSOLE_CLIENT); switch (command) { @@ -89,6 +94,13 @@ public class IotClientHandler implements IotHander { log.error("{}, 找不到处理类, 客户端消息ID:{},客户端类型:{},命令:{}", realClient, msgId, clientType, cmd); return null; } + + if(iotClientSessionForRedisContext == null){ + redisSessionComponent.regClient(channelContext); + }else{ + redisSessionComponent.updateClient(iotClientSessionForRedisContext, channelContext); + } + iotDataHander.hander(msgId, jsonObject.getString("data"), channelContext); return null; } diff --git a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java index 8ffcd43..fae14e1 100644 --- a/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java +++ b/zt-module-qms/zt-module-qms-server/src/main/java/com/zt/plat/module/qms/iot/tcpserver/handler/IotDeviceHandler.java @@ -53,7 +53,8 @@ public class IotDeviceHandler implements IotHander { String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClient = realClientIp + ":" + realClientPort + ", bsId=" + channelContext.getBsId(); String text = new String(iotPacket.getBody(), IotPacket.CHARSET); -// log.info("{}, 接收到数据:{}", realClient, text); +// if(!text.contains("0.00")) +// log.info("{}, 接收到数据:{}", realClient, text); String[] infos = text.split("\\$"); String reginfo = infos[0]; String[] regs = reginfo.split("-"); @@ -102,7 +103,6 @@ public class IotDeviceHandler implements IotHander { sessionContext.setDeviceId(IotUtils.transDeviceCodeToId(deviceCode)); return null; } - if(deviceContext == null || Tio.getByBsId(channelContext.getTioConfig(), deviceCode) == null){ redisSessionComponent.regDevice(channelContext, deviceCode, deviceType); }else{