fix(databus): 修复客户端消息处理和防止消息循环

1. 修复消息格式不匹配问题
   - 增量消息:兼容 SyncMessage 格式,从 dataSnapshot 字段反序列化数据
   - 批量消息:添加 getDataType() 方法获取泛型类型,正确转换 JSONObject

2. 防止消息循环
   - 添加 zt.databus.change.producer.enabled 配置项
   - 客户端禁用变更消息发送,避免 客户端写入 → 发送变更 → 循环

3. 修复 Feign 客户端注入
   - 在 RpcConfiguration 中添加 DeptApi、PostApi
   - 确保客户端能通过 Feign 调用本地 system-server API

相关文件:
- DatabusClientConsumer.java: 修复消息解析逻辑
- BatchSyncEventHandler.java: 添加 getDataType() 方法
- DatabusChangeProducer.java: 添加 enabled 开关
- RpcConfiguration.java: 启用 DeptApi/PostApi Feign 客户端

Ref: 修复 ClassCastException 和消息循环问题
This commit is contained in:
hewencai
2025-12-03 11:10:57 +08:00
parent adf3ec601a
commit 6ac4a356cd
37 changed files with 659 additions and 41 deletions

View File

@@ -27,6 +27,13 @@
<version>${revision}</version>
</dependency>
<!-- System API (for consuming change messages) -->
<dependency>
<groupId>com.zt.plat</groupId>
<artifactId>zt-module-system-api</artifactId>
<version>${revision}</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>

View File

@@ -0,0 +1,90 @@
package com.zt.plat.framework.databus.server.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 部门变更消息消费者
* <p>
* 消费来自 system-server 的部门变更消息,通过增量同步服务进行:
* 1. 三态判断(事件/客户端/订阅是否启用)
* 2. 记录到 event_record 流水表
* 3. 推送到客户端专属 Topicdatabus-sync-{clientCode}
*
* @author ZT
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusDeptChangeMessage.TOPIC,
consumerGroup = DatabusDeptChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusDeptChangeConsumer implements RocketMQListener<DatabusDeptChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusDeptChangeMessage message) {
log.info("[Databus] 收到部门变更消息, action={}, deptId={}", message.getAction(), message.getDeptId());
try {
// 构建完整的业务数据快照
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getDeptId());
dataMap.put("code", message.getDeptCode());
dataMap.put("name", message.getDeptName());
dataMap.put("shortName", message.getShortName());
dataMap.put("parentId", message.getParentId());
dataMap.put("sort", message.getSort());
dataMap.put("leaderUserId", message.getLeaderUserId());
dataMap.put("phone", message.getPhone());
dataMap.put("email", message.getStatus());
dataMap.put("isCompany", message.getIsCompany());
dataMap.put("isGroup", message.getIsGroup());
dataMap.put("deptSource", message.getDeptSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
// 构建完整的事件类型: SYSTEM_DEPT_{ACTION}(大写下划线格式,与数据库存储一致)
String eventType = String.format("SYSTEM_DEPT_%s", message.getAction().toUpperCase());
// 构建 Databus 事件
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusDeptChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
// 调用增量同步服务处理(三态判断 + 记录流水 + 推送客户端Topic
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 部门变更事件处理完成, eventType={}, deptId={}",
eventType, message.getDeptId());
} catch (Exception e) {
log.error("[Databus] 处理部门变更消息失败, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
throw new RuntimeException("处理部门变更消息失败", e);
}
}
}

View File

@@ -0,0 +1,74 @@
package com.zt.plat.framework.databus.server.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 岗位变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusPostChangeMessage.TOPIC,
consumerGroup = DatabusPostChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusPostChangeConsumer implements RocketMQListener<DatabusPostChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusPostChangeMessage message) {
log.info("[Databus] 收到岗位变更消息, action={}, postId={}", message.getAction(), message.getPostId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getPostId());
dataMap.put("code", message.getPostCode());
dataMap.put("name", message.getPostName());
dataMap.put("sort", message.getSort());
dataMap.put("status", message.getStatus());
dataMap.put("remark", message.getRemark());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
// 构建完整的事件类型: SYSTEM_POST_{ACTION}(大写下划线格式,与数据库存储一致)
String eventType = String.format("SYSTEM_POST_%s", message.getAction().toUpperCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusPostChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 岗位变更事件处理完成, eventType={}, postId={}",
eventType, message.getPostId());
} catch (Exception e) {
log.error("[Databus] 处理岗位变更消息失败, action={}, postId={}",
message.getAction(), message.getPostId(), e);
throw new RuntimeException("处理岗位变更消息失败", e);
}
}
}

View File

@@ -0,0 +1,80 @@
package com.zt.plat.framework.databus.server.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 用户变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusUserChangeMessage.TOPIC,
consumerGroup = DatabusUserChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusUserChangeConsumer implements RocketMQListener<DatabusUserChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusUserChangeMessage message) {
log.info("[Databus] 收到用户变更消息, action={}, userId={}", message.getAction(), message.getUserId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getUserId());
dataMap.put("username", message.getUsername());
dataMap.put("nickname", message.getNickname());
dataMap.put("remark", message.getRemark());
dataMap.put("deptIds", message.getDeptIds());
dataMap.put("postIds", message.getPostIds());
dataMap.put("email", message.getEmail());
dataMap.put("mobile", message.getMobile());
dataMap.put("sex", message.getSex());
dataMap.put("avatar", message.getAvatar());
dataMap.put("status", message.getStatus());
dataMap.put("userSource", message.getUserSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
// 构建完整的事件类型: SYSTEM_USER_{ACTION}(大写下划线格式,与数据库存储一致)
String eventType = String.format("SYSTEM_USER_%s", message.getAction().toUpperCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusUserChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 用户变更事件处理完成, eventType={}, userId={}",
eventType, message.getUserId());
} catch (Exception e) {
log.error("[Databus] 处理用户变更消息失败, action={}, userId={}",
message.getAction(), message.getUserId(), e);
throw new RuntimeException("处理用户变更消息失败", e);
}
}
}

View File

@@ -2,6 +2,7 @@ package com.zt.plat.framework.databus.server.controller.admin;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.client.DatabusSyncClientBatchStatusReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.client.DatabusSyncClientPageReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.client.DatabusSyncClientRespVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.client.DatabusSyncClientSaveReqVO;
@@ -89,4 +90,12 @@ public class DatabusSyncClientController {
return success(true);
}
@PutMapping("/batch-status")
@Operation(summary = "批量修改客户端启用状态")
@PreAuthorize("@ss.hasPermission('databus:sync:client:update')")
public CommonResult<Boolean> batchUpdateClientStatus(@Valid @RequestBody DatabusSyncClientBatchStatusReqVO reqVO) {
clientService.batchUpdateClientStatus(reqVO.getIds(), reqVO.getEnabled());
return success(true);
}
}

View File

@@ -2,6 +2,7 @@ package com.zt.plat.framework.databus.server.controller.admin;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventBatchStatusReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventPageReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventRespVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventSaveReqVO;
@@ -89,4 +90,12 @@ public class DatabusSyncEventController {
return success(true);
}
@PutMapping("/batch-status")
@Operation(summary = "批量修改事件启用状态")
@PreAuthorize("@ss.hasPermission('databus:sync:event:update')")
public CommonResult<Boolean> batchUpdateEventStatus(@Valid @RequestBody DatabusSyncEventBatchStatusReqVO reqVO) {
eventService.batchUpdateEventStatus(reqVO.getIds(), reqVO.getEnabled());
return success(true);
}
}

View File

@@ -0,0 +1,38 @@
package com.zt.plat.framework.databus.server.controller.admin;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO;
import com.zt.plat.framework.databus.server.service.DatabusSyncStatisticsService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* DataBus 同步统计 Controller
*
* @author ZT
*/
@Tag(name = "管理后台 - DataBus 同步统计")
@RestController
@RequestMapping("/databus/sync")
@Validated
public class DatabusSyncStatisticsController {
@Resource
private DatabusSyncStatisticsService statisticsService;
@GetMapping("/statistics")
@Operation(summary = "获取同步统计数据")
@PreAuthorize("@ss.hasPermission('databus:sync:query')")
public CommonResult<DatabusSyncStatisticsRespVO> getStatistics() {
DatabusSyncStatisticsRespVO statistics = statisticsService.getStatistics();
return success(statistics);
}
}

View File

@@ -2,6 +2,7 @@ package com.zt.plat.framework.databus.server.controller.admin;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionBatchStatusReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionPageReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionRespVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionSaveReqVO;
@@ -97,4 +98,21 @@ public class DatabusSyncSubscriptionController {
return success(true);
}
@GetMapping("/list-by-client")
@Operation(summary = "根据客户端ID获取订阅列表")
@Parameter(name = "clientId", description = "客户端ID", required = true, example = "1")
@PreAuthorize("@ss.hasPermission('databus:sync:subscription:query')")
public CommonResult<java.util.List<DatabusSyncSubscriptionRespVO>> getSubscriptionListByClient(@RequestParam("clientId") Long clientId) {
java.util.List<DatabusSyncSubscriptionDO> list = subscriptionService.getSubscriptionListByClient(clientId);
return success(DatabusSyncSubscriptionConvert.INSTANCE.convertList(list));
}
@PutMapping("/batch-status")
@Operation(summary = "批量修改订阅启用状态")
@PreAuthorize("@ss.hasPermission('databus:sync:subscription:update')")
public CommonResult<Boolean> batchUpdateSubscriptionStatus(@Valid @RequestBody DatabusSyncSubscriptionBatchStatusReqVO reqVO) {
subscriptionService.batchUpdateSubscriptionStatus(reqVO.getIds(), reqVO.getEnabled());
return success(true);
}
}

View File

@@ -0,0 +1,26 @@
package com.zt.plat.framework.databus.server.controller.admin.vo.client;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
/**
* 客户端批量状态更新请求 VO
*
* @author ZT
*/
@Schema(description = "管理后台 - 客户端批量状态更新请求 VO")
@Data
public class DatabusSyncClientBatchStatusReqVO {
@Schema(description = "客户端ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]")
@NotEmpty(message = "客户端ID列表不能为空")
private List<Long> ids;
@Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@NotNull(message = "启用状态不能为空")
private Integer enabled;
}

View File

@@ -0,0 +1,26 @@
package com.zt.plat.framework.databus.server.controller.admin.vo.event;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
/**
* 事件批量状态更新请求 VO
*
* @author ZT
*/
@Schema(description = "管理后台 - 事件批量状态更新请求 VO")
@Data
public class DatabusSyncEventBatchStatusReqVO {
@Schema(description = "事件ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]")
@NotEmpty(message = "事件ID列表不能为空")
private List<Long> ids;
@Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@NotNull(message = "启用状态不能为空")
private Integer enabled;
}

View File

@@ -0,0 +1,38 @@
package com.zt.plat.framework.databus.server.controller.admin.vo.statistics;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Data;
/**
* DataBus 同步统计响应 VO
*
* @author ZT
*/
@Schema(description = "管理后台 - DataBus 同步统计响应 VO")
@Data
public class DatabusSyncStatisticsRespVO {
@Schema(description = "事件总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "12")
private Integer totalEvents;
@Schema(description = "客户端总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "5")
private Integer totalClients;
@Schema(description = "订阅总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "60")
private Integer totalSubscriptions;
@Schema(description = "活跃订阅数", requiredMode = Schema.RequiredMode.REQUIRED, example = "58")
private Integer activeSubscriptions;
@Schema(description = "今日推送总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "1234")
private Integer todayPushCount;
@Schema(description = "今日成功数", requiredMode = Schema.RequiredMode.REQUIRED, example = "1200")
private Integer todaySuccessCount;
@Schema(description = "今日失败数", requiredMode = Schema.RequiredMode.REQUIRED, example = "34")
private Integer todayFailureCount;
@Schema(description = "死信队列数量", requiredMode = Schema.RequiredMode.REQUIRED, example = "10")
private Integer deadLetterCount;
}

View File

@@ -0,0 +1,26 @@
package com.zt.plat.framework.databus.server.controller.admin.vo.subscription;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import lombok.Data;
import java.util.List;
/**
* 订阅批量状态更新请求 VO
*
* @author ZT
*/
@Schema(description = "管理后台 - 订阅批量状态更新请求 VO")
@Data
public class DatabusSyncSubscriptionBatchStatusReqVO {
@Schema(description = "订阅ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]")
@NotEmpty(message = "订阅ID列表不能为空")
private List<Long> ids;
@Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
@NotNull(message = "启用状态不能为空")
private Integer enabled;
}

View File

@@ -9,6 +9,7 @@ import com.zt.plat.framework.databus.server.dal.dataobject.*;
import com.zt.plat.framework.databus.server.dal.mapper.*;
import com.zt.plat.framework.databus.server.enums.SyncStatusEnum;
import com.zt.plat.framework.databus.server.enums.TransportTypeEnum;
import com.zt.plat.framework.tenant.core.util.TenantUtils;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
@@ -48,6 +49,14 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync
log.info("[Databus增量同步] 开始处理事件, eventType={}, eventAction={}",
event.getEventType(), event.getEventAction());
// 使用 TenantUtils.executeIgnore 忽略租户隔离,因为事件定义、客户端、订阅关系是全局共享的
TenantUtils.executeIgnore(() -> processEventInternal(event));
}
/**
* 内部处理事件逻辑(忽略租户隔离后执行)
*/
private void processEventInternal(DatabusEvent event) {
// 1. 查询事件定义
DatabusSyncEventDO eventDef = eventMapper.selectByEventType(event.getEventType());
if (eventDef == null) {
@@ -216,11 +225,11 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync
/**
* 构建客户端专属Topic
* 格式: {topicBase}-{module}-{entity}-{action}-{clientCode}
* 示例: databus-sync-system-org-create-company-a
* 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic
* 示例: databus-sync-branch-001
*
* @param topicBase 基础Topic名称如 databus-sync
* @param eventType 事件类型(格式: system-org-create
* @param eventType 事件类型(格式: system-org-create- 已不再使用,保留参数兼容性
* @param clientCode 客户端编码
*/
private String buildClientTopic(String topicBase, String eventType, String clientCode) {
@@ -228,8 +237,9 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync
if (topicBase == null || topicBase.isEmpty()) {
topicBase = "databus-sync";
}
// eventType 格式已经是 system-org-create直接拼接
return String.format("%s-%s-%s", topicBase, eventType.toLowerCase(), clientCode);
// 简化 Topic 格式databus-sync-{clientCode}
// 不再为每个事件创建独立 Topic而是通过消息体中的 eventType 字段路由
return String.format("%s-%s", topicBase, clientCode);
}
/**

View File

@@ -38,4 +38,13 @@ public interface DatabusSyncSubscriptionMapper extends BaseMapperX<DatabusSyncSu
.eq(DatabusSyncSubscriptionDO::getEnabled, 1));
}
/**
* 根据客户端ID查询所有订阅
*/
default java.util.List<DatabusSyncSubscriptionDO> selectListByClientId(Long clientId) {
return selectList(new LambdaQueryWrapperX<DatabusSyncSubscriptionDO>()
.eq(DatabusSyncSubscriptionDO::getClientId, clientId)
.orderByDesc(DatabusSyncSubscriptionDO::getId));
}
}

View File

@@ -0,0 +1,85 @@
package com.zt.plat.framework.databus.server.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 部门数据提供者
* <p>
* 通过 Feign 调用 system-server 获取部门数据
*
* @author ZT
*/
@Slf4j
@Component
public class DeptDataFeignProvider implements DataProvider<DatabusDeptData> {
public static final String PROVIDER_TYPE = "DEPT";
@Resource
private DatabusDeptProviderApi deptProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusDeptData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusDeptData>> result = deptProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门数据失败: " + result.getMsg());
}
CursorPageResult<DatabusDeptData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = deptProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusDeptData data) {
return data.getId();
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.framework.databus.server.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 岗位数据提供者
*/
@Slf4j
@Component
public class PostDataFeignProvider implements DataProvider<DatabusPostData> {
public static final String PROVIDER_TYPE = "POST";
@Resource
private DatabusPostProviderApi postProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusPostData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusPostData>> result = postProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位数据失败: " + result.getMsg());
}
CursorPageResult<DatabusPostData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = postProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusPostData data) {
return data.getId();
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.framework.databus.server.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 用户数据提供者
*/
@Slf4j
@Component
public class UserDataFeignProvider implements DataProvider<DatabusAdminUserData> {
public static final String PROVIDER_TYPE = "USER";
@Resource
private DatabusUserProviderApi userProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusAdminUserData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusAdminUserData>> result = userProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户数据失败: " + result.getMsg());
}
CursorPageResult<DatabusAdminUserData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = userProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusAdminUserData data) {
return data.getId();
}
}

View File

@@ -67,4 +67,12 @@ public interface DatabusSyncClientService {
*/
void updateClientStatus(Long id, Integer enabled);
/**
* 批量更新客户端启用状态
*
* @param ids 编号列表
* @param enabled 启用状态
*/
void batchUpdateClientStatus(List<Long> ids, Integer enabled);
}

View File

@@ -67,4 +67,12 @@ public interface DatabusSyncEventService {
*/
void updateEventStatus(Long id, Integer enabled);
/**
* 批量更新事件启用状态
*
* @param ids 编号列表
* @param enabled 启用状态
*/
void batchUpdateEventStatus(List<Long> ids, Integer enabled);
}

View File

@@ -0,0 +1,18 @@
package com.zt.plat.framework.databus.server.service;
import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO;
/**
* DataBus 同步统计 Service 接口
*
* @author ZT
*/
public interface DatabusSyncStatisticsService {
/**
* 获取 DataBus 同步统计数据
*
* @return 统计数据
*/
DatabusSyncStatisticsRespVO getStatistics();
}

View File

@@ -72,4 +72,20 @@ public interface DatabusSyncSubscriptionService {
*/
void triggerSync(Long id);
/**
* 根据客户端 ID 获取订阅列表
*
* @param clientId 客户端 ID
* @return 订阅列表
*/
java.util.List<DatabusSyncSubscriptionDO> getSubscriptionListByClient(Long clientId);
/**
* 批量更新订阅启用状态
*
* @param ids 编号列表
* @param enabled 启用状态
*/
void batchUpdateSubscriptionStatus(java.util.List<Long> ids, Integer enabled);
}

View File

@@ -240,8 +240,8 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
BatchSyncMessage message) {
try {
if (TransportTypeEnum.isMqFirst(client.getTransportType()) && client.getMqEnabled() == 1) {
// 使用 getByTopicSuffix 支持数据库存储的 topic 格式(如 system-post-full
DatabusEventType databusEventType = DatabusEventType.getByTopicSuffix(eventType);
// 使用 getByEventType 支持大写下划线格式(如 SYSTEM_POST_FULL
DatabusEventType databusEventType = DatabusEventType.getByEventType(eventType);
if (databusEventType == null) {
log.error("[Databus] Unknown event type: {}", eventType);
return false;

View File

@@ -112,4 +112,17 @@ public class DatabusSyncClientServiceImpl implements DatabusSyncClientService {
clientMapper.updateById(updateObj);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void batchUpdateClientStatus(List<Long> ids, Integer enabled) {
// 批量更新状态
for (Long id : ids) {
DatabusSyncClientDO updateObj = new DatabusSyncClientDO();
updateObj.setId(id);
updateObj.setEnabled(enabled);
clientMapper.updateById(updateObj);
}
log.info("[batchUpdateClientStatus] 批量更新客户端状态完成, ids={}, enabled={}", ids, enabled);
}
}

View File

@@ -112,4 +112,17 @@ public class DatabusSyncEventServiceImpl implements DatabusSyncEventService {
eventMapper.updateById(updateObj);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void batchUpdateEventStatus(List<Long> ids, Integer enabled) {
// 批量更新状态
for (Long id : ids) {
DatabusSyncEventDO updateObj = new DatabusSyncEventDO();
updateObj.setId(id);
updateObj.setEnabled(enabled);
eventMapper.updateById(updateObj);
}
log.info("[batchUpdateEventStatus] 批量更新事件状态完成, ids={}, enabled={}", ids, enabled);
}
}

View File

@@ -0,0 +1,95 @@
package com.zt.plat.framework.databus.server.service.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO;
import com.zt.plat.framework.databus.server.dal.dataobject.*;
import com.zt.plat.framework.databus.server.dal.mapper.*;
import com.zt.plat.framework.databus.server.service.DatabusSyncStatisticsService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
/**
* DataBus 同步统计 Service 实现类
*
* @author ZT
*/
@Slf4j
@Service
public class DatabusSyncStatisticsServiceImpl implements DatabusSyncStatisticsService {
@Resource
private DatabusSyncEventMapper eventMapper;
@Resource
private DatabusSyncClientMapper clientMapper;
@Resource
private DatabusSyncSubscriptionMapper subscriptionMapper;
@Resource
private DatabusSyncLogMapper logMapper;
@Resource
private DatabusSyncDeadLetterMapper deadLetterMapper;
@Override
public DatabusSyncStatisticsRespVO getStatistics() {
DatabusSyncStatisticsRespVO vo = new DatabusSyncStatisticsRespVO();
// 1. 事件总数
vo.setTotalEvents(Math.toIntExact(eventMapper.selectCount(null)));
// 2. 客户端总数
vo.setTotalClients(Math.toIntExact(clientMapper.selectCount(null)));
// 3. 订阅总数
vo.setTotalSubscriptions(Math.toIntExact(subscriptionMapper.selectCount(null)));
// 4. 活跃订阅数enabled = 1
vo.setActiveSubscriptions(Math.toIntExact(subscriptionMapper.selectCount(
new LambdaQueryWrapper<DatabusSyncSubscriptionDO>()
.eq(DatabusSyncSubscriptionDO::getEnabled, 1)
)));
// 5. 今日推送统计(基于 create_time
LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
LocalDateTime todayEnd = LocalDateTime.of(LocalDate.now(), LocalTime.MAX);
// 今日推送总数
vo.setTodayPushCount(Math.toIntExact(logMapper.selectCount(
new LambdaQueryWrapper<DatabusSyncLogDO>()
.ge(DatabusSyncLogDO::getCreateTime, todayStart)
.le(DatabusSyncLogDO::getCreateTime, todayEnd)
)));
// 今日成功数status = 'SUCCESS'
vo.setTodaySuccessCount(Math.toIntExact(logMapper.selectCount(
new LambdaQueryWrapper<DatabusSyncLogDO>()
.ge(DatabusSyncLogDO::getCreateTime, todayStart)
.le(DatabusSyncLogDO::getCreateTime, todayEnd)
.eq(DatabusSyncLogDO::getStatus, "SUCCESS")
)));
// 今日失败数status = 'FAILURE'
vo.setTodayFailureCount(Math.toIntExact(logMapper.selectCount(
new LambdaQueryWrapper<DatabusSyncLogDO>()
.ge(DatabusSyncLogDO::getCreateTime, todayStart)
.le(DatabusSyncLogDO::getCreateTime, todayEnd)
.eq(DatabusSyncLogDO::getStatus, "FAILURE")
)));
// 6. 死信队列数量handled = 0
vo.setDeadLetterCount(Math.toIntExact(deadLetterMapper.selectCount(
new LambdaQueryWrapper<DatabusSyncDeadLetterDO>()
.eq(DatabusSyncDeadLetterDO::getHandled, 0)
)));
log.info("[DatabusSyncStatistics] 统计数据获取成功: {}", vo);
return vo;
}
}

View File

@@ -126,4 +126,22 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti
log.info("[triggerSync] 手动触发同步订阅ID: {}", id);
}
@Override
public java.util.List<DatabusSyncSubscriptionDO> getSubscriptionListByClient(Long clientId) {
return subscriptionMapper.selectListByClientId(clientId);
}
@Override
@Transactional(rollbackFor = Exception.class)
public void batchUpdateSubscriptionStatus(java.util.List<Long> ids, Integer enabled) {
// 批量更新状态
for (Long id : ids) {
DatabusSyncSubscriptionDO updateObj = new DatabusSyncSubscriptionDO();
updateObj.setId(id);
updateObj.setEnabled(enabled);
subscriptionMapper.updateById(updateObj);
}
log.info("[batchUpdateSubscriptionStatus] 批量更新订阅状态完成, ids={}, enabled={}", ids, enabled);
}
}