diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java index 62d00b16..ee18651c 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java @@ -2,6 +2,8 @@ package com.zt.plat.framework.databus.server.core.sync; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncFullTaskDO; +import java.util.List; + /** * Databus 全量同步服务接口 * diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java index 3d13c7f8..d2f277fb 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java @@ -113,6 +113,13 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService { DatabusSyncClientDO client = clientMapper.selectById(subscription.getClientId()); DatabusSyncEventDO event = eventMapper.selectById(subscription.getEventId()); + // 检查是否为基础数据全量同步事件(组合事件) + if ("SYSTEM_BASE_DATA_FULL".equals(event.getEventType())) { + log.info("[Databus] 检测到基础数据全量同步事件,开始按依赖顺序执行子任务, taskId={}", taskId); + executeBaseDataFullSync(task, client); + return; + } + String providerType = event.getDataProviderMethod(); if (providerType == null) { throw new RuntimeException("Event data provider type not configured"); @@ -325,4 +332,197 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService { return fullTaskMapper.selectByTaskNo(taskNo); } + /** + * 执行基础数据全量同步(组合事件) + *

+ * 按依赖顺序同步执行5个子任务: + * 1. 组织机构(SYSTEM_DEPT_FULL) + * 2. 岗位(SYSTEM_POST_FULL) + * 3. 用户(SYSTEM_USER_FULL) + * 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL) + * 5. 用户-岗位关系(SYSTEM_USER_POST_FULL) + * + * @param parentTask 父任务(SYSTEM_BASE_DATA_FULL) + * @param client 客户端配置 + */ + private void executeBaseDataFullSync(DatabusSyncFullTaskDO parentTask, DatabusSyncClientDO client) { + // 按依赖顺序定义事件类型 + String[] eventTypes = { + "SYSTEM_DEPT_FULL", // 1. 组织机构 + "SYSTEM_POST_FULL", // 2. 岗位 + "SYSTEM_USER_FULL", // 3. 用户 + "SYSTEM_USER_DEPT_FULL", // 4. 用户-部门关系 + "SYSTEM_USER_POST_FULL" // 5. 用户-岗位关系 + }; + + int successTaskCount = 0; // 成功的任务数 + int failTaskCount = 0; // 失败的任务数 + StringBuilder errorMessages = new StringBuilder(); + + // 用于聚合所有子任务的进度 + long totalDataCount = 0L; // 总数据量 + long processedDataCount = 0L; // 已处理数据量 + long successDataCount = 0L; // 成功数据量 + long failDataCount = 0L; // 失败数据量 + int totalBatchCount = 0; // 总批次数 + + List subTaskIds = new ArrayList<>(); + + try { + // 顺序执行每个子任务 + for (int i = 0; i < eventTypes.length; i++) { + String eventType = eventTypes[i]; + try { + log.info("[Databus] 开始执行子任务 {}/{}: eventType={}, parentTaskId={}", + i + 1, eventTypes.length, eventType, parentTask.getId()); + + // 查询事件定义 + DatabusSyncEventDO event = eventMapper.selectByEventType(eventType); + if (event == null) { + log.warn("[Databus] 事件定义不存在,跳过: eventType={}", eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 事件定义不存在] ", eventType)); + continue; + } + if (event.getEnabled() != 1) { + log.warn("[Databus] 事件未启用,跳过: eventType={}", eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 事件未启用] ", eventType)); + continue; + } + + // 查询订阅关系 + DatabusSyncSubscriptionDO subscription = subscriptionMapper.selectByClientIdAndEventId( + client.getId(), event.getId()); + if (subscription == null) { + log.warn("[Databus] 订阅关系不存在,跳过: clientCode={}, eventType={}", + client.getClientCode(), eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 订阅关系不存在] ", eventType)); + continue; + } + if (subscription.getEnabled() != 1) { + log.warn("[Databus] 订阅关系未启用,跳过: clientCode={}, eventType={}", + client.getClientCode(), eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 订阅关系未启用] ", eventType)); + continue; + } + + // 创建子任务(同步执行,非异步) + String taskRemark = String.format("[基础数据全量同步-子任务 %d/%d] %s", + i + 1, eventTypes.length, event.getEventName()); + Long subTaskId = createFullSyncTask(subscription.getId(), taskRemark); + subTaskIds.add(subTaskId); + + // 同步执行子任务(注意:这里不用异步,确保按顺序执行) + DatabusSyncFullTaskDO subTask = fullTaskMapper.selectById(subTaskId); + executeSubTaskSync(subTask, subscription, client, event); + + // 重新查询子任务获取最新统计数据 + subTask = fullTaskMapper.selectById(subTaskId); + + // 聚合子任务进度到父任务 + totalDataCount += (subTask.getTotalCount() != null ? subTask.getTotalCount() : 0L); + processedDataCount += (subTask.getProcessedCount() != null ? subTask.getProcessedCount() : 0L); + successDataCount += (subTask.getSuccessCount() != null ? subTask.getSuccessCount() : 0L); + failDataCount += (subTask.getFailCount() != null ? subTask.getFailCount() : 0L); + totalBatchCount += (subTask.getTotalBatch() != null ? subTask.getTotalBatch() : 0); + + if (FullTaskStatusEnum.isCompleted(subTask.getStatus())) { + successTaskCount++; + } else { + failTaskCount++; + } + + // 实时更新父任务进度(每完成一个子任务就更新一次) + parentTask.setTotalCount(totalDataCount); + parentTask.setProcessedCount(processedDataCount); + parentTask.setSuccessCount(successDataCount); + parentTask.setFailCount(failDataCount); + parentTask.setTotalBatch(totalBatchCount); + parentTask.setCurrentBatch(i + 1); // 当前完成的子任务数 + fullTaskMapper.updateById(parentTask); + + log.info("[Databus] 子任务执行完成 {}/{}: eventType={}, subTaskId={}, status={}, " + + "数据统计[total={}, success={}, fail={}]", + i + 1, eventTypes.length, eventType, subTaskId, subTask.getStatus(), + subTask.getTotalCount(), subTask.getSuccessCount(), subTask.getFailCount()); + + } catch (Exception e) { + log.error("[Databus] 子任务执行失败 {}/{}: eventType={}", i + 1, eventTypes.length, eventType, e); + failTaskCount++; + errorMessages.append(String.format("[%s: %s] ", eventType, e.getMessage())); + // 继续执行下一个任务 + } + } + + // 最终更新父任务状态 + parentTask.setStatus(failTaskCount == 0 ? FullTaskStatusEnum.COMPLETED.getStatus() + : FullTaskStatusEnum.FAILED.getStatus()); + parentTask.setEndTime(LocalDateTime.now()); + parentTask.setTotalCount(totalDataCount); + parentTask.setProcessedCount(processedDataCount); + parentTask.setSuccessCount(successDataCount); + parentTask.setFailCount(failDataCount); + parentTask.setTotalBatch(totalBatchCount); + parentTask.setCurrentBatch(eventTypes.length); + if (errorMessages.length() > 0) { + parentTask.setLastErrorMessage(errorMessages.toString()); + } + fullTaskMapper.updateById(parentTask); + + log.info("[Databus] 基础数据全量同步完成, parentTaskId={}, " + + "任务统计[成功={}/{}, 失败={}], " + + "数据统计[总量={}, 已处理={}, 成功={}, 失败={}]", + parentTask.getId(), + successTaskCount, eventTypes.length, failTaskCount, + totalDataCount, processedDataCount, successDataCount, failDataCount); + + } catch (Exception e) { + log.error("[Databus] 基础数据全量同步异常, parentTaskId={}", parentTask.getId(), e); + parentTask.setStatus(FullTaskStatusEnum.FAILED.getStatus()); + parentTask.setEndTime(LocalDateTime.now()); + parentTask.setLastErrorMessage(e.getMessage()); + fullTaskMapper.updateById(parentTask); + } + } + + /** + * 同步执行子任务(非异步) + */ + private void executeSubTaskSync(DatabusSyncFullTaskDO task, + DatabusSyncSubscriptionDO subscription, + DatabusSyncClientDO client, + DatabusSyncEventDO event) { + task.setStatus(FullTaskStatusEnum.RUNNING.getStatus()); + task.setStartTime(LocalDateTime.now()); + fullTaskMapper.updateById(task); + + try { + String providerType = event.getDataProviderMethod(); + if (providerType == null) { + throw new RuntimeException("Event data provider type not configured"); + } + + DataProvider dataProvider = dataProviderRegistry.getProvider(providerType); + if (dataProvider == null) { + throw new RuntimeException("Data provider not found: " + providerType); + } + + executeGenericFullSync(task, subscription, client, event, dataProvider); + + task.setStatus(FullTaskStatusEnum.COMPLETED.getStatus()); + task.setEndTime(LocalDateTime.now()); + fullTaskMapper.updateById(task); + + } catch (Exception e) { + task.setStatus(FullTaskStatusEnum.FAILED.getStatus()); + task.setEndTime(LocalDateTime.now()); + task.setLastErrorMessage(e.getMessage()); + fullTaskMapper.updateById(task); + throw e; + } + } + }