feat:tio集群逻辑优化

This commit is contained in:
FCL
2025-11-04 18:05:11 +08:00
parent e75217009f
commit 536f971e8e
20 changed files with 986 additions and 313 deletions

View File

@@ -1,10 +1,13 @@
package com.zt.plat.module.qms.iot.controller.admin; package com.zt.plat.module.qms.iot.controller.admin;
import com.alibaba.fastjson.JSONObject;
import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext; import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.github.xingfudeshi.knife4j.annotations.ApiOperationSupport; import com.github.xingfudeshi.knife4j.annotations.ApiOperationSupport;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag; import io.swagger.v3.oas.annotations.tags.Tag;
@@ -13,9 +16,9 @@ import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.util.ObjectUtils; import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.*; import org.springframework.web.bind.annotation.*;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelStatInfo;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.stat.ChannelStat; import tech.zzjc.tio.core.stat.ChannelStat;
@@ -27,6 +30,8 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import static java.lang.Thread.sleep;
/** /**
* <b>IotManagerStatsController</b> * <b>IotManagerStatsController</b>
* iot 管理统计 * iot 管理统计
@@ -47,151 +52,255 @@ public class IotConnectManagerStatsController {
@Autowired private TioServerBootstrap tioServerBootstrap; @Autowired private TioServerBootstrap tioServerBootstrap;
@Autowired public RedisSessionComponent redisSessionComponent; @Autowired public RedisSessionComponent redisSessionComponent;
@Autowired private Environment environment;
@Autowired private TioServerMessagePublisher tioServerMessagePublisher;
@Autowired private IotConnectManagerStatsService iotConnectManagerStatsService;
@GetMapping("/all-connect") @GetMapping("/all-connect")
@ApiOperationSupport(order = 1) @ApiOperationSupport(order = 1)
@Operation(summary = "获取当前所有连接") @Operation(summary = "获取当前所有连接")
public CommonResult<?> currentAllConnect(@RequestParam(name = "clientIp", required = false) String clientIp) { public CommonResult<?> currentAllConnect(@RequestParam(name = "clientIp", required = false) String clientIp) {
//获取spring服务端口
String serverPort = environment.getProperty("server.port");
String serverName = environment.getProperty("spring.application.name");
String msg = "当前服务端口:" + serverPort + ",服务名称:" + serverName;
log.error(msg);
List<ChannelStatInfo> list = new ArrayList<>();
Set<String> deviceKeys = redisSessionComponent.scanKeys(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY + "*");
Set<String> clientKeys = redisSessionComponent.scanKeys(IotClientSessionContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "*");
for(String key : deviceKeys){
IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByKey( key);
String deviceId = deviceContext.getDeviceId();
String bsId = Long.toString(Long.valueOf(deviceId), Character.MAX_RADIX);
ChannelStatInfo info = new ChannelStatInfo();
info.setId(deviceContext.getChannelId());
// info.setId(deviceContext.getDeviceCode());
info.setBsId(bsId);
String controlRealName = ObjectUtils.isEmpty(deviceContext.getControlRealName())? "" : deviceContext.getControlRealName();
String deviceName = deviceContext.getDeviceName();
String deviceCode = deviceContext.getDeviceCode();
// info.setRealName(userRealName);
// info.setRemarks("操作人:" + userRealName);
info.setDeviceId(deviceContext.getDeviceId());
info.setDeviceCode(deviceCode);
info.setDeviceType(deviceContext.getDeviceType());
info.setDeviceName(deviceName);
StringBuilder sb = new StringBuilder();
if (StringUtils.isNoneBlank(deviceName)) {
sb.append("设备名称:").append(deviceName).append(";");
} else {
sb.append("注册号:").append(deviceCode).append(";");
//deviceInfo.append("设备Id:").append(deviceId).append(";");
}
sb.append("控制人:").append(controlRealName);
info.setRemarks(sb.toString());
info.setClientIp(deviceContext.getClientIp());
info.setClientPort(deviceContext.getClientPort());
info.setClientProtocol(deviceContext.getClientProtocol());
info.setProxyIp(deviceContext.getProxyIp());
info.setProxyPort(deviceContext.getProxyPort());
info.setProxyProtocol(deviceContext.getProxyProtocol());
info.setConnectType(deviceContext.getConnectType());
//info.setReConnCount(reconnCount);
info.setDecodeFailCount(deviceContext.getDecodeFailCount());
info.setUserId(deviceContext.getUserId());
// info.setToken(token);
info.setGmtCreate(deviceContext.getGmtCreate());
info.setReceivedBytes(deviceContext.getReceivedBytes());
info.setReceivedPackets(deviceContext.getReceivedPackets());
info.setReceivedTcps(deviceContext.getReceivedTcps());
info.setBytesPerTcpReceive(deviceContext.getBytesPerTcpReceive());
info.setPacketsPerTcpReceive(deviceContext.getPacketsPerTcpReceive());
info.setHandledBytes(deviceContext.getHandledBytes());
info.setHandledPackets(deviceContext.getHandledPackets());
info.setSentBytes(deviceContext.getSentBytes());
info.setSentPackets(deviceContext.getSentPackets());
if(deviceContext.getLastBalanceDataTime() != null)
info.setGmtLatestCommunicate(new Date(deviceContext.getLastBalanceDataTime()));
if(deviceContext.getRegTime() != null)
info.setRegTime(deviceContext.getRegTime());
list.add(info);
}
for(String key : clientKeys){
IotClientSessionForRedisContext clientContext = redisSessionComponent.getClientByKey( key);
ChannelStatInfo info = new ChannelStatInfo();
info.setId(clientContext.getChannelId());
String controlRealName = ObjectUtils.isEmpty(clientContext.getControlRealName())? "" : clientContext.getControlRealName();
// info.setRealName(userRealName);
// info.setRemarks("操作人:" + userRealName);
StringBuilder sb = new StringBuilder();
sb.append("操作人:").append(controlRealName);
info.setRemarks(sb.toString());
info.setClientIp(clientContext.getClientIp());
info.setClientPort(clientContext.getClientPort());
info.setClientProtocol(clientContext.getClientProtocol());
info.setProxyIp(clientContext.getProxyIp());
info.setProxyPort(clientContext.getProxyPort());
info.setProxyProtocol(clientContext.getProxyProtocol());
info.setConnectType(clientContext.getConnectType());
//info.setReConnCount(reconnCount);
info.setDecodeFailCount(clientContext.getDecodeFailCount());
info.setUserId(clientContext.getUserId());
// info.setToken(token);
info.setGmtCreate(clientContext.getGmtCreate());
info.setReceivedBytes(clientContext.getReceivedBytes());
info.setReceivedPackets(clientContext.getReceivedPackets());
info.setReceivedTcps(clientContext.getReceivedTcps());
info.setBytesPerTcpReceive(clientContext.getBytesPerTcpReceive());
info.setPacketsPerTcpReceive(clientContext.getPacketsPerTcpReceive());
info.setHandledBytes(clientContext.getHandledBytes());
info.setHandledPackets(clientContext.getHandledPackets());
info.setSentBytes(clientContext.getSentBytes());
info.setSentPackets(clientContext.getSentPackets());
info.setGmtLatestCommunicate(clientContext.getLastCommunicateTime());
info.setRegTime(clientContext.getRegTime());
list.add(info);
}
SetWithLock<ChannelContext> setWithLock = Tio.getAll(tioServerBootstrap.getServerTioConfig()); SetWithLock<ChannelContext> setWithLock = Tio.getAll(tioServerBootstrap.getServerTioConfig());
Set<ChannelContext> set = null; Set<ChannelContext> set = null;
ReentrantReadWriteLock.ReadLock readLock = setWithLock.getLock().readLock(); ReentrantReadWriteLock.ReadLock readLock = setWithLock.getLock().readLock();
List<ChannelStatInfo> list = null;
try {
readLock.lock();
set = setWithLock.getObj();
list = new ArrayList<>();
for (ChannelContext channelContext : set) { // try {
// readLock.lock();
ChannelStatInfo info = new ChannelStatInfo(); // set = setWithLock.getObj();
ChannelStat stat = channelContext.stat; // for (ChannelContext channelContext : set) {
//
// ChannelStatInfo info = new ChannelStatInfo();
String id = channelContext.getId(); // ChannelStat stat = channelContext.stat;
String bsId = channelContext.getBsId(); //
Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime"); //
String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP // String id = channelContext.getId();
int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口 // String bsId = channelContext.getBsId();
String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议 // Date regTime = channelContext.get("regTime") == null ? null : (Date) channelContext.get("regTime");
String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip // String ip = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); // 客户端IP
int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口 // int port = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() : Integer.parseInt(channelContext.get("realClientPort").toString()); // 客户端端口
String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议 // String protocol = channelContext.get("realClientProtocol") == null ? channelContext.getClientNode().getProtocol() : channelContext.get("realClientProtocol").toString();//协议
// String ipProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getIp();//代理 ip
//连接类型 // int portProxy = channelContext.getProxyClientNode() == null ? -1 : channelContext.getProxyClientNode().getPort();//代理 端口
String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString(); // String protocolProxy = channelContext.getProxyClientNode() == null ? "" : channelContext.getProxyClientNode().getProtocol(); //代理协议
//
switch (connectType) { // //连接类型
case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端 // String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
String userRealName = channelContext.get("user-real-name") == null ? "" : channelContext.get("user-real-name").toString(); // if(connectType.equals(ChannelContextConstant.CONNECT_DEVICE)) //设备
info.setRealName(userRealName); // continue;
info.setRemarks("操作人:" + userRealName); // switch (connectType) {
break; // case ChannelContextConstant.CONNECT_CONSOLE_CLIENT://控制客户端
case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端 // String userRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
String caaUserRealName = channelContext.get("user-real-name") == null ? "" : channelContext.get("user-real-name").toString(); // info.setRealName(userRealName);
info.setRealName(caaUserRealName); // info.setRemarks("操作人:" + userRealName);
info.setRemarks("操作人:" + caaUserRealName); // break;
break; // case ChannelContextConstant.CONNECT_CAA_CLIENT://平板端
case ChannelContextConstant.CONNECT_DEVICE://设备 // String caaUserRealName = channelContext.get("user-real-name") == null ? "无" : channelContext.get("user-real-name").toString();
IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); // info.setRealName(caaUserRealName);
String deviceCode = deviceSessionContext.getDeviceCode(); // info.setRemarks("操作人:" + caaUserRealName);
IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode); // break;
String deviceId = ""; // case ChannelContextConstant.CONNECT_DEVICE://设备
String deviceType = ""; // IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
String deviceName = ""; // String deviceCode = deviceSessionContext.getDeviceCode();
String controlRealName = ""; // IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceCode(deviceCode);
if(deviceContext != null){ // String deviceId = "";
deviceId = deviceContext.getDeviceId(); // String deviceType = "";
deviceType = deviceContext.getDeviceType(); // String deviceName = "";
deviceName = deviceContext.getDeviceName(); // String controlRealName = "";
controlRealName = deviceContext.getControlRealName(); // if(deviceContext != null){
} // deviceId = deviceContext.getDeviceId();
info.setDeviceId(deviceId); // deviceType = deviceContext.getDeviceType();
info.setDeviceCode(deviceCode); // deviceName = deviceContext.getDeviceName();
info.setDeviceType(deviceType); // controlRealName = deviceContext.getControlRealName();
info.setDeviceName(deviceName); // }
info.setControlRealName(controlRealName); // info.setDeviceId(deviceId);
StringBuilder deviceInfo = new StringBuilder(); // info.setDeviceCode(deviceCode);
if (StringUtils.isNoneBlank(deviceName)) { // info.setDeviceType(deviceType);
deviceInfo.append("设备名称:").append(deviceName).append(";"); // info.setDeviceName(deviceName);
} else { // info.setControlRealName(controlRealName);
deviceInfo.append("注册号:").append(deviceCode).append(";"); // StringBuilder deviceInfo = new StringBuilder();
//deviceInfo.append("设备Id:").append(deviceId).append(";"); // if (StringUtils.isNoneBlank(deviceName)) {
} // deviceInfo.append("设备名称:").append(deviceName).append(";");
deviceInfo.append("控制人:").append(controlRealName); // } else {
info.setRemarks(deviceInfo.toString()); // deviceInfo.append("注册号:").append(deviceCode).append(";");
break; // //deviceInfo.append("设备Id:").append(deviceId).append(";");
// }
default: // deviceInfo.append("控制人:").append(controlRealName);
break; // info.setRemarks(deviceInfo.toString());
} // break;
//
// default:
int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数 // break;
String userId = channelContext.userid; // 用户ID // }
String token = channelContext.getToken(); // 用户token //
long timeStart = stat.getTimeCreated(); // 统计开始时间 //
// 统计时长 // int decodeFailCount = stat.getDecodeFailCount(); // 解码异常次数
AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节 // String userId = channelContext.userid; // 用户ID
AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包 // String token = channelContext.getToken(); // 用户token
AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包 // long timeStart = stat.getTimeCreated(); // 统计开始时间
// // 统计时长
double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收字节 // AtomicLong receivedBytes = stat.getReceivedBytes(); // 已接收字节
double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收业务包 // AtomicLong receivedPackets = stat.getReceivedPackets(); // 已接收业务包
// AtomicLong receivedTcps = stat.getReceivedTcps(); // 已接收TCP包
AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节 //
AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包 // double bytesPerTcpReceive = stat.getBytesPerTcpReceive(); // 平均每次TCP包接收的字节数
// double packetsPerTcpReceive = stat.getPacketsPerTcpReceive(); // 平均每次TCP包接收的业务包
AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数 //
AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包 // AtomicLong handledBytes = stat.getHandledBytes(); // 已处理字节
// AtomicLong handledPackets = stat.getHandledPackets(); // 已处理业务包
long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte(); //
long timeLatestSentMsg = stat.getLatestTimeOfSentPacket(); // AtomicLong sentBytes = stat.getSentBytes(); // 已发送字节数
long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间 // AtomicLong sentPackets = stat.getSentPackets(); // 已发送业务包
//
info.setId(id); // long timeLatestReceivedMsg = stat.getLatestTimeOfReceivedByte();
info.setBsId(bsId); // long timeLatestSentMsg = stat.getLatestTimeOfSentPacket();
info.setClientIp(ip); // long latestCommunicateTime = Math.max(timeLatestReceivedMsg, timeLatestSentMsg); // 最近一次通信时间
info.setClientPort(port); //
info.setClientProtocol(protocol); // info.setId(id);
info.setProxyIp(ipProxy); // info.setBsId(bsId);
info.setProxyPort(portProxy); // info.setClientIp(ip);
info.setProxyProtocol(protocolProxy); // info.setClientPort(port);
info.setConnectType(connectType); // info.setClientProtocol(protocol);
//info.setReConnCount(reconnCount); // info.setProxyIp(ipProxy);
info.setDecodeFailCount(decodeFailCount); // info.setProxyPort(portProxy);
info.setUserId(userId); // info.setProxyProtocol(protocolProxy);
info.setToken(token); // info.setConnectType(connectType);
info.setGmtCreate(new Date(timeStart)); // //info.setReConnCount(reconnCount);
info.setReceivedBytes(receivedBytes); // info.setDecodeFailCount(decodeFailCount);
info.setReceivedPackets(receivedPackets); // info.setUserId(userId);
info.setReceivedTcps(receivedTcps); // info.setToken(token);
info.setBytesPerTcpReceive(bytesPerTcpReceive); // info.setGmtCreate(new Date(timeStart));
info.setPacketsPerTcpReceive(packetsPerTcpReceive); // info.setReceivedBytes(receivedBytes);
info.setHandledBytes(handledBytes); // info.setReceivedPackets(receivedPackets);
info.setHandledPackets(handledPackets); // info.setReceivedTcps(receivedTcps);
info.setSentBytes(sentBytes); // info.setBytesPerTcpReceive(bytesPerTcpReceive);
info.setSentPackets(sentPackets); // info.setPacketsPerTcpReceive(packetsPerTcpReceive);
info.setGmtLatestCommunicate(new Date(latestCommunicateTime)); // info.setHandledBytes(handledBytes);
info.setRegTime(regTime); // info.setHandledPackets(handledPackets);
// info.setSentBytes(sentBytes);
list.add(info); // info.setSentPackets(sentPackets);
// info.setGmtLatestCommunicate(new Date(latestCommunicateTime));
log.info("Current Client Info : [{}].", info); // info.setRegTime(regTime);
} //
} catch (Throwable e) { // list.add(info);
log.error("获取在线连接失败!", e); //
} finally { //// log.info("Current Client Info : [{}].", info);
readLock.unlock(); // }
} // } catch (Throwable e) {
// log.error("获取在线连接失败!", e);
// } finally {
// readLock.unlock();
// }
List<ChannelStatInfo> resultList = list.stream().filter(f -> { // List<ChannelStatInfo> resultList = list.stream().filter(f -> {
if (StringUtils.isNotBlank(clientIp)) { // if (StringUtils.isNotBlank(clientIp)) {
return clientIp.equals(f.getClientIp()); // return clientIp.equals(f.getClientIp());
} // }
return true; // return true;
}).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort)) // }).sorted(Comparator.comparing(ChannelStatInfo::getClientIp).thenComparing(ChannelStatInfo::getConnectType).thenComparing(ChannelStatInfo::getClientPort)).collect(Collectors.toList());
.collect(Collectors.toList()); return CommonResult.success(list);
return CommonResult.success(resultList);
} }
/** /**
@@ -202,17 +311,8 @@ public class IotConnectManagerStatsController {
@PostMapping("/close") @PostMapping("/close")
@ApiOperationSupport(order = 6) @ApiOperationSupport(order = 6)
@Operation(summary = "关闭连接") @Operation(summary = "关闭连接")
public CommonResult<?> close(@RequestParam(name="id",required=true) String id) { public CommonResult<?> close(@RequestParam(name="id",required=true) String id) throws InterruptedException {
ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); return iotConnectManagerStatsService.close(id, "0");
IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
if(sessionContext != null){
String deviceId = sessionContext.getDeviceId();
if(!ObjectUtils.isEmpty(deviceId))
redisSessionComponent.deleteByDeviceId(deviceId);
}
Tio.IpBlacklist.clear();//清空全局黑名单
Tio.close(channelContext, "控制端主动关闭连接");
return CommonResult.success("ok");
} }
/** /**
@@ -223,20 +323,17 @@ public class IotConnectManagerStatsController {
@PostMapping("/clearDeviceControl") @PostMapping("/clearDeviceControl")
@ApiOperationSupport(order = 7) @ApiOperationSupport(order = 7)
@Operation(summary = "清除设备控制") @Operation(summary = "清除设备控制")
public CommonResult<?> clearDeviceControl(@RequestParam(name="id",required=true) String id) { public CommonResult<?> clearDeviceControl(@RequestParam(name="id") String id) throws InterruptedException {
ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id); return iotConnectManagerStatsService.clearDeviceControl(id, "0");
Tio.IpBlacklist.clear();//清空全局黑名单 }
if(channelContext != null) {
IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); @RequestMapping("/testMsg")
String deviceId = deviceSessionContext.getDeviceId(); public CommonResult<?> testMsg() {
if(!ObjectUtils.isEmpty(deviceId)){
redisSessionComponent.clearControlByDeviceId(deviceId, channelContext); TioServerMessageVo vo = new TioServerMessageVo();
} vo.setServerClusterMsgType("type1");
//设置控制点的channelContext vo.setGroup("aaaa");
// deviceSessionContext.setControlChannelContext(null); tioServerMessagePublisher.publish( vo);
// deviceSessionContext.setControlRealName("");
// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, "");
}
return CommonResult.success("ok"); return CommonResult.success("ok");
} }
} }

View File

@@ -0,0 +1,80 @@
package com.zt.plat.module.qms.iot.service;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.RequestParam;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.starter.TioServerBootstrap;
import static java.lang.Thread.sleep;
@Service
public class IotConnectManagerStatsService {
@Autowired
private TioServerBootstrap tioServerBootstrap;
@Autowired public RedisSessionComponent redisSessionComponent;
@Autowired private TioServerMessagePublisher tioServerMessagePublisher;
public CommonResult<?> close(String id, String fromSubscribe) throws InterruptedException {
ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
if(channelContext == null){
if("1".equals(fromSubscribe))
return CommonResult.success("ok");
//广播消息处理
TioServerMessageVo message = new TioServerMessageVo();
message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL);
message.setChannelId(id);
tioServerMessagePublisher.publish(message);
//延迟0.5秒返回
sleep(500);
return CommonResult.success("ok");
}
IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
String deviceId = sessionContext.getDeviceId();
if(!ObjectUtils.isEmpty(deviceId))
redisSessionComponent.deleteByDeviceId(deviceId);
Tio.IpBlacklist.clear();//清空全局黑名单
Tio.close(channelContext, "控制端主动关闭连接");
return CommonResult.success("ok");
}
public CommonResult<?> clearDeviceControl(String id, String fromSubscribe) throws InterruptedException {
ChannelContext channelContext = Tio.getByChannelContextId(tioServerBootstrap.getServerTioConfig(), id);
Tio.IpBlacklist.clear();//清空全局黑名单
if(channelContext == null){
if("1".equals(fromSubscribe))
return CommonResult.success("ok");
//广播消息处理
TioServerMessageVo message = new TioServerMessageVo();
message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL);
message.setChannelId(id);
tioServerMessagePublisher.publish(message);
//延迟0.5秒返回
sleep(500);
return CommonResult.success("ok");
}
IotDeviceSessionContext deviceSessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
String deviceId = deviceSessionContext.getDeviceId();
if(!ObjectUtils.isEmpty(deviceId)){
redisSessionComponent.clearControlByDeviceId(deviceId, channelContext);
}
// if(channelContext != null) {
//设置控制点的channelContext
// deviceSessionContext.setControlChannelContext(null);
// deviceSessionContext.setControlRealName("");
// channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, "");
// }
return CommonResult.success("ok");
}
}

View File

@@ -86,7 +86,6 @@ public class IotServerAioListener implements ServerAioListener {
if(!ObjectUtils.isEmpty(deviceCode)){ if(!ObjectUtils.isEmpty(deviceCode)){
redisSessionComponent.clearControlByDeviceCode(deviceCode.toString(), channelContext); redisSessionComponent.clearControlByDeviceCode(deviceCode.toString(), channelContext);
} }
//解绑业务ID //解绑业务ID
Tio.unbindBsId(channelContext); Tio.unbindBsId(channelContext);
//解绑群组 //解绑群组

View File

@@ -1,6 +1,9 @@
package com.zt.plat.module.qms.iot.tcpserver.caaclient; package com.zt.plat.module.qms.iot.tcpserver.caaclient;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils; import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessagePublisher;
import com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageVo;
import com.zt.plat.module.qms.iot.tcpserver.core.*; import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
import com.zt.plat.module.qms.iot.tcpserver.core.ClientType; import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
@@ -12,6 +15,7 @@ import com.alibaba.fastjson.JSONObject;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.caaclient.bean.ControlDevice; import com.zt.plat.module.qms.iot.tcpserver.caaclient.bean.ControlDevice;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
@@ -22,8 +26,10 @@ import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.ChannelContextFilter; import tech.zzjc.tio.core.ChannelContextFilter;
import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.TioConfig; import tech.zzjc.tio.core.TioConfig;
import tech.zzjc.tio.core.stat.ChannelStat;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Date;
import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT; import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT;
@@ -42,17 +48,24 @@ import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CL
public class IotCaaClientControlDeviceHander implements IotDataHander { public class IotCaaClientControlDeviceHander implements IotDataHander {
@Autowired public RedisSessionComponent redisSessionComponent; @Autowired public RedisSessionComponent redisSessionComponent;
@Autowired
private TioServerMessagePublisher tioServerMessagePublisher;
@Override @Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
ControlDevice controlDeviceOriginal = JSON.parseObject(data, ControlDevice.class);
ControlDevice controlDevice = JSON.parseObject(data, ControlDevice.class); ControlDevice controlDevice = JSON.parseObject(data, ControlDevice.class);
//获取客户端连接信息 //获取客户端连接信息
ChannelStat stat = channelContext.stat;
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String deviceId = controlDevice.getDeviceId();
String realClient = realClientIp + ":" + realClientPort; String realClient = realClientIp + ":" + realClientPort;
log.info("{}, APP客户端控制设备 {}", realClient, controlDevice); log.info("{}, APP客户端控制设备 {}", realClient, controlDevice);
String deviceId = controlDevice.getDeviceId(); JSONObject json = null;
IotPacket iotPacket = new IotPacket();
if (StringUtils.isBlank(deviceId)) { if (StringUtils.isBlank(deviceId)) {
JSONObject json = null;
json = new JSONObject(); json = new JSONObject();
json.put("msgId", msgId); json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
@@ -60,17 +73,16 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
controlDevice.setSuccess(false); controlDevice.setSuccess(false);
controlDevice.setMsg("设备解析为空!"); controlDevice.setMsg("设备解析为空!");
json.put("data", controlDevice); json.put("data", controlDevice);
IotPacket iotPacket = new IotPacket(); iotPacket = new IotPacket();
iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET))); iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
Tio.send(channelContext, iotPacket); Tio.send(channelContext, iotPacket);
return null; return null;
} }
String deviceCode = IotUtils.transDeviceIdToCode(deviceId); IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId);
ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode); if (deviceContext == null) {
//如果是订阅消息,不再广播
IotPacket iotPacket = new IotPacket(); if("1".equals(controlDevice.getFromSubscribe()))
JSONObject json = null; return null;
if (deviceChannelContext == null) {
json = new JSONObject(); json = new JSONObject();
json.put("msgId", msgId); json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
@@ -82,11 +94,31 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
Tio.send(channelContext, iotPacket); Tio.send(channelContext, iotPacket);
return null; return null;
} }
String deviceControlUserId = deviceContext.getUserId();
String curUserId = controlDevice.getControlUserId();
String deviceCode = IotUtils.transDeviceIdToCode(deviceId);
ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
//如果设备不在本服务实例,发送广播到其他服务处理
if(deviceChannelContext == null){
String channelId = deviceContext.getChannelId();
TioServerMessageVo message = new TioServerMessageVo();
message.setServerClusterMsgType(TioServerMessageConfig.MESSAGE_TYPE_CONTROL);
message.setChannelId(channelId);
// message.setChannelContext(channelContext);
controlDeviceOriginal.setFromSubscribe("1");
iotPacket.setBody((JSONObject.toJSONString(controlDeviceOriginal) + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
message.setPacket(iotPacket);
tioServerMessagePublisher.publish(message);
return null;
}
TioConfig tioConfig = deviceChannelContext.getTioConfig();
if (controlDevice.getIsControl()) { if (controlDevice.getIsControl()) {
//控制 //控制
// deviceContext.setControlChannelContext(channelContext); // deviceContext.setControlChannelContext(channelContext);
IotDeviceSessionForRedisContext deviceContext = redisSessionComponent.getByDeviceId(deviceId);
deviceContext.setControlRealName(controlDevice.getControlRealName()); deviceContext.setControlRealName(controlDevice.getControlRealName());
deviceContext.setUserId(curUserId);
channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, deviceCode); channelContext.set(ChannelContextConstant.CONTROL_DEVICE_ID, deviceCode);
controlDevice.setMsg("设备被“" + controlDevice.getControlRealName() + "”远程控制!"); controlDevice.setMsg("设备被“" + controlDevice.getControlRealName() + "”远程控制!");
redisSessionComponent.update(deviceContext); redisSessionComponent.update(deviceContext);
@@ -106,18 +138,18 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
json.put("msgId", msgId); json.put("msgId", msgId);
json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName()); json.put("cmd", Command.REPLY_CONTROL_DEVICE.getName());
json.put("clientType", ClientType.CAA_CLIENT.getName()); json.put("clientType", ClientType.CAA_CLIENT.getName());
controlDevice.setSuccess(true); // controlDevice.setSuccess(true);
json.put("data", controlDevice); // json.put("data", controlDevice);
json.put("data", JSON.parseObject(data, ControlDevice.class));
iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET))); iotPacket.setBody((json.toJSONString() + "\r\n").getBytes(Charset.forName(IotPacket.CHARSET)));
//发送通知到其他服务实例 //发送通知到其他服务实例(其他实例通知各自客户端)
TioConfig tioConfig = deviceChannelContext.getTioConfig();
TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig(); TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig();
TioClusterVo iotClusterVo = new TioClusterVo(iotPacket); TioClusterVo iotClusterVo = new TioClusterVo(iotPacket);
iotClusterVo.setGroup(CAA_ALL_CLIENT); iotClusterVo.setGroup(CAA_ALL_CLIENT);
tioClusterConfig.publish(iotClusterVo); tioClusterConfig.publish(iotClusterVo);
//发送消息到本服务实例连接的客户端 //发送消息到本服务实例连接的客户端
Tio.sendToGroup(channelContext.getTioConfig(), CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() { Tio.sendToGroup(tioConfig, CAA_ALL_CLIENT, iotPacket, new ChannelContextFilter() {
@Override @Override
public boolean filter(ChannelContext cc) { public boolean filter(ChannelContext cc) {
if (channelContext.getId().equals(cc.getId())) { if (channelContext.getId().equals(cc.getId())) {
@@ -126,6 +158,9 @@ public class IotCaaClientControlDeviceHander implements IotDataHander {
return true; return true;
}} }}
); );
return null; return null;
} }

View File

@@ -1,16 +1,19 @@
package com.zt.plat.module.qms.iot.tcpserver.caaclient; package com.zt.plat.module.qms.iot.tcpserver.caaclient;
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSON;
import com.fhs.common.spring.SpringContextUtil;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.consoleclient.bean.Register; import com.zt.plat.module.qms.iot.tcpserver.consoleclient.bean.Register;
import com.zt.plat.module.qms.iot.tcpserver.core.ClientType; import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
import com.zt.plat.module.qms.iot.tcpserver.core.Command; import com.zt.plat.module.qms.iot.tcpserver.core.Command;
import com.zt.plat.module.qms.iot.tcpserver.core.ReplyResult; import com.zt.plat.module.qms.iot.tcpserver.core.ReplyResult;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.utils.SystemTimer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Date; import java.util.Date;
@@ -34,11 +37,7 @@ public class IotCaaClientRegisterHander implements IotDataHander {
@Override @Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
Register register = JSON.parseObject(data, Register.class); Register register = JSON.parseObject(data, Register.class);
//获取客户端连接信息
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String realClient = realClientIp + ":" + realClientPort;
log.info("{}, APP客户端注册租户ID{}用户ID{}", realClient, register.getTenantId(), register.getUserId());
Tio.bindUser(channelContext, register.getUserId()); Tio.bindUser(channelContext, register.getUserId());
Tio.bindToken(channelContext, register.getUserId()); Tio.bindToken(channelContext, register.getUserId());
channelContext.set("regTime", new Date()); channelContext.set("regTime", new Date());
@@ -49,6 +48,15 @@ public class IotCaaClientRegisterHander implements IotDataHander {
Tio.send(channelContext, resppacket); Tio.send(channelContext, resppacket);
//绑定群组 //绑定群组
Tio.bindGroup(channelContext, CAA_ALL_CLIENT); Tio.bindGroup(channelContext, CAA_ALL_CLIENT);
//在redis记录客户端信息
// RedisSessionComponent redisSessionComponent = (RedisSessionComponent) SpringContextUtil.getBean("redisSessionComponent");
// redisSessionComponent.regClient(channelContext);
//
return null; return null;
} }

View File

@@ -24,7 +24,8 @@ public class ControlDevice implements Serializable {
/** 控制者姓名 **/ /** 控制者姓名 **/
private String controlRealName; private String controlRealName;
private String controlUserId;
/** 是否控制 **/ /** 是否控制 **/
private Boolean isControl; private Boolean isControl;
@@ -33,4 +34,7 @@ public class ControlDevice implements Serializable {
/** 回复app时使用 消息 **/ /** 回复app时使用 消息 **/
private String msg; private String msg;
/** 来自消息订阅 **/
private String fromSubscribe;
} }

View File

@@ -0,0 +1,20 @@
package com.zt.plat.module.qms.iot.tcpserver.cluster;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class InstanceIdProvider {
private final String instanceId;
public InstanceIdProvider() {
//使用随机 UUID每次重启会变
this.instanceId = UUID.randomUUID().toString();
}
public String getInstanceId() {
return instanceId;
}
}

View File

@@ -0,0 +1,16 @@
package com.zt.plat.module.qms.iot.tcpserver.cluster;
public class TioServerMessageConfig {
public static final String SERVER_CLUSTER_TOPIC_CHANNEL = "zzjc-cluster-tio-server";
public static final String MESSAGE_TYPE_CONTROL = "control"; //设备控制消息
public static final String MESSAGE_TYPE_CLEAR_MANUAL = "clearControlManual"; //手动清除控制
public static final String MESSAGE_TYPE_CLOSE_MANUAL = "closeManual"; //手动断开连接
public static final String MESSAGE_TYPE_NOTICE = "notice"; //通知消息。应该用不到。
}

View File

@@ -0,0 +1,33 @@
package com.zt.plat.module.qms.iot.tcpserver.cluster;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tech.zzjc.tio.server.ServerTioConfig;
import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL;
@Service
@Slf4j
public class TioServerMessagePublisher {
@Autowired
private RedissonClient redissonClient;
@Autowired
private InstanceIdProvider instanceIdProvider; // 注入实例ID提供者
public void publish(TioServerMessageVo message) {
message.setSenderInstanceId(instanceIdProvider.getInstanceId());
RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL);
topic.publish(message);
log.error("[Publisher] 已发布消息: " + message);
}
}

View File

@@ -0,0 +1,91 @@
package com.zt.plat.module.qms.iot.tcpserver.cluster;
import com.alibaba.fastjson.JSONObject;
import com.zt.plat.module.qms.iot.service.IotConnectManagerStatsService;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RTopic;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.TioConfig;
import tech.zzjc.tio.server.ServerTioConfig;
import tech.zzjc.tio.server.TioServer;
import tech.zzjc.tio.starter.TioServerBootstrap;
import tech.zzjc.tio.utils.lock.SetWithLock;
import static com.zt.plat.module.qms.iot.tcpserver.cluster.TioServerMessageConfig.SERVER_CLUSTER_TOPIC_CHANNEL;
@Component
@Slf4j
public class TioServerMessageSubscriber {
@Autowired
private RedissonClient redissonClient;
@Autowired
private InstanceIdProvider instanceIdProvider;
@Autowired
private IotDataHander iotCaaClientControlDeviceHander;
@Autowired private TioServerBootstrap tioServerBootstrap;
@Autowired private IotConnectManagerStatsService iotConnectManagerStatsService;
@PostConstruct
public void subscribe() {
RTopic topic = redissonClient.getTopic(SERVER_CLUSTER_TOPIC_CHANNEL);
topic.addListener(TioServerMessageVo.class, (channel, msg) -> {
String senderId = msg.getSenderInstanceId();
String channelId = msg.getChannelId();
String msgType = msg.getServerClusterMsgType();
String currentId = instanceIdProvider.getInstanceId();
if (currentId.equals(senderId)) {
log.debug("[Subscriber] 跳过自己发送的消息");
return;
}
log.debug("[Subscriber] 收到消息频道 '" + channel + "' 的内容: " + msg);
if(TioServerMessageConfig.MESSAGE_TYPE_CLEAR_MANUAL.equals(msgType)){
try {
iotConnectManagerStatsService.clearDeviceControl(channelId,"1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return;
}
if(TioServerMessageConfig.MESSAGE_TYPE_CLOSE_MANUAL.equals(msgType)){
try {
iotConnectManagerStatsService.close(channelId,"1");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return;
}
ServerTioConfig serverTioConfig = tioServerBootstrap.getServerTioConfig();
SetWithLock<ChannelContext> all = Tio.getAll(serverTioConfig);
ChannelContext targetContext = null;
for(ChannelContext context : all.getObj()){
targetContext = context;
String id = context.getId();
if(id.equals(channelId)){
break;
}
}
IotPacket packet = (IotPacket) msg.getPacket();
byte[] body = packet.getBody();
String bodyStr = new String(body);
JSONObject json = JSONObject.parseObject(bodyStr);
try {
iotCaaClientControlDeviceHander.hander(json.getString("msgId"), bodyStr, targetContext);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
}

View File

@@ -0,0 +1,18 @@
package com.zt.plat.module.qms.iot.tcpserver.cluster;
import lombok.Data;
import tech.zzjc.tio.cluster.TioClusterVo;
import tech.zzjc.tio.core.ChannelContext;
@Data
public class TioServerMessageVo extends TioClusterVo {
private String senderInstanceId; //发布者id
private String serverClusterMsgType; //消息类型: 设备
private ChannelContext channelContext;
}

View File

@@ -31,7 +31,7 @@ public class IotConsoleClientCheckControlHander implements IotDataHander {
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String realClient = realClientIp + ":" + realClientPort; String realClient = realClientIp + ":" + realClientPort;
log.info("{},检查控制点由谁控制", realClient); log.info("{},检查控制点由谁控制", realClient);
String userId = channelContext.getToken(); String token = channelContext.getToken();
IotPacket resppacket = null; IotPacket resppacket = null;
// List<UserMeasurePoint> userMeasurePointList = userMeasurePointService.list(Wrappers.<UserMeasurePoint>query().lambda().eq(UserMeasurePoint::getUserId, Func.toLong(userId))); // List<UserMeasurePoint> userMeasurePointList = userMeasurePointService.list(Wrappers.<UserMeasurePoint>query().lambda().eq(UserMeasurePoint::getUserId, Func.toLong(userId)));
// for (UserMeasurePoint userMeasurePoint : userMeasurePointList) { // for (UserMeasurePoint userMeasurePoint : userMeasurePointList) {

View File

@@ -1,5 +1,6 @@
package com.zt.plat.module.qms.iot.tcpserver.core; package com.zt.plat.module.qms.iot.tcpserver.core;
import lombok.Data;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import java.io.Serializable; import java.io.Serializable;
@@ -15,6 +16,7 @@ import java.io.Serializable;
* @version V1.0 * @version V1.0
* @since 2021-6-28 * @since 2021-6-28
*/ */
@Data
public class IotClientSessionContext implements Serializable { public class IotClientSessionContext implements Serializable {
private static final long serialVersionUID = 8738456547503650108L; private static final long serialVersionUID = 8738456547503650108L;
@@ -31,29 +33,5 @@ public class IotClientSessionContext implements Serializable {
/** 当前控制计量点的ChannelContext **/ /** 当前控制计量点的ChannelContext **/
private ChannelContext controlConsoleClientChannelContext; private ChannelContext controlConsoleClientChannelContext;
public String getClientType() {
return clientType;
}
public void setClientType(String clientType) {
this.clientType = clientType;
}
public MeasurePoint getMeasurePoint() {
return measurePoint;
}
public void setMeasurePoint(MeasurePoint measurePoint) {
this.measurePoint = measurePoint;
}
public ChannelContext getControlConsoleClientChannelContext() {
return controlConsoleClientChannelContext;
}
public void setControlConsoleClientChannelContext(ChannelContext controlConsoleClientChannelContext) {
this.controlConsoleClientChannelContext = controlConsoleClientChannelContext;
}
} }

View File

@@ -0,0 +1,120 @@
package com.zt.plat.module.qms.iot.tcpserver.core;
import lombok.Data;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
@Data
public class IotClientSessionForRedisContext extends IotClientSessionContext{
private static final long serialVersionUID = 1L;
private String channelId;
private Date regTime; //注册时间
private Date expireTime; //过期时间
private Date lastCommunicateTime; //最近一次通信时间
private String tenantId; //租户ID
private String controlRealName;
/**
* 客户端IP
*/
private String clientIp;
/**
* 端口
*/
private int clientPort;
/**
* 协议
*/
private String clientProtocol;
/**
* 代理ip
*/
private String proxyIp;
/**
* 代理端口
*/
private int proxyPort;
/**
* 代理协议
*/
private String proxyProtocol;
/**
* 连接类型
*/
private String connectType;
/**
* 解码失败数
*/
private int decodeFailCount;
/**
* 用户ID
*/
private String userId;
/**
* 用户token
*/
private String token;
/**
* 统计开始时间
*/
private Date gmtCreate;
/**
* 已接收字节数
*/
private AtomicLong receivedBytes;
/**
* 已接收业务包
*/
private AtomicLong receivedPackets;
/**
* 已接收TCP包
*/
private AtomicLong receivedTcps;
/**
* 平均每个TCP包接收的字节数
*/
private double bytesPerTcpReceive;
/**
* 平均每个TCP包接收的业务包
*/
private double packetsPerTcpReceive;
/**
* 已处理的字节数
*/
private AtomicLong handledBytes;
/**
* 已处理的业务包
*/
private AtomicLong handledPackets;
/**
* 已发送的字节数
*/
private AtomicLong sentBytes;
/**
* 已发送的业务包
*/
private AtomicLong sentPackets;
}

View File

@@ -1,5 +1,6 @@
package com.zt.plat.module.qms.iot.tcpserver.core; package com.zt.plat.module.qms.iot.tcpserver.core;
import lombok.Data;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import java.io.Serializable; import java.io.Serializable;
@@ -16,6 +17,7 @@ import java.util.List;
* @version V1.0 * @version V1.0
* @since 2021-6-28 * @since 2021-6-28
*/ */
@Data
public class IotDeviceSessionContext implements Serializable { public class IotDeviceSessionContext implements Serializable {
private static final long serialVersionUID = 9035659677475646771L; private static final long serialVersionUID = 9035659677475646771L;
@@ -44,61 +46,6 @@ public class IotDeviceSessionContext implements Serializable {
/** 控制的ChannelContext **/ /** 控制的ChannelContext **/
private ChannelContext controlChannelContext; private ChannelContext controlChannelContext;
public String getDeviceId() {
return deviceId;
}
public void setDeviceId(String deviceId) {
this.deviceId = deviceId;
}
public String getDeviceCode() {
return deviceCode;
}
public void setDeviceCode(String deviceCode) {
this.deviceCode = deviceCode;
}
public String getDeviceType() {
return deviceType;
}
public void setDeviceType(String deviceType) {
this.deviceType = deviceType;
}
public String getDeviceName() {
return deviceName;
}
public void setDeviceName(String deviceName) {
this.deviceName = deviceName;
}
public List<Double> getWeightValueList() {
return weightValueList;
}
public void setWeightValueList(List<Double> weightValueList) {
this.weightValueList = weightValueList;
}
public String getControlRealName() {
return controlRealName;
}
public void setControlRealName(String controlRealName) {
this.controlRealName = controlRealName;
}
public ChannelContext getControlChannelContext() {
return controlChannelContext;
}
public void setControlChannelContext(ChannelContext controlChannelContext) {
this.controlChannelContext = controlChannelContext;
}
} }

View File

@@ -1,55 +1,123 @@
package com.zt.plat.module.qms.iot.tcpserver.core; package com.zt.plat.module.qms.iot.tcpserver.core;
import java.util.Date; import lombok.Data;
import tech.zzjc.tio.core.ChannelContext;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
@Data
public class IotDeviceSessionForRedisContext extends IotDeviceSessionContext{ public class IotDeviceSessionForRedisContext extends IotDeviceSessionContext{
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
private String channelId;
private Date regTime; //注册时间 private Date regTime; //注册时间
private Date expireTime; //过期时间 private Date expireTime; //过期时间
private String balanceWeightUnit; //天平重量单位 private String balanceWeightUnit; //天平重量单位
private Long lastBalanceDataTime; //天平最后数据时间 private Long lastBalanceDataTime; //天平最后数据时间
private String tenantId; //租户ID private String tenantId; //租户ID
public Date getRegTime() { /**
return regTime; * 客户端IP
} */
private String clientIp;
public void setRegTime(Date regTime) { /**
this.regTime = regTime; * 端口
} */
private int clientPort;
public Date getExpireTime() { /**
return expireTime; * 协议
} */
private String clientProtocol;
public void setExpireTime(Date expireTime) { /**
this.expireTime = expireTime; * 代理ip
} */
private String proxyIp;
public String getBalanceWeightUnit() { /**
return balanceWeightUnit; * 代理端口
} */
private int proxyPort;
public void setBalanceWeightUnit(String balanceWeightUnit) { /**
this.balanceWeightUnit = balanceWeightUnit; * 代理协议
} */
private String proxyProtocol;
public Long getLastBalanceDataTime() { /**
return lastBalanceDataTime; * 连接类型
} */
private String connectType;
public void setLastBalanceDataTime(Long lastBalanceDataTime) { /**
this.lastBalanceDataTime = lastBalanceDataTime; * 解码失败数
} */
private int decodeFailCount;
public String getTenantId() { /**
return tenantId; * 用户ID
} */
private String userId;
/**
* 用户token
*/
private String token;
/**
* 统计开始时间
*/
private Date gmtCreate;
/**
* 已接收字节数
*/
private AtomicLong receivedBytes;
/**
* 已接收业务包
*/
private AtomicLong receivedPackets;
/**
* 已接收TCP包
*/
private AtomicLong receivedTcps;
/**
* 平均每个TCP包接收的字节数
*/
private double bytesPerTcpReceive;
/**
* 平均每个TCP包接收的业务包
*/
private double packetsPerTcpReceive;
/**
* 已处理的字节数
*/
private AtomicLong handledBytes;
/**
* 已处理的业务包
*/
private AtomicLong handledPackets;
/**
* 已发送的字节数
*/
private AtomicLong sentBytes;
/**
* 已发送的业务包
*/
private AtomicLong sentPackets;
public void setTenantId(String tenantId) {
this.tenantId = tenantId;
}
} }

View File

@@ -3,19 +3,27 @@ package com.zt.plat.module.qms.iot.tcpserver.device;
import com.zt.plat.framework.tenant.core.context.TenantContextHolder; import com.zt.plat.framework.tenant.core.context.TenantContextHolder;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils; import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext; import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.publisher.IotDeviceRegisterPublisher; import com.zt.plat.module.qms.iot.tcpserver.publisher.IotDeviceRegisterPublisher;
import com.zt.plat.module.qms.resource.device.dal.dataobject.DeviceInfomationDO; import com.zt.plat.module.qms.resource.device.dal.dataobject.DeviceInfomationDO;
import com.zt.plat.module.qms.resource.device.service.DeviceInfomationService; import com.zt.plat.module.qms.resource.device.service.DeviceInfomationService;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.Cursor;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ScanOptions;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.stat.ChannelStat;
import tech.zzjc.tio.utils.SystemTimer;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.Date; import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@Service @Service
@@ -51,16 +59,25 @@ public class RedisSessionComponent {
} }
public void deleteByDeviceId(String deviceId){ public void deleteByDeviceId(String deviceId){
redisTemplate.delete(getRedisKeyByDeviceCode(deviceId)); redisTemplate.delete(getRedisKeyByDeviceId(deviceId));
} }
public IotDeviceSessionForRedisContext getByDeviceCode(String deviceCode){ public IotDeviceSessionForRedisContext getByDeviceCode(String deviceCode){
String deviceId = IotUtils.transDeviceCodeToId(deviceCode); String redisKey = getRedisKeyByDeviceCode(deviceCode);
return getByDeviceId(deviceId); return getByKey(redisKey);
} }
public IotDeviceSessionForRedisContext getByDeviceId(String deviceId){ public IotDeviceSessionForRedisContext getByDeviceId(String deviceId){
return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId)); String redisKey = getRedisKeyByDeviceId(deviceId);
return getByKey(redisKey);
// return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(getRedisKeyByDeviceId(deviceId));
}
public IotDeviceSessionForRedisContext getByKey(String key){
return (IotDeviceSessionForRedisContext) redisTemplate.opsForValue().get(key);
}
public IotClientSessionForRedisContext getClientByKey(String key){
return (IotClientSessionForRedisContext) redisTemplate.opsForValue().get(key);
} }
public String getRedisKeyByDeviceCode(String deviceCode){ public String getRedisKeyByDeviceCode(String deviceCode){
@@ -73,8 +90,8 @@ public class RedisSessionComponent {
} }
public void regDevice(ChannelContext channelContext, String deviceCode, String deviceType){ public void regDevice(ChannelContext channelContext, String deviceCode, String deviceType){
IotDeviceSessionForRedisContext deviceContext = getByDeviceCode(deviceCode); IotDeviceSessionForRedisContext clientContext = getByDeviceCode(deviceCode);
if(deviceContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){ if(clientContext != null && Tio.getByBsId(channelContext.getTioConfig(), deviceCode) != null){
return; return;
} }
@@ -83,6 +100,7 @@ public class RedisSessionComponent {
String deviceId = IotUtils.transDeviceCodeToId(deviceCode); String deviceId = IotUtils.transDeviceCodeToId(deviceCode);
DeviceInfomationDO deviceInfo = deviceInfomationService.getDeviceInfomation(Long.valueOf(deviceId)); DeviceInfomationDO deviceInfo = deviceInfomationService.getDeviceInfomation(Long.valueOf(deviceId));
IotDeviceSessionForRedisContext context = new IotDeviceSessionForRedisContext(); IotDeviceSessionForRedisContext context = new IotDeviceSessionForRedisContext();
context.setChannelId(channelContext.getId());
context.setDeviceId(deviceId); context.setDeviceId(deviceId);
context.setDeviceType(deviceType); context.setDeviceType(deviceType);
context.setDeviceCode(deviceCode); context.setDeviceCode(deviceCode);
@@ -111,5 +129,109 @@ public class RedisSessionComponent {
} }
public void regClient(ChannelContext channelContext){
String channelId = channelContext.getId();
IotClientSessionForRedisContext context = getByClientChannelId(channelId);
if(context != null){
return;
}
TenantContextHolder.setIgnore(true);
try{
context = new IotClientSessionForRedisContext();
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
String userName = channelContext.get("user-real-name") == null ? "" : channelContext.get("user-real-name").toString();
// ChannelContext clientChanelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
// if(clientChanelContext != null){
// context.setClientProtocol(clientChanelContext.getClientNode().getProtocol());
// context.setProxyIp(clientChanelContext.getClientNode().getIp());
// context.setProxyPort(clientChanelContext.getClientNode().getPort());
// context.setProxyProtocol(clientChanelContext.getClientNode().getProtocol());
// }
context.setChannelId(channelId);
context.setControlRealName(userName);
context.setClientIp(realClientIp);
context.setClientPort(Integer.parseInt(realClientPort));
context.setConnectType(connectType);
updateClientCommonAttrs(context, channelContext);
String redisKey = getClientRedisKeyByChannelId(channelId);
redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS);
//todo 客户端注册事件
// IotDeviceRegisterPublisher.publishEvent(deviceCode, deviceType);
}catch (Exception e){
e.printStackTrace();
log.error("设备注册异常:{}", e.getMessage());
}finally {
TenantContextHolder.clear();
}
}
public void updateClient(IotClientSessionForRedisContext context, ChannelContext channelContext){
String channelId = channelContext.getId();
updateClientCommonAttrs(context, channelContext);
String redisKey = getClientRedisKeyByChannelId(channelId);
redisTemplate.opsForValue().set(redisKey, context, timeoutSeconds, TimeUnit.SECONDS);
}
private void updateClientCommonAttrs(IotClientSessionForRedisContext context, ChannelContext channelContext){
ChannelStat stat = channelContext.stat;
context.setGmtCreate(new Date(stat.getTimeCreated()));
context.setDecodeFailCount(stat.getDecodeFailCount());
context.setReceivedBytes(stat.getReceivedBytes());
context.setReceivedPackets(stat.getReceivedPackets());
context.setReceivedTcps(stat.getReceivedTcps());
context.setBytesPerTcpReceive(stat.getBytesPerTcpReceive());
context.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive());
context.setHandledBytes(stat.getHandledBytes());
context.setHandledPackets(stat.getHandledPackets());
context.setSentBytes(stat.getSentBytes());
context.setSentPackets(stat.getSentPackets());
context.setLastCommunicateTime(new Date());
}
public String getClientRedisKeyByChannelId(String channelId){
String redisKey = IotClientSessionForRedisContext.DEFAULT_CLIENT_SESSION_CONTEXT_KEY + "-" + channelId;
return redisKey;
}
public IotClientSessionForRedisContext getByClientChannelId(String channelId){
String redisKey = getClientRedisKeyByChannelId(channelId);
return getClientByKey(redisKey);
}
/**
* 使用Scan方式查询key推荐用于大数据量
* 避免使用KEYS命令导致Redis阻塞
*/
public Set<String> scanKeys(String matchPattern) {
Set<String> keys = new HashSet<>();
try {
redisTemplate.execute((RedisCallback<Void>) connection -> {
try (Cursor<byte[]> cursor = connection.scan(
ScanOptions.scanOptions()
.match(matchPattern)
.count(1000) // 每次扫描的数量
.build())) {
while (cursor.hasNext()) {
keys.add(new String(cursor.next()));
}
}
return null;
});
} catch (Exception e) {
e.printStackTrace();
}
return keys;
}
} }

View File

@@ -1,14 +1,11 @@
package com.zt.plat.module.qms.iot.tcpserver.device.handler; package com.zt.plat.module.qms.iot.tcpserver.device.handler;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionForRedisContext; import com.zt.plat.module.qms.iot.tcpserver.core.*;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent; import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.JSONObject;
import com.zt.plat.module.qms.iot.tcpserver.IotDeviceType; import com.zt.plat.module.qms.iot.tcpserver.IotDeviceType;
import com.zt.plat.module.qms.iot.tcpserver.IotPacket; import com.zt.plat.module.qms.iot.tcpserver.IotPacket;
import com.zt.plat.module.qms.iot.tcpserver.IotUtils; import com.zt.plat.module.qms.iot.tcpserver.IotUtils;
import com.zt.plat.module.qms.iot.tcpserver.core.ClientType;
import com.zt.plat.module.qms.iot.tcpserver.core.Command;
import com.zt.plat.module.qms.iot.tcpserver.core.IotDeviceSessionContext;
import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander; import com.zt.plat.module.qms.iot.tcpserver.handler.IotDataHander;
import com.zt.plat.module.qms.iot.tcpserver.publisher.BalanceDataPublisher; import com.zt.plat.module.qms.iot.tcpserver.publisher.BalanceDataPublisher;
import com.fhs.common.spring.SpringContextUtil; import com.fhs.common.spring.SpringContextUtil;
@@ -21,12 +18,14 @@ import tech.zzjc.tio.cluster.TioClusterVo;
import tech.zzjc.tio.core.ChannelContext; import tech.zzjc.tio.core.ChannelContext;
import tech.zzjc.tio.core.Tio; import tech.zzjc.tio.core.Tio;
import tech.zzjc.tio.core.TioConfig; import tech.zzjc.tio.core.TioConfig;
import tech.zzjc.tio.core.stat.ChannelStat;
import tech.zzjc.tio.utils.SystemTimer; import tech.zzjc.tio.utils.SystemTimer;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Date;
import java.util.List; import java.util.List;
import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT; import static com.zt.plat.module.qms.iot.tcpserver.core.GroupConstant.CAA_ALL_CLIENT;
@@ -52,8 +51,10 @@ public class IotDeviceBalanceHandler implements IotDataHander {
@Override @Override
public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception { public Object hander(String msgId, String data, ChannelContext channelContext) throws Exception {
//获取客户端连接信息 //获取客户端连接信息
ChannelStat stat = channelContext.stat;
String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString(); String realClientIp = channelContext.get("realClientIp") == null ? channelContext.getClientNode().getIp() : channelContext.get("realClientIp").toString();
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String connectType = channelContext.get(ChannelContextConstant.CONNECT_TYPE) == null ? "" : channelContext.get(ChannelContextConstant.CONNECT_TYPE).toString();
String realClient = realClientIp + ":" + realClientPort; String realClient = realClientIp + ":" + realClientPort;
//通过sessionContext传入设备ID //通过sessionContext传入设备ID
IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY); IotDeviceSessionContext sessionContext = (IotDeviceSessionContext) channelContext.get(IotDeviceSessionContext.DEFAULT_DEVICE_SESSION_CONTEXT_KEY);
@@ -111,7 +112,30 @@ public class IotDeviceBalanceHandler implements IotDataHander {
weightUnit = deviceContext.getBalanceWeightUnit() == null ? "" :deviceContext.getBalanceWeightUnit(); weightUnit = deviceContext.getBalanceWeightUnit() == null ? "" :deviceContext.getBalanceWeightUnit();
} }
//设置末次天平正确数据时间 //更新deviceContext信息
ChannelContext deviceChannelContext = Tio.getByBsId(channelContext.getTioConfig(), deviceCode);
if(deviceChannelContext != null){
deviceContext.setClientProtocol(deviceChannelContext.getClientNode().getProtocol());
deviceContext.setProxyIp(deviceChannelContext.getClientNode().getIp());
deviceContext.setProxyPort(deviceChannelContext.getClientNode().getPort());
deviceContext.setProxyProtocol(deviceChannelContext.getClientNode().getProtocol());
}
deviceContext.setClientIp(realClientIp);
deviceContext.setClientPort(Integer.parseInt(realClientPort));
deviceContext.setConnectType(connectType);
deviceContext.setDecodeFailCount(stat.getDecodeFailCount());
deviceContext.setGmtCreate(new Date(stat.getTimeCreated()));
deviceContext.setReceivedBytes(stat.getReceivedBytes());
deviceContext.setReceivedPackets(stat.getReceivedPackets());
deviceContext.setReceivedTcps(stat.getReceivedTcps());
deviceContext.setBytesPerTcpReceive(stat.getBytesPerTcpReceive());
deviceContext.setPacketsPerTcpReceive(stat.getPacketsPerTcpReceive());
deviceContext.setHandledBytes(stat.getHandledBytes());
deviceContext.setHandledPackets(stat.getHandledPackets());
deviceContext.setSentBytes(stat.getSentBytes());
deviceContext.setSentPackets(stat.getSentPackets());
deviceContext.setLastBalanceDataTime(SystemTimer.currTime); deviceContext.setLastBalanceDataTime(SystemTimer.currTime);
redisSessionComponent.update(deviceContext); redisSessionComponent.update(deviceContext);
//租户ID //租户ID
@@ -140,7 +164,8 @@ public class IotDeviceBalanceHandler implements IotDataHander {
TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig(); TioClusterConfig tioClusterConfig = tioConfig.getTioClusterConfig();
TioClusterVo iotClusterVo = new TioClusterVo(packet); TioClusterVo iotClusterVo = new TioClusterVo(packet);
iotClusterVo.setGroup(CAA_ALL_CLIENT); iotClusterVo.setGroup(CAA_ALL_CLIENT);
tioClusterConfig.publish(iotClusterVo); // if(!"0.00".equals(weightData))
tioClusterConfig.publish(iotClusterVo);
return null; return null;
} }

View File

@@ -10,6 +10,8 @@ import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientCheckC
import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientRegisterHander; import com.zt.plat.module.qms.iot.tcpserver.consoleclient.IotConsoleClientRegisterHander;
import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant; import com.zt.plat.module.qms.iot.tcpserver.core.ChannelContextConstant;
import com.zt.plat.module.qms.iot.tcpserver.core.Command; import com.zt.plat.module.qms.iot.tcpserver.core.Command;
import com.zt.plat.module.qms.iot.tcpserver.core.IotClientSessionForRedisContext;
import com.zt.plat.module.qms.iot.tcpserver.device.RedisSessionComponent;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@@ -42,6 +44,7 @@ public class IotClientHandler implements IotHander {
@Autowired @Autowired
private IotCaaClientControlDeviceHander iotCaaClientControlDeviceHander; private IotCaaClientControlDeviceHander iotCaaClientControlDeviceHander;
@Autowired public RedisSessionComponent redisSessionComponent;
public IotClientHandler() { public IotClientHandler() {
} }
@@ -59,6 +62,8 @@ public class IotClientHandler implements IotHander {
String cmd = jsonObject.getString("cmd"); String cmd = jsonObject.getString("cmd");
Command command = Command.getByName(cmd); Command command = Command.getByName(cmd);
IotDataHander iotDataHander = null; IotDataHander iotDataHander = null;
String channelId = channelContext.getId();
IotClientSessionForRedisContext iotClientSessionForRedisContext = redisSessionComponent.getByClientChannelId(channelId);
if ("consoleClient".equals(clientType)) { if ("consoleClient".equals(clientType)) {
channelContext.set(ChannelContextConstant.CONNECT_TYPE, ChannelContextConstant.CONNECT_CONSOLE_CLIENT); channelContext.set(ChannelContextConstant.CONNECT_TYPE, ChannelContextConstant.CONNECT_CONSOLE_CLIENT);
switch (command) { switch (command) {
@@ -89,6 +94,13 @@ public class IotClientHandler implements IotHander {
log.error("{}, 找不到处理类, 客户端消息ID{},客户端类型:{},命令:{}", realClient, msgId, clientType, cmd); log.error("{}, 找不到处理类, 客户端消息ID{},客户端类型:{},命令:{}", realClient, msgId, clientType, cmd);
return null; return null;
} }
if(iotClientSessionForRedisContext == null){
redisSessionComponent.regClient(channelContext);
}else{
redisSessionComponent.updateClient(iotClientSessionForRedisContext, channelContext);
}
iotDataHander.hander(msgId, jsonObject.getString("data"), channelContext); iotDataHander.hander(msgId, jsonObject.getString("data"), channelContext);
return null; return null;
} }

View File

@@ -53,7 +53,8 @@ public class IotDeviceHandler implements IotHander {
String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString(); String realClientPort = channelContext.get("realClientPort") == null ? channelContext.getClientNode().getPort() + "" : channelContext.get("realClientPort").toString();
String realClient = realClientIp + ":" + realClientPort + ", bsId=" + channelContext.getBsId(); String realClient = realClientIp + ":" + realClientPort + ", bsId=" + channelContext.getBsId();
String text = new String(iotPacket.getBody(), IotPacket.CHARSET); String text = new String(iotPacket.getBody(), IotPacket.CHARSET);
// log.info("{}, 接收到数据:{}", realClient, text); // if(!text.contains("0.00"))
// log.info("{}, 接收到数据:{}", realClient, text);
String[] infos = text.split("\\$"); String[] infos = text.split("\\$");
String reginfo = infos[0]; String reginfo = infos[0];
String[] regs = reginfo.split("-"); String[] regs = reginfo.split("-");
@@ -102,7 +103,6 @@ public class IotDeviceHandler implements IotHander {
sessionContext.setDeviceId(IotUtils.transDeviceCodeToId(deviceCode)); sessionContext.setDeviceId(IotUtils.transDeviceCodeToId(deviceCode));
return null; return null;
} }
if(deviceContext == null || Tio.getByBsId(channelContext.getTioConfig(), deviceCode) == null){ if(deviceContext == null || Tio.getByBsId(channelContext.getTioConfig(), deviceCode) == null){
redisSessionComponent.regDevice(channelContext, deviceCode, deviceType); redisSessionComponent.regDevice(channelContext, deviceCode, deviceType);
}else{ }else{