From 72fe903447b447f53a88ff375169173fa0c524f7 Mon Sep 17 00:00:00 2001
From: hewencai <2357300448@qq.com>
Date: Tue, 16 Dec 2025 14:34:13 +0800
Subject: [PATCH] =?UTF-8?q?update=EF=BC=9A=E6=95=B0=E6=8D=AE=E5=90=8C?=
=?UTF-8?q?=E6=AD=A5=E5=88=86=E5=8F=91=E6=9C=BA=E6=9E=84=E5=B2=97=E4=BD=8D?=
=?UTF-8?q?=E7=BB=91=E5=AE=9A=E5=85=B3=E7=B3=BB?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../core/sync/DatabusFullSyncService.java | 2 +
.../impl/DatabusFullSyncServiceImpl.java | 200 ++++++++++++++++++
2 files changed, 202 insertions(+)
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java
index 62d00b16..ee18651c 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java
@@ -2,6 +2,8 @@ package com.zt.plat.framework.databus.server.core.sync;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncFullTaskDO;
+import java.util.List;
+
/**
* Databus 全量同步服务接口
*
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java
index 3d13c7f8..d2f277fb 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java
@@ -113,6 +113,13 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
DatabusSyncClientDO client = clientMapper.selectById(subscription.getClientId());
DatabusSyncEventDO event = eventMapper.selectById(subscription.getEventId());
+ // 检查是否为基础数据全量同步事件(组合事件)
+ if ("SYSTEM_BASE_DATA_FULL".equals(event.getEventType())) {
+ log.info("[Databus] 检测到基础数据全量同步事件,开始按依赖顺序执行子任务, taskId={}", taskId);
+ executeBaseDataFullSync(task, client);
+ return;
+ }
+
String providerType = event.getDataProviderMethod();
if (providerType == null) {
throw new RuntimeException("Event data provider type not configured");
@@ -325,4 +332,197 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
return fullTaskMapper.selectByTaskNo(taskNo);
}
+ /**
+ * 执行基础数据全量同步(组合事件)
+ *
+ * 按依赖顺序同步执行5个子任务:
+ * 1. 组织机构(SYSTEM_DEPT_FULL)
+ * 2. 岗位(SYSTEM_POST_FULL)
+ * 3. 用户(SYSTEM_USER_FULL)
+ * 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL)
+ * 5. 用户-岗位关系(SYSTEM_USER_POST_FULL)
+ *
+ * @param parentTask 父任务(SYSTEM_BASE_DATA_FULL)
+ * @param client 客户端配置
+ */
+ private void executeBaseDataFullSync(DatabusSyncFullTaskDO parentTask, DatabusSyncClientDO client) {
+ // 按依赖顺序定义事件类型
+ String[] eventTypes = {
+ "SYSTEM_DEPT_FULL", // 1. 组织机构
+ "SYSTEM_POST_FULL", // 2. 岗位
+ "SYSTEM_USER_FULL", // 3. 用户
+ "SYSTEM_USER_DEPT_FULL", // 4. 用户-部门关系
+ "SYSTEM_USER_POST_FULL" // 5. 用户-岗位关系
+ };
+
+ int successTaskCount = 0; // 成功的任务数
+ int failTaskCount = 0; // 失败的任务数
+ StringBuilder errorMessages = new StringBuilder();
+
+ // 用于聚合所有子任务的进度
+ long totalDataCount = 0L; // 总数据量
+ long processedDataCount = 0L; // 已处理数据量
+ long successDataCount = 0L; // 成功数据量
+ long failDataCount = 0L; // 失败数据量
+ int totalBatchCount = 0; // 总批次数
+
+ List subTaskIds = new ArrayList<>();
+
+ try {
+ // 顺序执行每个子任务
+ for (int i = 0; i < eventTypes.length; i++) {
+ String eventType = eventTypes[i];
+ try {
+ log.info("[Databus] 开始执行子任务 {}/{}: eventType={}, parentTaskId={}",
+ i + 1, eventTypes.length, eventType, parentTask.getId());
+
+ // 查询事件定义
+ DatabusSyncEventDO event = eventMapper.selectByEventType(eventType);
+ if (event == null) {
+ log.warn("[Databus] 事件定义不存在,跳过: eventType={}", eventType);
+ failTaskCount++;
+ errorMessages.append(String.format("[%s: 事件定义不存在] ", eventType));
+ continue;
+ }
+ if (event.getEnabled() != 1) {
+ log.warn("[Databus] 事件未启用,跳过: eventType={}", eventType);
+ failTaskCount++;
+ errorMessages.append(String.format("[%s: 事件未启用] ", eventType));
+ continue;
+ }
+
+ // 查询订阅关系
+ DatabusSyncSubscriptionDO subscription = subscriptionMapper.selectByClientIdAndEventId(
+ client.getId(), event.getId());
+ if (subscription == null) {
+ log.warn("[Databus] 订阅关系不存在,跳过: clientCode={}, eventType={}",
+ client.getClientCode(), eventType);
+ failTaskCount++;
+ errorMessages.append(String.format("[%s: 订阅关系不存在] ", eventType));
+ continue;
+ }
+ if (subscription.getEnabled() != 1) {
+ log.warn("[Databus] 订阅关系未启用,跳过: clientCode={}, eventType={}",
+ client.getClientCode(), eventType);
+ failTaskCount++;
+ errorMessages.append(String.format("[%s: 订阅关系未启用] ", eventType));
+ continue;
+ }
+
+ // 创建子任务(同步执行,非异步)
+ String taskRemark = String.format("[基础数据全量同步-子任务 %d/%d] %s",
+ i + 1, eventTypes.length, event.getEventName());
+ Long subTaskId = createFullSyncTask(subscription.getId(), taskRemark);
+ subTaskIds.add(subTaskId);
+
+ // 同步执行子任务(注意:这里不用异步,确保按顺序执行)
+ DatabusSyncFullTaskDO subTask = fullTaskMapper.selectById(subTaskId);
+ executeSubTaskSync(subTask, subscription, client, event);
+
+ // 重新查询子任务获取最新统计数据
+ subTask = fullTaskMapper.selectById(subTaskId);
+
+ // 聚合子任务进度到父任务
+ totalDataCount += (subTask.getTotalCount() != null ? subTask.getTotalCount() : 0L);
+ processedDataCount += (subTask.getProcessedCount() != null ? subTask.getProcessedCount() : 0L);
+ successDataCount += (subTask.getSuccessCount() != null ? subTask.getSuccessCount() : 0L);
+ failDataCount += (subTask.getFailCount() != null ? subTask.getFailCount() : 0L);
+ totalBatchCount += (subTask.getTotalBatch() != null ? subTask.getTotalBatch() : 0);
+
+ if (FullTaskStatusEnum.isCompleted(subTask.getStatus())) {
+ successTaskCount++;
+ } else {
+ failTaskCount++;
+ }
+
+ // 实时更新父任务进度(每完成一个子任务就更新一次)
+ parentTask.setTotalCount(totalDataCount);
+ parentTask.setProcessedCount(processedDataCount);
+ parentTask.setSuccessCount(successDataCount);
+ parentTask.setFailCount(failDataCount);
+ parentTask.setTotalBatch(totalBatchCount);
+ parentTask.setCurrentBatch(i + 1); // 当前完成的子任务数
+ fullTaskMapper.updateById(parentTask);
+
+ log.info("[Databus] 子任务执行完成 {}/{}: eventType={}, subTaskId={}, status={}, " +
+ "数据统计[total={}, success={}, fail={}]",
+ i + 1, eventTypes.length, eventType, subTaskId, subTask.getStatus(),
+ subTask.getTotalCount(), subTask.getSuccessCount(), subTask.getFailCount());
+
+ } catch (Exception e) {
+ log.error("[Databus] 子任务执行失败 {}/{}: eventType={}", i + 1, eventTypes.length, eventType, e);
+ failTaskCount++;
+ errorMessages.append(String.format("[%s: %s] ", eventType, e.getMessage()));
+ // 继续执行下一个任务
+ }
+ }
+
+ // 最终更新父任务状态
+ parentTask.setStatus(failTaskCount == 0 ? FullTaskStatusEnum.COMPLETED.getStatus()
+ : FullTaskStatusEnum.FAILED.getStatus());
+ parentTask.setEndTime(LocalDateTime.now());
+ parentTask.setTotalCount(totalDataCount);
+ parentTask.setProcessedCount(processedDataCount);
+ parentTask.setSuccessCount(successDataCount);
+ parentTask.setFailCount(failDataCount);
+ parentTask.setTotalBatch(totalBatchCount);
+ parentTask.setCurrentBatch(eventTypes.length);
+ if (errorMessages.length() > 0) {
+ parentTask.setLastErrorMessage(errorMessages.toString());
+ }
+ fullTaskMapper.updateById(parentTask);
+
+ log.info("[Databus] 基础数据全量同步完成, parentTaskId={}, " +
+ "任务统计[成功={}/{}, 失败={}], " +
+ "数据统计[总量={}, 已处理={}, 成功={}, 失败={}]",
+ parentTask.getId(),
+ successTaskCount, eventTypes.length, failTaskCount,
+ totalDataCount, processedDataCount, successDataCount, failDataCount);
+
+ } catch (Exception e) {
+ log.error("[Databus] 基础数据全量同步异常, parentTaskId={}", parentTask.getId(), e);
+ parentTask.setStatus(FullTaskStatusEnum.FAILED.getStatus());
+ parentTask.setEndTime(LocalDateTime.now());
+ parentTask.setLastErrorMessage(e.getMessage());
+ fullTaskMapper.updateById(parentTask);
+ }
+ }
+
+ /**
+ * 同步执行子任务(非异步)
+ */
+ private void executeSubTaskSync(DatabusSyncFullTaskDO task,
+ DatabusSyncSubscriptionDO subscription,
+ DatabusSyncClientDO client,
+ DatabusSyncEventDO event) {
+ task.setStatus(FullTaskStatusEnum.RUNNING.getStatus());
+ task.setStartTime(LocalDateTime.now());
+ fullTaskMapper.updateById(task);
+
+ try {
+ String providerType = event.getDataProviderMethod();
+ if (providerType == null) {
+ throw new RuntimeException("Event data provider type not configured");
+ }
+
+ DataProvider> dataProvider = dataProviderRegistry.getProvider(providerType);
+ if (dataProvider == null) {
+ throw new RuntimeException("Data provider not found: " + providerType);
+ }
+
+ executeGenericFullSync(task, subscription, client, event, dataProvider);
+
+ task.setStatus(FullTaskStatusEnum.COMPLETED.getStatus());
+ task.setEndTime(LocalDateTime.now());
+ fullTaskMapper.updateById(task);
+
+ } catch (Exception e) {
+ task.setStatus(FullTaskStatusEnum.FAILED.getStatus());
+ task.setEndTime(LocalDateTime.now());
+ task.setLastErrorMessage(e.getMessage());
+ fullTaskMapper.updateById(task);
+ throw e;
+ }
+ }
+
}