From 62494ced45f8c43e01428db74a2354443af320c5 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Wed, 3 Dec 2025 11:10:57 +0800 Subject: [PATCH] =?UTF-8?q?fix(databus):=20=E4=BF=AE=E5=A4=8D=E5=AE=A2?= =?UTF-8?q?=E6=88=B7=E7=AB=AF=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86=E5=92=8C?= =?UTF-8?q?=E9=98=B2=E6=AD=A2=E6=B6=88=E6=81=AF=E5=BE=AA=E7=8E=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 修复消息格式不匹配问题 - 增量消息:兼容 SyncMessage 格式,从 dataSnapshot 字段反序列化数据 - 批量消息:添加 getDataType() 方法获取泛型类型,正确转换 JSONObject 2. 防止消息循环 - 添加 zt.databus.change.producer.enabled 配置项 - 客户端禁用变更消息发送,避免 客户端写入 → 发送变更 → 循环 3. 修复 Feign 客户端注入 - 在 RpcConfiguration 中添加 DeptApi、PostApi - 确保客户端能通过 Feign 调用本地 system-server API 相关文件: - DatabusClientConsumer.java: 修复消息解析逻辑 - BatchSyncEventHandler.java: 添加 getDataType() 方法 - DatabusChangeProducer.java: 添加 enabled 开关 - RpcConfiguration.java: 启用 DeptApi/PostApi Feign 客户端 Ref: 修复 ClassCastException 和消息循环问题 --- pom.xml | 2 +- zt-framework/pom.xml | 3 +- .../core/consumer/DatabusClientConsumer.java | 103 +++++++++++++++--- .../client/handler/BatchSyncEventHandler.java | 28 +++++ .../pom.xml | 7 ++ .../consumer/DatabusDeptChangeConsumer.java | 6 +- .../consumer/DatabusPostChangeConsumer.java | 5 +- .../consumer/DatabusUserChangeConsumer.java | 5 +- .../admin/DatabusSyncClientController.java | 9 ++ .../admin/DatabusSyncEventController.java | 9 ++ .../DatabusSyncStatisticsController.java | 38 +++++++ .../DatabusSyncSubscriptionController.java | 18 +++ .../DatabusSyncClientBatchStatusReqVO.java | 26 +++++ .../DatabusSyncEventBatchStatusReqVO.java | 26 +++++ .../DatabusSyncStatisticsRespVO.java | 38 +++++++ ...tabusSyncSubscriptionBatchStatusReqVO.java | 26 +++++ .../DatabusIncrementalSyncServiceImpl.java | 20 +++- .../mapper/DatabusSyncSubscriptionMapper.java | 9 ++ .../provider/DeptDataFeignProvider.java | 2 +- .../provider/PostDataFeignProvider.java | 2 +- .../provider/UserDataFeignProvider.java | 2 +- .../service/DatabusSyncClientService.java | 8 ++ .../service/DatabusSyncEventService.java | 8 ++ .../service/DatabusSyncStatisticsService.java | 18 +++ .../DatabusSyncSubscriptionService.java | 16 +++ .../impl/DatabusFullSyncServiceImpl.java | 4 +- .../impl/DatabusSyncClientServiceImpl.java | 13 +++ .../impl/DatabusSyncEventServiceImpl.java | 13 +++ .../DatabusSyncStatisticsServiceImpl.java | 95 ++++++++++++++++ .../DatabusSyncSubscriptionServiceImpl.java | 18 +++ .../databus/enums/DatabusEventType.java | 32 +++++- .../rpc/config/RpcConfiguration.java | 12 +- .../src/main/resources/application-dev.yml | 29 ++++- .../src/main/resources/application-local.yml | 7 ++ .../src/main/resources/application.yml | 7 +- .../databus/DatabusChangeProducer.java | 28 +++++ .../src/main/resources/application-dev.yaml | 8 +- 37 files changed, 659 insertions(+), 41 deletions(-) rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/consumer/DatabusDeptChangeConsumer.java (93%) rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/consumer/DatabusPostChangeConsumer.java (91%) rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/consumer/DatabusUserChangeConsumer.java (92%) create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncStatisticsController.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/client/DatabusSyncClientBatchStatusReqVO.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/event/DatabusSyncEventBatchStatusReqVO.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/statistics/DatabusSyncStatisticsRespVO.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/subscription/DatabusSyncSubscriptionBatchStatusReqVO.java rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/provider/DeptDataFeignProvider.java (98%) rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/provider/PostDataFeignProvider.java (97%) rename {zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus => zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server}/provider/UserDataFeignProvider.java (98%) create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncStatisticsService.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncStatisticsServiceImpl.java diff --git a/pom.xml b/pom.xml index e10371b8..ee0032e3 100644 --- a/pom.xml +++ b/pom.xml @@ -235,7 +235,7 @@ dev 172.16.46.63:30848 - dev + hwc DEFAULT_GROUP nacos P@ssword25 diff --git a/zt-framework/pom.xml b/zt-framework/pom.xml index f60ab308..241a211e 100644 --- a/zt-framework/pom.xml +++ b/zt-framework/pom.xml @@ -17,7 +17,8 @@ zt-spring-boot-starter-web zt-spring-boot-starter-security zt-spring-boot-starter-websocket - + zt-spring-boot-starter-databus-server + zt-spring-boot-starter-databus-client zt-spring-boot-starter-monitor zt-spring-boot-starter-protection diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java index 087dbf63..4a7c5cdb 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java @@ -56,7 +56,7 @@ public class DatabusClientConsumer implements RocketMQListener { log.info("[DatabusClient] 收到消息, eventType={}", eventType); // 2. 根据 eventType 判断消息类型并分发 - if (isBatchMessage(body)) { + if (isBatchMessage(eventType)) { // 批量消息(全量同步) handleBatchMessage(body, eventType); } else { @@ -82,18 +82,60 @@ public class DatabusClientConsumer implements RocketMQListener { return; } - // 2. 解析批量消息 - DatabusBatchMessage message = JSON.parseObject(body, DatabusBatchMessage.class); + // 2. 获取数据类型 + Class dataType = handler.getDataType(); - // 3. 全量同步开始回调(第一批) + // 3. 解析批量消息(兼容服务端 BatchSyncMessage 格式) + var json = JSON.parseObject(body); + + // 兼容处理:服务端使用 fullTaskId,转换为 taskId + String taskId = json.getString("taskId"); + if (taskId == null) { + taskId = String.valueOf(json.getLong("fullTaskId")); + } + + DatabusBatchMessage message = new DatabusBatchMessage<>(); + message.setMessageId(json.getString("messageId")); + message.setTaskId(taskId); + message.setEventType(eventType); + message.setBatchNo(json.getInteger("batchNo")); + message.setTotalBatch(json.getInteger("totalBatch")); + message.setCount(json.getInteger("count")); + message.setTotalCount(json.getInteger("totalCount") != null ? json.getInteger("totalCount") : 0); + message.setIsLastBatch(json.getBoolean("isLastBatch")); + message.setTenantId(json.getLong("tenantId")); + + // 解析 dataList(服务端是 SyncDataItem 列表,需要提取 data 字段并转换为具体类型) + var dataListJson = json.getJSONArray("dataList"); + if (dataListJson != null) { + java.util.List dataList = new java.util.ArrayList<>(); + for (int i = 0; i < dataListJson.size(); i++) { + var item = dataListJson.getJSONObject(i); + // 服务端 SyncDataItem 结构:{action, uid, data} + // data 字段是 JSON 字符串,需要解析成具体类型 + String dataStr = item.getString("data"); + if (dataStr != null) { + // 使用 handler.getDataType() 反序列化成具体类型 + Object data = JSON.parseObject(dataStr, dataType); + dataList.add(data); + } else { + // 如果没有 data 字段,可能是直接的数据对象,也需要转换类型 + Object data = JSON.parseObject(item.toJSONString(), dataType); + dataList.add(data); + } + } + message.setDataList(dataList); + } + + // 4. 全量同步开始回调(第一批) if (message.getBatchNo() == 1) { handler.onFullSyncStart(message); } - // 4. 处理批次数据 + // 5. 处理批次数据 handler.handleBatch(message); - // 5. 全量同步完成回调(最后一批) + // 6. 全量同步完成回调(最后一批) if (message.getBatchNo().equals(message.getTotalBatch())) { handler.onFullSyncComplete(message); } @@ -104,6 +146,15 @@ public class DatabusClientConsumer implements RocketMQListener { /** * 处理增量消息 + *

+ * 兼容服务端 SyncMessage 格式: + * - syncId: 同步ID + * - eventRecordId: 事件记录ID + * - eventType: 事件类型 + * - eventAction: 事件动作 + * - dataSnapshot: 业务数据快照(JSON字符串) + * - dataVersion: 数据版本 + * - timestamp: 时间戳 */ @SuppressWarnings("unchecked") private void handleIncrementalMessage(String body, DatabusEventType eventType) { @@ -114,14 +165,36 @@ public class DatabusClientConsumer implements RocketMQListener { return; } - // 2. 解析增量消息 - DatabusMessage message = JSON.parseObject(body, DatabusMessage.class); + // 2. 解析增量消息(兼容服务端 SyncMessage 格式) + var json = JSON.parseObject(body); + + // 从 dataSnapshot 字段解析业务数据 + String dataSnapshot = json.getString("dataSnapshot"); + Object data = null; + Long dataId = null; + if (dataSnapshot != null && !dataSnapshot.isEmpty()) { + // dataSnapshot 是 JSON 字符串,需要解析成具体类型 + var dataJson = JSON.parseObject(dataSnapshot); + // 获取数据类型并反序列化 + Class dataType = handler.getDataType(); + data = JSON.parseObject(dataSnapshot, dataType); + // 提取 dataId + dataId = dataJson.getLong("id"); + } + + // 构建 DatabusMessage + DatabusMessage message = new DatabusMessage<>(); + message.setMessageId(json.getString("syncId")); + message.setEventType(eventType); + message.setDataId(dataId); + message.setData(data); + message.setTenantId(json.getLong("tenantId")); // 3. 处理消息 handler.handle(message); - log.info("[DatabusClient] 增量消息处理完成, eventType={}, messageId={}", - eventType, message.getMessageId()); + log.info("[DatabusClient] 增量消息处理完成, eventType={}, syncId={}, dataId={}", + eventType, message.getMessageId(), dataId); } /** @@ -143,13 +216,11 @@ public class DatabusClientConsumer implements RocketMQListener { /** * 判断是否为批量消息 */ - private boolean isBatchMessage(String body) { + private boolean isBatchMessage(DatabusEventType eventType) { try { - // 批量消息包含 taskId, batchNo, totalBatch 字段 - var json = JSON.parseObject(body); - return json.containsKey("taskId") - && json.containsKey("batchNo") - && json.containsKey("totalBatch"); + // 批量消息包含 batchNo, totalBatch 字段 + // 服务端使用 fullTaskId,客户端 API 使用 taskId,兼容两种格式 + return eventType.getAction().equals("full"); } catch (Exception e) { return false; } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/BatchSyncEventHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/BatchSyncEventHandler.java index 4ab5a573..83c21f69 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/BatchSyncEventHandler.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/BatchSyncEventHandler.java @@ -3,6 +3,9 @@ package com.zt.plat.framework.databus.client.handler; import com.zt.plat.module.databus.api.message.DatabusBatchMessage; import com.zt.plat.module.databus.enums.DatabusEventType; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; + /** * 批量同步事件处理器接口 *

@@ -48,4 +51,29 @@ public interface BatchSyncEventHandler { // 默认空实现,子类可覆盖 } + /** + * 获取数据类型 + *

+ * 默认通过反射获取泛型类型参数,子类可以覆盖此方法提供具体类型 + * + * @return 数据类型的 Class 对象 + */ + @SuppressWarnings("unchecked") + default Class getDataType() { + Type[] genericInterfaces = this.getClass().getGenericInterfaces(); + for (Type genericInterface : genericInterfaces) { + if (genericInterface instanceof ParameterizedType) { + ParameterizedType parameterizedType = (ParameterizedType) genericInterface; + if (parameterizedType.getRawType().equals(BatchSyncEventHandler.class)) { + Type[] typeArguments = parameterizedType.getActualTypeArguments(); + if (typeArguments.length > 0 && typeArguments[0] instanceof Class) { + return (Class) typeArguments[0]; + } + } + } + } + // 如果无法获取泛型类型,返回 Object.class + return (Class) Object.class; + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/pom.xml b/zt-framework/zt-spring-boot-starter-databus-server/pom.xml index 59150712..88c2555d 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/pom.xml +++ b/zt-framework/zt-spring-boot-starter-databus-server/pom.xml @@ -27,6 +27,13 @@ ${revision} + + + com.zt.plat + zt-module-system-api + ${revision} + + cn.hutool hutool-all diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq/consumer/DatabusDeptChangeConsumer.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/consumer/DatabusDeptChangeConsumer.java similarity index 93% rename from zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq/consumer/DatabusDeptChangeConsumer.java rename to zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/consumer/DatabusDeptChangeConsumer.java index e3794b25..474d8eb2 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/mq/consumer/DatabusDeptChangeConsumer.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/consumer/DatabusDeptChangeConsumer.java @@ -1,4 +1,4 @@ -package com.zt.plat.module.databus.mq.consumer; +package com.zt.plat.framework.databus.server.consumer; import com.fasterxml.jackson.databind.ObjectMapper; import com.zt.plat.framework.databus.server.core.event.DatabusEvent; @@ -61,8 +61,8 @@ public class DatabusDeptChangeConsumer implements RocketMQListener batchUpdateClientStatus(@Valid @RequestBody DatabusSyncClientBatchStatusReqVO reqVO) { + clientService.batchUpdateClientStatus(reqVO.getIds(), reqVO.getEnabled()); + return success(true); + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncEventController.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncEventController.java index c74227d2..6e4f4a64 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncEventController.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncEventController.java @@ -2,6 +2,7 @@ package com.zt.plat.framework.databus.server.controller.admin; import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.common.pojo.PageResult; +import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventBatchStatusReqVO; import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventPageReqVO; import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventRespVO; import com.zt.plat.framework.databus.server.controller.admin.vo.event.DatabusSyncEventSaveReqVO; @@ -89,4 +90,12 @@ public class DatabusSyncEventController { return success(true); } + @PutMapping("/batch-status") + @Operation(summary = "批量修改事件启用状态") + @PreAuthorize("@ss.hasPermission('databus:sync:event:update')") + public CommonResult batchUpdateEventStatus(@Valid @RequestBody DatabusSyncEventBatchStatusReqVO reqVO) { + eventService.batchUpdateEventStatus(reqVO.getIds(), reqVO.getEnabled()); + return success(true); + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncStatisticsController.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncStatisticsController.java new file mode 100644 index 00000000..fb2f0a7d --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncStatisticsController.java @@ -0,0 +1,38 @@ +package com.zt.plat.framework.databus.server.controller.admin; + +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO; +import com.zt.plat.framework.databus.server.service.DatabusSyncStatisticsService; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import static com.zt.plat.framework.common.pojo.CommonResult.success; + +/** + * DataBus 同步统计 Controller + * + * @author ZT + */ +@Tag(name = "管理后台 - DataBus 同步统计") +@RestController +@RequestMapping("/databus/sync") +@Validated +public class DatabusSyncStatisticsController { + + @Resource + private DatabusSyncStatisticsService statisticsService; + + @GetMapping("/statistics") + @Operation(summary = "获取同步统计数据") + @PreAuthorize("@ss.hasPermission('databus:sync:query')") + public CommonResult getStatistics() { + DatabusSyncStatisticsRespVO statistics = statisticsService.getStatistics(); + return success(statistics); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncSubscriptionController.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncSubscriptionController.java index 737f138f..7fd88490 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncSubscriptionController.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/DatabusSyncSubscriptionController.java @@ -2,6 +2,7 @@ package com.zt.plat.framework.databus.server.controller.admin; import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.common.pojo.PageResult; +import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionBatchStatusReqVO; import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionPageReqVO; import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionRespVO; import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionSaveReqVO; @@ -97,4 +98,21 @@ public class DatabusSyncSubscriptionController { return success(true); } + @GetMapping("/list-by-client") + @Operation(summary = "根据客户端ID获取订阅列表") + @Parameter(name = "clientId", description = "客户端ID", required = true, example = "1") + @PreAuthorize("@ss.hasPermission('databus:sync:subscription:query')") + public CommonResult> getSubscriptionListByClient(@RequestParam("clientId") Long clientId) { + java.util.List list = subscriptionService.getSubscriptionListByClient(clientId); + return success(DatabusSyncSubscriptionConvert.INSTANCE.convertList(list)); + } + + @PutMapping("/batch-status") + @Operation(summary = "批量修改订阅启用状态") + @PreAuthorize("@ss.hasPermission('databus:sync:subscription:update')") + public CommonResult batchUpdateSubscriptionStatus(@Valid @RequestBody DatabusSyncSubscriptionBatchStatusReqVO reqVO) { + subscriptionService.batchUpdateSubscriptionStatus(reqVO.getIds(), reqVO.getEnabled()); + return success(true); + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/client/DatabusSyncClientBatchStatusReqVO.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/client/DatabusSyncClientBatchStatusReqVO.java new file mode 100644 index 00000000..6789d281 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/client/DatabusSyncClientBatchStatusReqVO.java @@ -0,0 +1,26 @@ +package com.zt.plat.framework.databus.server.controller.admin.vo.client; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +/** + * 客户端批量状态更新请求 VO + * + * @author ZT + */ +@Schema(description = "管理后台 - 客户端批量状态更新请求 VO") +@Data +public class DatabusSyncClientBatchStatusReqVO { + + @Schema(description = "客户端ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]") + @NotEmpty(message = "客户端ID列表不能为空") + private List ids; + + @Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @NotNull(message = "启用状态不能为空") + private Integer enabled; +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/event/DatabusSyncEventBatchStatusReqVO.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/event/DatabusSyncEventBatchStatusReqVO.java new file mode 100644 index 00000000..8a657b66 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/event/DatabusSyncEventBatchStatusReqVO.java @@ -0,0 +1,26 @@ +package com.zt.plat.framework.databus.server.controller.admin.vo.event; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +/** + * 事件批量状态更新请求 VO + * + * @author ZT + */ +@Schema(description = "管理后台 - 事件批量状态更新请求 VO") +@Data +public class DatabusSyncEventBatchStatusReqVO { + + @Schema(description = "事件ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]") + @NotEmpty(message = "事件ID列表不能为空") + private List ids; + + @Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @NotNull(message = "启用状态不能为空") + private Integer enabled; +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/statistics/DatabusSyncStatisticsRespVO.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/statistics/DatabusSyncStatisticsRespVO.java new file mode 100644 index 00000000..94f94f65 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/statistics/DatabusSyncStatisticsRespVO.java @@ -0,0 +1,38 @@ +package com.zt.plat.framework.databus.server.controller.admin.vo.statistics; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.Data; + +/** + * DataBus 同步统计响应 VO + * + * @author ZT + */ +@Schema(description = "管理后台 - DataBus 同步统计响应 VO") +@Data +public class DatabusSyncStatisticsRespVO { + + @Schema(description = "事件总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "12") + private Integer totalEvents; + + @Schema(description = "客户端总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "5") + private Integer totalClients; + + @Schema(description = "订阅总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "60") + private Integer totalSubscriptions; + + @Schema(description = "活跃订阅数", requiredMode = Schema.RequiredMode.REQUIRED, example = "58") + private Integer activeSubscriptions; + + @Schema(description = "今日推送总数", requiredMode = Schema.RequiredMode.REQUIRED, example = "1234") + private Integer todayPushCount; + + @Schema(description = "今日成功数", requiredMode = Schema.RequiredMode.REQUIRED, example = "1200") + private Integer todaySuccessCount; + + @Schema(description = "今日失败数", requiredMode = Schema.RequiredMode.REQUIRED, example = "34") + private Integer todayFailureCount; + + @Schema(description = "死信队列数量", requiredMode = Schema.RequiredMode.REQUIRED, example = "10") + private Integer deadLetterCount; +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/subscription/DatabusSyncSubscriptionBatchStatusReqVO.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/subscription/DatabusSyncSubscriptionBatchStatusReqVO.java new file mode 100644 index 00000000..3f3af776 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/controller/admin/vo/subscription/DatabusSyncSubscriptionBatchStatusReqVO.java @@ -0,0 +1,26 @@ +package com.zt.plat.framework.databus.server.controller.admin.vo.subscription; + +import io.swagger.v3.oas.annotations.media.Schema; +import jakarta.validation.constraints.NotEmpty; +import jakarta.validation.constraints.NotNull; +import lombok.Data; + +import java.util.List; + +/** + * 订阅批量状态更新请求 VO + * + * @author ZT + */ +@Schema(description = "管理后台 - 订阅批量状态更新请求 VO") +@Data +public class DatabusSyncSubscriptionBatchStatusReqVO { + + @Schema(description = "订阅ID列表", requiredMode = Schema.RequiredMode.REQUIRED, example = "[1, 2, 3]") + @NotEmpty(message = "订阅ID列表不能为空") + private List ids; + + @Schema(description = "启用状态", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + @NotNull(message = "启用状态不能为空") + private Integer enabled; +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusIncrementalSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusIncrementalSyncServiceImpl.java index 99766a04..8a537b87 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusIncrementalSyncServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusIncrementalSyncServiceImpl.java @@ -9,6 +9,7 @@ import com.zt.plat.framework.databus.server.dal.dataobject.*; import com.zt.plat.framework.databus.server.dal.mapper.*; import com.zt.plat.framework.databus.server.enums.SyncStatusEnum; import com.zt.plat.framework.databus.server.enums.TransportTypeEnum; +import com.zt.plat.framework.tenant.core.util.TenantUtils; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; @@ -48,6 +49,14 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync log.info("[Databus增量同步] 开始处理事件, eventType={}, eventAction={}", event.getEventType(), event.getEventAction()); + // 使用 TenantUtils.executeIgnore 忽略租户隔离,因为事件定义、客户端、订阅关系是全局共享的 + TenantUtils.executeIgnore(() -> processEventInternal(event)); + } + + /** + * 内部处理事件逻辑(忽略租户隔离后执行) + */ + private void processEventInternal(DatabusEvent event) { // 1. 查询事件定义 DatabusSyncEventDO eventDef = eventMapper.selectByEventType(event.getEventType()); if (eventDef == null) { @@ -216,11 +225,11 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync /** * 构建客户端专属Topic - * 格式: {topicBase}-{module}-{entity}-{action}-{clientCode} - * 示例: databus-sync-system-org-create-company-a + * 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic) + * 示例: databus-sync-branch-001 * * @param topicBase 基础Topic名称(如 databus-sync) - * @param eventType 事件类型(格式: system-org-create) + * @param eventType 事件类型(格式: system-org-create)- 已不再使用,保留参数兼容性 * @param clientCode 客户端编码 */ private String buildClientTopic(String topicBase, String eventType, String clientCode) { @@ -228,8 +237,9 @@ public class DatabusIncrementalSyncServiceImpl implements DatabusIncrementalSync if (topicBase == null || topicBase.isEmpty()) { topicBase = "databus-sync"; } - // eventType 格式已经是 system-org-create,直接拼接 - return String.format("%s-%s-%s", topicBase, eventType.toLowerCase(), clientCode); + // 简化 Topic 格式:databus-sync-{clientCode} + // 不再为每个事件创建独立 Topic,而是通过消息体中的 eventType 字段路由 + return String.format("%s-%s", topicBase, clientCode); } /** diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/dal/mapper/DatabusSyncSubscriptionMapper.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/dal/mapper/DatabusSyncSubscriptionMapper.java index 4d30f98a..2882b11d 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/dal/mapper/DatabusSyncSubscriptionMapper.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/dal/mapper/DatabusSyncSubscriptionMapper.java @@ -38,4 +38,13 @@ public interface DatabusSyncSubscriptionMapper extends BaseMapperX selectListByClientId(Long clientId) { + return selectList(new LambdaQueryWrapperX() + .eq(DatabusSyncSubscriptionDO::getClientId, clientId) + .orderByDesc(DatabusSyncSubscriptionDO::getId)); + } + } diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/DeptDataFeignProvider.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/DeptDataFeignProvider.java similarity index 98% rename from zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/DeptDataFeignProvider.java rename to zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/DeptDataFeignProvider.java index 1423be81..1afc83c4 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/DeptDataFeignProvider.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/DeptDataFeignProvider.java @@ -1,4 +1,4 @@ -package com.zt.plat.module.databus.provider; +package com.zt.plat.framework.databus.server.provider; import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.databus.server.core.provider.DataProvider; diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/PostDataFeignProvider.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/PostDataFeignProvider.java similarity index 97% rename from zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/PostDataFeignProvider.java rename to zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/PostDataFeignProvider.java index cfb4d8e3..23010f49 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/PostDataFeignProvider.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/PostDataFeignProvider.java @@ -1,4 +1,4 @@ -package com.zt.plat.module.databus.provider; +package com.zt.plat.framework.databus.server.provider; import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.databus.server.core.provider.DataProvider; diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/UserDataFeignProvider.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/UserDataFeignProvider.java similarity index 98% rename from zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/UserDataFeignProvider.java rename to zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/UserDataFeignProvider.java index b2c162e3..52304db0 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/provider/UserDataFeignProvider.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/provider/UserDataFeignProvider.java @@ -1,4 +1,4 @@ -package com.zt.plat.module.databus.provider; +package com.zt.plat.framework.databus.server.provider; import com.zt.plat.framework.common.pojo.CommonResult; import com.zt.plat.framework.databus.server.core.provider.DataProvider; diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncClientService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncClientService.java index 7e634a61..dc0c813a 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncClientService.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncClientService.java @@ -67,4 +67,12 @@ public interface DatabusSyncClientService { */ void updateClientStatus(Long id, Integer enabled); + /** + * 批量更新客户端启用状态 + * + * @param ids 编号列表 + * @param enabled 启用状态 + */ + void batchUpdateClientStatus(List ids, Integer enabled); + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncEventService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncEventService.java index fe7c2b3f..2de4779b 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncEventService.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncEventService.java @@ -67,4 +67,12 @@ public interface DatabusSyncEventService { */ void updateEventStatus(Long id, Integer enabled); + /** + * 批量更新事件启用状态 + * + * @param ids 编号列表 + * @param enabled 启用状态 + */ + void batchUpdateEventStatus(List ids, Integer enabled); + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncStatisticsService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncStatisticsService.java new file mode 100644 index 00000000..01140d7f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncStatisticsService.java @@ -0,0 +1,18 @@ +package com.zt.plat.framework.databus.server.service; + +import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO; + +/** + * DataBus 同步统计 Service 接口 + * + * @author ZT + */ +public interface DatabusSyncStatisticsService { + + /** + * 获取 DataBus 同步统计数据 + * + * @return 统计数据 + */ + DatabusSyncStatisticsRespVO getStatistics(); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncSubscriptionService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncSubscriptionService.java index 88239aaf..adcddfe5 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncSubscriptionService.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/DatabusSyncSubscriptionService.java @@ -72,4 +72,20 @@ public interface DatabusSyncSubscriptionService { */ void triggerSync(Long id); + /** + * 根据客户端 ID 获取订阅列表 + * + * @param clientId 客户端 ID + * @return 订阅列表 + */ + java.util.List getSubscriptionListByClient(Long clientId); + + /** + * 批量更新订阅启用状态 + * + * @param ids 编号列表 + * @param enabled 启用状态 + */ + void batchUpdateSubscriptionStatus(java.util.List ids, Integer enabled); + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java index d92f8800..3d13c7f8 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java @@ -240,8 +240,8 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService { BatchSyncMessage message) { try { if (TransportTypeEnum.isMqFirst(client.getTransportType()) && client.getMqEnabled() == 1) { - // 使用 getByTopicSuffix 支持数据库存储的 topic 格式(如 system-post-full) - DatabusEventType databusEventType = DatabusEventType.getByTopicSuffix(eventType); + // 使用 getByEventType 支持大写下划线格式(如 SYSTEM_POST_FULL) + DatabusEventType databusEventType = DatabusEventType.getByEventType(eventType); if (databusEventType == null) { log.error("[Databus] Unknown event type: {}", eventType); return false; diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncClientServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncClientServiceImpl.java index 496463c9..482b2046 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncClientServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncClientServiceImpl.java @@ -112,4 +112,17 @@ public class DatabusSyncClientServiceImpl implements DatabusSyncClientService { clientMapper.updateById(updateObj); } + @Override + @Transactional(rollbackFor = Exception.class) + public void batchUpdateClientStatus(List ids, Integer enabled) { + // 批量更新状态 + for (Long id : ids) { + DatabusSyncClientDO updateObj = new DatabusSyncClientDO(); + updateObj.setId(id); + updateObj.setEnabled(enabled); + clientMapper.updateById(updateObj); + } + log.info("[batchUpdateClientStatus] 批量更新客户端状态完成, ids={}, enabled={}", ids, enabled); + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncEventServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncEventServiceImpl.java index 355c5f77..0c736d15 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncEventServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncEventServiceImpl.java @@ -112,4 +112,17 @@ public class DatabusSyncEventServiceImpl implements DatabusSyncEventService { eventMapper.updateById(updateObj); } + @Override + @Transactional(rollbackFor = Exception.class) + public void batchUpdateEventStatus(List ids, Integer enabled) { + // 批量更新状态 + for (Long id : ids) { + DatabusSyncEventDO updateObj = new DatabusSyncEventDO(); + updateObj.setId(id); + updateObj.setEnabled(enabled); + eventMapper.updateById(updateObj); + } + log.info("[batchUpdateEventStatus] 批量更新事件状态完成, ids={}, enabled={}", ids, enabled); + } + } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncStatisticsServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncStatisticsServiceImpl.java new file mode 100644 index 00000000..6db697bc --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncStatisticsServiceImpl.java @@ -0,0 +1,95 @@ +package com.zt.plat.framework.databus.server.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.zt.plat.framework.databus.server.controller.admin.vo.statistics.DatabusSyncStatisticsRespVO; +import com.zt.plat.framework.databus.server.dal.dataobject.*; +import com.zt.plat.framework.databus.server.dal.mapper.*; +import com.zt.plat.framework.databus.server.service.DatabusSyncStatisticsService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; + +/** + * DataBus 同步统计 Service 实现类 + * + * @author ZT + */ +@Slf4j +@Service +public class DatabusSyncStatisticsServiceImpl implements DatabusSyncStatisticsService { + + @Resource + private DatabusSyncEventMapper eventMapper; + + @Resource + private DatabusSyncClientMapper clientMapper; + + @Resource + private DatabusSyncSubscriptionMapper subscriptionMapper; + + @Resource + private DatabusSyncLogMapper logMapper; + + @Resource + private DatabusSyncDeadLetterMapper deadLetterMapper; + + @Override + public DatabusSyncStatisticsRespVO getStatistics() { + DatabusSyncStatisticsRespVO vo = new DatabusSyncStatisticsRespVO(); + + // 1. 事件总数 + vo.setTotalEvents(Math.toIntExact(eventMapper.selectCount(null))); + + // 2. 客户端总数 + vo.setTotalClients(Math.toIntExact(clientMapper.selectCount(null))); + + // 3. 订阅总数 + vo.setTotalSubscriptions(Math.toIntExact(subscriptionMapper.selectCount(null))); + + // 4. 活跃订阅数(enabled = 1) + vo.setActiveSubscriptions(Math.toIntExact(subscriptionMapper.selectCount( + new LambdaQueryWrapper() + .eq(DatabusSyncSubscriptionDO::getEnabled, 1) + ))); + + // 5. 今日推送统计(基于 create_time) + LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN); + LocalDateTime todayEnd = LocalDateTime.of(LocalDate.now(), LocalTime.MAX); + + // 今日推送总数 + vo.setTodayPushCount(Math.toIntExact(logMapper.selectCount( + new LambdaQueryWrapper() + .ge(DatabusSyncLogDO::getCreateTime, todayStart) + .le(DatabusSyncLogDO::getCreateTime, todayEnd) + ))); + + // 今日成功数(status = 'SUCCESS') + vo.setTodaySuccessCount(Math.toIntExact(logMapper.selectCount( + new LambdaQueryWrapper() + .ge(DatabusSyncLogDO::getCreateTime, todayStart) + .le(DatabusSyncLogDO::getCreateTime, todayEnd) + .eq(DatabusSyncLogDO::getStatus, "SUCCESS") + ))); + + // 今日失败数(status = 'FAILURE') + vo.setTodayFailureCount(Math.toIntExact(logMapper.selectCount( + new LambdaQueryWrapper() + .ge(DatabusSyncLogDO::getCreateTime, todayStart) + .le(DatabusSyncLogDO::getCreateTime, todayEnd) + .eq(DatabusSyncLogDO::getStatus, "FAILURE") + ))); + + // 6. 死信队列数量(handled = 0) + vo.setDeadLetterCount(Math.toIntExact(deadLetterMapper.selectCount( + new LambdaQueryWrapper() + .eq(DatabusSyncDeadLetterDO::getHandled, 0) + ))); + + log.info("[DatabusSyncStatistics] 统计数据获取成功: {}", vo); + return vo; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java index 20952417..6feb607c 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java @@ -126,4 +126,22 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti log.info("[triggerSync] 手动触发同步,订阅ID: {}", id); } + @Override + public java.util.List getSubscriptionListByClient(Long clientId) { + return subscriptionMapper.selectListByClientId(clientId); + } + + @Override + @Transactional(rollbackFor = Exception.class) + public void batchUpdateSubscriptionStatus(java.util.List ids, Integer enabled) { + // 批量更新状态 + for (Long id : ids) { + DatabusSyncSubscriptionDO updateObj = new DatabusSyncSubscriptionDO(); + updateObj.setId(id); + updateObj.setEnabled(enabled); + subscriptionMapper.updateById(updateObj); + } + log.info("[batchUpdateSubscriptionStatus] 批量更新订阅状态完成, ids={}, enabled={}", ids, enabled); + } + } diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java index da726cd3..bb853bfe 100644 --- a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java @@ -238,10 +238,13 @@ public enum DatabusEventType { /** * 获取完整Topic名称(服务端转发用) - * 格式: {topicBase}-{module}-{entity}-{action}-{clientCode} + * 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic) + * 示例: databus-sync-branch-001 + * + * 注意:不再为每个事件创建独立 Topic,而是通过消息体中的 eventType 字段路由 */ public String getTopic(String topicBase, String clientCode) { - return String.format("%s-%s-%s-%s-%s", topicBase, module, entity, action, clientCode); + return String.format("%s-%s", topicBase, clientCode); } /** @@ -270,6 +273,31 @@ public enum DatabusEventType { return null; } + /** + * 根据事件类型字符串获取枚举(支持大写下划线格式) + * 例如:SYSTEM_POST_FULL → DatabusEventType.SYSTEM_POST_FULL + * + * @param eventType 事件类型字符串(格式: MODULE_ENTITY_ACTION) + * @return 枚举值,未找到返回null + */ + public static DatabusEventType getByEventType(String eventType) { + if (eventType == null) { + return null; + } + // 先尝试直接匹配枚举名称 + try { + return DatabusEventType.valueOf(eventType.toUpperCase()); + } catch (IllegalArgumentException e) { + // 如果失败,尝试转换格式后匹配(如 system-post-full → SYSTEM_POST_FULL) + String normalized = eventType.toUpperCase().replace("-", "_"); + try { + return DatabusEventType.valueOf(normalized); + } catch (IllegalArgumentException ex) { + return null; + } + } + } + /** * 根据模块、实体、操作获取枚举 * diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java index 7e83f105..5d974e23 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java +++ b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java @@ -1,6 +1,9 @@ package com.zt.plat.module.databus.framework.rpc.config; import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi; +import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi; +import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi; +import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi; import com.zt.plat.module.system.api.user.AdminUserApi; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.Configuration; @@ -9,6 +12,13 @@ import org.springframework.context.annotation.Configuration; * Databus 模块的 RPC 配置,开启所需的 Feign 客户端。 */ @Configuration(value = "databusRpcConfiguration", proxyBeanMethods = false) -@EnableFeignClients(clients = {AdminUserApi.class, OAuth2TokenCommonApi.class}) +@EnableFeignClients(clients = { + AdminUserApi.class, + OAuth2TokenCommonApi.class, + // DataBus 数据提供者 Feign 客户端 + DatabusDeptProviderApi.class, + DatabusUserProviderApi.class, + DatabusPostProviderApi.class +}) public class RpcConfiguration { } diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml index 30abeecd..e33d10c4 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml +++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml @@ -75,9 +75,16 @@ management: # 日志文件配置 logging: file: - name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径 + name: D:/project/zhongtong/logs/${spring.application.name}.log # 日志文件名,全路径 +# RocketMQ 配置项 +rocketmq: + name-server: 172.16.240.64:9876 # RocketMQ Namesrv + producer: + group: databus-server-producer-group # 生产者组名 + send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒 + justauth: enabled: true type: @@ -105,3 +112,23 @@ justauth: prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE:: timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟 +zt: + databus: + sync: + server: + enabled: true + clients: + - company-b # 配置订阅的客户端(与客户端的client-code一致) + mq: + enabled: true + name-server: 172.16.240.64:9876 # RocketMQ NameServer 地址 + topic-base: databus-sync + producer-group: databus-server-producer + send-msg-timeout: 10000 + retry: + max-attempts: 5 # 最大重试次数 + initial-delay: 1 # 初始重试延迟(秒) + multiplier: 2 # 重试延迟倍数 + batch: + default-size: 500 # 默认批量大小 + interval: 5 # 批量推送间隔(秒) \ No newline at end of file diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml index c45ce834..04b5c003 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml +++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml @@ -86,6 +86,13 @@ mybatis-plus: log-impl: org.apache.ibatis.logging.stdout.StdOutImpl +# RocketMQ 配置项 +rocketmq: + name-server: 172.16.46.63:9876 # RocketMQ Namesrv + producer: + group: databus-server-producer-group # 生产者组名 + send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒 + # ZT配置项,设置当前项目所有自定义的配置 zt: env: # 多环境的配置项 diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application.yml index c40ecc83..bcb4db90 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/resources/application.yml +++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application.yml @@ -50,7 +50,7 @@ spring: time-to-live: 1h # 设置过期时间为 1 小时 server: - port: 48100 + port: 48105 logging: file: @@ -130,6 +130,11 @@ zt: - /databus/api/portal/** ignore-tables: - databus_api_client_credential + # DataBus 数据同步服务端配置 + databus: + sync: + server: + enabled: true # 启用 DataBus 同步服务端 databus: gateway: diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java index 76c5b624..b8d4a995 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java @@ -8,6 +8,7 @@ import com.zt.plat.module.system.dal.dataobject.dept.PostDO; import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import jakarta.annotation.Resource; @@ -16,6 +17,9 @@ import jakarta.annotation.Resource; * Databus 数据变更消息生产者 *

* 用于发送部门、用户、岗位变更消息到 MQ,供 databus-server 消费 + *

+ * 注意:客户端系统(分公司)应该禁用此功能,避免形成消息循环 + * 配置项:zt.databus.change.producer.enabled=false * * @author ZT */ @@ -26,6 +30,15 @@ public class DatabusChangeProducer { @Resource private RocketMQTemplate rocketMQTemplate; + /** + * 是否启用变更消息发送 + *

+ * 集团侧(数据源):设置为 true,发送变更消息 + * 分公司侧(客户端):设置为 false,禁用变更消息,避免循环 + */ + @Value("${zt.databus.change.producer.enabled:true}") + private boolean enabled; + // ==================== 部门变更消息 ==================== /** @@ -61,6 +74,11 @@ public class DatabusChangeProducer { } private void sendDeptChangeMessage(DatabusDeptChangeMessage message) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过部门变更消息, action={}, deptId={}", + message.getAction(), message.getDeptId()); + return; + } try { rocketMQTemplate.asyncSend(DatabusDeptChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { @Override @@ -118,6 +136,11 @@ public class DatabusChangeProducer { } private void sendUserChangeMessage(DatabusUserChangeMessage message) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过用户变更消息, action={}, userId={}", + message.getAction(), message.getUserId()); + return; + } try { rocketMQTemplate.asyncSend(DatabusUserChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { @Override @@ -171,6 +194,11 @@ public class DatabusChangeProducer { } private void sendPostChangeMessage(DatabusPostChangeMessage message) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过岗位变更消息, action={}, postId={}", + message.getAction(), message.getPostId()); + return; + } try { rocketMQTemplate.asyncSend(DatabusPostChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { @Override diff --git a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml index 0d7755ef..9d9f0f8b 100644 --- a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml +++ b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml @@ -187,4 +187,10 @@ seata: undo: logTable: undo_log dataValidation: true - logSerialization: jackson \ No newline at end of file + logSerialization: jackson +zt: + databus: + # 变更消息生产者配置 + change: + producer: + enabled: false