Merge branch 'dev' into 'test'

update:数据同步分发机构岗位绑定关系

See merge request jygk/dsc!10
This commit is contained in:
wencai he
2025-12-16 06:34:34 +00:00
2 changed files with 202 additions and 0 deletions

View File

@@ -2,6 +2,8 @@ package com.zt.plat.framework.databus.server.core.sync;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncFullTaskDO;
import java.util.List;
/**
* Databus 全量同步服务接口
*

View File

@@ -113,6 +113,13 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
DatabusSyncClientDO client = clientMapper.selectById(subscription.getClientId());
DatabusSyncEventDO event = eventMapper.selectById(subscription.getEventId());
// 检查是否为基础数据全量同步事件(组合事件)
if ("SYSTEM_BASE_DATA_FULL".equals(event.getEventType())) {
log.info("[Databus] 检测到基础数据全量同步事件,开始按依赖顺序执行子任务, taskId={}", taskId);
executeBaseDataFullSync(task, client);
return;
}
String providerType = event.getDataProviderMethod();
if (providerType == null) {
throw new RuntimeException("Event data provider type not configured");
@@ -325,4 +332,197 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
return fullTaskMapper.selectByTaskNo(taskNo);
}
/**
* 执行基础数据全量同步(组合事件)
* <p>
* 按依赖顺序同步执行5个子任务
* 1. 组织机构SYSTEM_DEPT_FULL
* 2. 岗位SYSTEM_POST_FULL
* 3. 用户SYSTEM_USER_FULL
* 4. 用户-部门关系SYSTEM_USER_DEPT_FULL
* 5. 用户-岗位关系SYSTEM_USER_POST_FULL
*
* @param parentTask 父任务SYSTEM_BASE_DATA_FULL
* @param client 客户端配置
*/
private void executeBaseDataFullSync(DatabusSyncFullTaskDO parentTask, DatabusSyncClientDO client) {
// 按依赖顺序定义事件类型
String[] eventTypes = {
"SYSTEM_DEPT_FULL", // 1. 组织机构
"SYSTEM_POST_FULL", // 2. 岗位
"SYSTEM_USER_FULL", // 3. 用户
"SYSTEM_USER_DEPT_FULL", // 4. 用户-部门关系
"SYSTEM_USER_POST_FULL" // 5. 用户-岗位关系
};
int successTaskCount = 0; // 成功的任务数
int failTaskCount = 0; // 失败的任务数
StringBuilder errorMessages = new StringBuilder();
// 用于聚合所有子任务的进度
long totalDataCount = 0L; // 总数据量
long processedDataCount = 0L; // 已处理数据量
long successDataCount = 0L; // 成功数据量
long failDataCount = 0L; // 失败数据量
int totalBatchCount = 0; // 总批次数
List<Long> subTaskIds = new ArrayList<>();
try {
// 顺序执行每个子任务
for (int i = 0; i < eventTypes.length; i++) {
String eventType = eventTypes[i];
try {
log.info("[Databus] 开始执行子任务 {}/{}: eventType={}, parentTaskId={}",
i + 1, eventTypes.length, eventType, parentTask.getId());
// 查询事件定义
DatabusSyncEventDO event = eventMapper.selectByEventType(eventType);
if (event == null) {
log.warn("[Databus] 事件定义不存在,跳过: eventType={}", eventType);
failTaskCount++;
errorMessages.append(String.format("[%s: 事件定义不存在] ", eventType));
continue;
}
if (event.getEnabled() != 1) {
log.warn("[Databus] 事件未启用,跳过: eventType={}", eventType);
failTaskCount++;
errorMessages.append(String.format("[%s: 事件未启用] ", eventType));
continue;
}
// 查询订阅关系
DatabusSyncSubscriptionDO subscription = subscriptionMapper.selectByClientIdAndEventId(
client.getId(), event.getId());
if (subscription == null) {
log.warn("[Databus] 订阅关系不存在,跳过: clientCode={}, eventType={}",
client.getClientCode(), eventType);
failTaskCount++;
errorMessages.append(String.format("[%s: 订阅关系不存在] ", eventType));
continue;
}
if (subscription.getEnabled() != 1) {
log.warn("[Databus] 订阅关系未启用,跳过: clientCode={}, eventType={}",
client.getClientCode(), eventType);
failTaskCount++;
errorMessages.append(String.format("[%s: 订阅关系未启用] ", eventType));
continue;
}
// 创建子任务(同步执行,非异步)
String taskRemark = String.format("[基础数据全量同步-子任务 %d/%d] %s",
i + 1, eventTypes.length, event.getEventName());
Long subTaskId = createFullSyncTask(subscription.getId(), taskRemark);
subTaskIds.add(subTaskId);
// 同步执行子任务(注意:这里不用异步,确保按顺序执行)
DatabusSyncFullTaskDO subTask = fullTaskMapper.selectById(subTaskId);
executeSubTaskSync(subTask, subscription, client, event);
// 重新查询子任务获取最新统计数据
subTask = fullTaskMapper.selectById(subTaskId);
// 聚合子任务进度到父任务
totalDataCount += (subTask.getTotalCount() != null ? subTask.getTotalCount() : 0L);
processedDataCount += (subTask.getProcessedCount() != null ? subTask.getProcessedCount() : 0L);
successDataCount += (subTask.getSuccessCount() != null ? subTask.getSuccessCount() : 0L);
failDataCount += (subTask.getFailCount() != null ? subTask.getFailCount() : 0L);
totalBatchCount += (subTask.getTotalBatch() != null ? subTask.getTotalBatch() : 0);
if (FullTaskStatusEnum.isCompleted(subTask.getStatus())) {
successTaskCount++;
} else {
failTaskCount++;
}
// 实时更新父任务进度(每完成一个子任务就更新一次)
parentTask.setTotalCount(totalDataCount);
parentTask.setProcessedCount(processedDataCount);
parentTask.setSuccessCount(successDataCount);
parentTask.setFailCount(failDataCount);
parentTask.setTotalBatch(totalBatchCount);
parentTask.setCurrentBatch(i + 1); // 当前完成的子任务数
fullTaskMapper.updateById(parentTask);
log.info("[Databus] 子任务执行完成 {}/{}: eventType={}, subTaskId={}, status={}, " +
"数据统计[total={}, success={}, fail={}]",
i + 1, eventTypes.length, eventType, subTaskId, subTask.getStatus(),
subTask.getTotalCount(), subTask.getSuccessCount(), subTask.getFailCount());
} catch (Exception e) {
log.error("[Databus] 子任务执行失败 {}/{}: eventType={}", i + 1, eventTypes.length, eventType, e);
failTaskCount++;
errorMessages.append(String.format("[%s: %s] ", eventType, e.getMessage()));
// 继续执行下一个任务
}
}
// 最终更新父任务状态
parentTask.setStatus(failTaskCount == 0 ? FullTaskStatusEnum.COMPLETED.getStatus()
: FullTaskStatusEnum.FAILED.getStatus());
parentTask.setEndTime(LocalDateTime.now());
parentTask.setTotalCount(totalDataCount);
parentTask.setProcessedCount(processedDataCount);
parentTask.setSuccessCount(successDataCount);
parentTask.setFailCount(failDataCount);
parentTask.setTotalBatch(totalBatchCount);
parentTask.setCurrentBatch(eventTypes.length);
if (errorMessages.length() > 0) {
parentTask.setLastErrorMessage(errorMessages.toString());
}
fullTaskMapper.updateById(parentTask);
log.info("[Databus] 基础数据全量同步完成, parentTaskId={}, " +
"任务统计[成功={}/{}, 失败={}], " +
"数据统计[总量={}, 已处理={}, 成功={}, 失败={}]",
parentTask.getId(),
successTaskCount, eventTypes.length, failTaskCount,
totalDataCount, processedDataCount, successDataCount, failDataCount);
} catch (Exception e) {
log.error("[Databus] 基础数据全量同步异常, parentTaskId={}", parentTask.getId(), e);
parentTask.setStatus(FullTaskStatusEnum.FAILED.getStatus());
parentTask.setEndTime(LocalDateTime.now());
parentTask.setLastErrorMessage(e.getMessage());
fullTaskMapper.updateById(parentTask);
}
}
/**
* 同步执行子任务(非异步)
*/
private void executeSubTaskSync(DatabusSyncFullTaskDO task,
DatabusSyncSubscriptionDO subscription,
DatabusSyncClientDO client,
DatabusSyncEventDO event) {
task.setStatus(FullTaskStatusEnum.RUNNING.getStatus());
task.setStartTime(LocalDateTime.now());
fullTaskMapper.updateById(task);
try {
String providerType = event.getDataProviderMethod();
if (providerType == null) {
throw new RuntimeException("Event data provider type not configured");
}
DataProvider<?> dataProvider = dataProviderRegistry.getProvider(providerType);
if (dataProvider == null) {
throw new RuntimeException("Data provider not found: " + providerType);
}
executeGenericFullSync(task, subscription, client, event, dataProvider);
task.setStatus(FullTaskStatusEnum.COMPLETED.getStatus());
task.setEndTime(LocalDateTime.now());
fullTaskMapper.updateById(task);
} catch (Exception e) {
task.setStatus(FullTaskStatusEnum.FAILED.getStatus());
task.setEndTime(LocalDateTime.now());
task.setLastErrorMessage(e.getMessage());
fullTaskMapper.updateById(task);
throw e;
}
}
}