update:数据同步分发机构岗位绑定关系
This commit is contained in:
@@ -2,6 +2,8 @@ package com.zt.plat.framework.databus.server.core.sync;
|
||||
|
||||
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncFullTaskDO;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Databus 全量同步服务接口
|
||||
*
|
||||
|
||||
@@ -113,6 +113,13 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
|
||||
DatabusSyncClientDO client = clientMapper.selectById(subscription.getClientId());
|
||||
DatabusSyncEventDO event = eventMapper.selectById(subscription.getEventId());
|
||||
|
||||
// 检查是否为基础数据全量同步事件(组合事件)
|
||||
if ("SYSTEM_BASE_DATA_FULL".equals(event.getEventType())) {
|
||||
log.info("[Databus] 检测到基础数据全量同步事件,开始按依赖顺序执行子任务, taskId={}", taskId);
|
||||
executeBaseDataFullSync(task, client);
|
||||
return;
|
||||
}
|
||||
|
||||
String providerType = event.getDataProviderMethod();
|
||||
if (providerType == null) {
|
||||
throw new RuntimeException("Event data provider type not configured");
|
||||
@@ -325,4 +332,197 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService {
|
||||
return fullTaskMapper.selectByTaskNo(taskNo);
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行基础数据全量同步(组合事件)
|
||||
* <p>
|
||||
* 按依赖顺序同步执行5个子任务:
|
||||
* 1. 组织机构(SYSTEM_DEPT_FULL)
|
||||
* 2. 岗位(SYSTEM_POST_FULL)
|
||||
* 3. 用户(SYSTEM_USER_FULL)
|
||||
* 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL)
|
||||
* 5. 用户-岗位关系(SYSTEM_USER_POST_FULL)
|
||||
*
|
||||
* @param parentTask 父任务(SYSTEM_BASE_DATA_FULL)
|
||||
* @param client 客户端配置
|
||||
*/
|
||||
private void executeBaseDataFullSync(DatabusSyncFullTaskDO parentTask, DatabusSyncClientDO client) {
|
||||
// 按依赖顺序定义事件类型
|
||||
String[] eventTypes = {
|
||||
"SYSTEM_DEPT_FULL", // 1. 组织机构
|
||||
"SYSTEM_POST_FULL", // 2. 岗位
|
||||
"SYSTEM_USER_FULL", // 3. 用户
|
||||
"SYSTEM_USER_DEPT_FULL", // 4. 用户-部门关系
|
||||
"SYSTEM_USER_POST_FULL" // 5. 用户-岗位关系
|
||||
};
|
||||
|
||||
int successTaskCount = 0; // 成功的任务数
|
||||
int failTaskCount = 0; // 失败的任务数
|
||||
StringBuilder errorMessages = new StringBuilder();
|
||||
|
||||
// 用于聚合所有子任务的进度
|
||||
long totalDataCount = 0L; // 总数据量
|
||||
long processedDataCount = 0L; // 已处理数据量
|
||||
long successDataCount = 0L; // 成功数据量
|
||||
long failDataCount = 0L; // 失败数据量
|
||||
int totalBatchCount = 0; // 总批次数
|
||||
|
||||
List<Long> subTaskIds = new ArrayList<>();
|
||||
|
||||
try {
|
||||
// 顺序执行每个子任务
|
||||
for (int i = 0; i < eventTypes.length; i++) {
|
||||
String eventType = eventTypes[i];
|
||||
try {
|
||||
log.info("[Databus] 开始执行子任务 {}/{}: eventType={}, parentTaskId={}",
|
||||
i + 1, eventTypes.length, eventType, parentTask.getId());
|
||||
|
||||
// 查询事件定义
|
||||
DatabusSyncEventDO event = eventMapper.selectByEventType(eventType);
|
||||
if (event == null) {
|
||||
log.warn("[Databus] 事件定义不存在,跳过: eventType={}", eventType);
|
||||
failTaskCount++;
|
||||
errorMessages.append(String.format("[%s: 事件定义不存在] ", eventType));
|
||||
continue;
|
||||
}
|
||||
if (event.getEnabled() != 1) {
|
||||
log.warn("[Databus] 事件未启用,跳过: eventType={}", eventType);
|
||||
failTaskCount++;
|
||||
errorMessages.append(String.format("[%s: 事件未启用] ", eventType));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 查询订阅关系
|
||||
DatabusSyncSubscriptionDO subscription = subscriptionMapper.selectByClientIdAndEventId(
|
||||
client.getId(), event.getId());
|
||||
if (subscription == null) {
|
||||
log.warn("[Databus] 订阅关系不存在,跳过: clientCode={}, eventType={}",
|
||||
client.getClientCode(), eventType);
|
||||
failTaskCount++;
|
||||
errorMessages.append(String.format("[%s: 订阅关系不存在] ", eventType));
|
||||
continue;
|
||||
}
|
||||
if (subscription.getEnabled() != 1) {
|
||||
log.warn("[Databus] 订阅关系未启用,跳过: clientCode={}, eventType={}",
|
||||
client.getClientCode(), eventType);
|
||||
failTaskCount++;
|
||||
errorMessages.append(String.format("[%s: 订阅关系未启用] ", eventType));
|
||||
continue;
|
||||
}
|
||||
|
||||
// 创建子任务(同步执行,非异步)
|
||||
String taskRemark = String.format("[基础数据全量同步-子任务 %d/%d] %s",
|
||||
i + 1, eventTypes.length, event.getEventName());
|
||||
Long subTaskId = createFullSyncTask(subscription.getId(), taskRemark);
|
||||
subTaskIds.add(subTaskId);
|
||||
|
||||
// 同步执行子任务(注意:这里不用异步,确保按顺序执行)
|
||||
DatabusSyncFullTaskDO subTask = fullTaskMapper.selectById(subTaskId);
|
||||
executeSubTaskSync(subTask, subscription, client, event);
|
||||
|
||||
// 重新查询子任务获取最新统计数据
|
||||
subTask = fullTaskMapper.selectById(subTaskId);
|
||||
|
||||
// 聚合子任务进度到父任务
|
||||
totalDataCount += (subTask.getTotalCount() != null ? subTask.getTotalCount() : 0L);
|
||||
processedDataCount += (subTask.getProcessedCount() != null ? subTask.getProcessedCount() : 0L);
|
||||
successDataCount += (subTask.getSuccessCount() != null ? subTask.getSuccessCount() : 0L);
|
||||
failDataCount += (subTask.getFailCount() != null ? subTask.getFailCount() : 0L);
|
||||
totalBatchCount += (subTask.getTotalBatch() != null ? subTask.getTotalBatch() : 0);
|
||||
|
||||
if (FullTaskStatusEnum.isCompleted(subTask.getStatus())) {
|
||||
successTaskCount++;
|
||||
} else {
|
||||
failTaskCount++;
|
||||
}
|
||||
|
||||
// 实时更新父任务进度(每完成一个子任务就更新一次)
|
||||
parentTask.setTotalCount(totalDataCount);
|
||||
parentTask.setProcessedCount(processedDataCount);
|
||||
parentTask.setSuccessCount(successDataCount);
|
||||
parentTask.setFailCount(failDataCount);
|
||||
parentTask.setTotalBatch(totalBatchCount);
|
||||
parentTask.setCurrentBatch(i + 1); // 当前完成的子任务数
|
||||
fullTaskMapper.updateById(parentTask);
|
||||
|
||||
log.info("[Databus] 子任务执行完成 {}/{}: eventType={}, subTaskId={}, status={}, " +
|
||||
"数据统计[total={}, success={}, fail={}]",
|
||||
i + 1, eventTypes.length, eventType, subTaskId, subTask.getStatus(),
|
||||
subTask.getTotalCount(), subTask.getSuccessCount(), subTask.getFailCount());
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus] 子任务执行失败 {}/{}: eventType={}", i + 1, eventTypes.length, eventType, e);
|
||||
failTaskCount++;
|
||||
errorMessages.append(String.format("[%s: %s] ", eventType, e.getMessage()));
|
||||
// 继续执行下一个任务
|
||||
}
|
||||
}
|
||||
|
||||
// 最终更新父任务状态
|
||||
parentTask.setStatus(failTaskCount == 0 ? FullTaskStatusEnum.COMPLETED.getStatus()
|
||||
: FullTaskStatusEnum.FAILED.getStatus());
|
||||
parentTask.setEndTime(LocalDateTime.now());
|
||||
parentTask.setTotalCount(totalDataCount);
|
||||
parentTask.setProcessedCount(processedDataCount);
|
||||
parentTask.setSuccessCount(successDataCount);
|
||||
parentTask.setFailCount(failDataCount);
|
||||
parentTask.setTotalBatch(totalBatchCount);
|
||||
parentTask.setCurrentBatch(eventTypes.length);
|
||||
if (errorMessages.length() > 0) {
|
||||
parentTask.setLastErrorMessage(errorMessages.toString());
|
||||
}
|
||||
fullTaskMapper.updateById(parentTask);
|
||||
|
||||
log.info("[Databus] 基础数据全量同步完成, parentTaskId={}, " +
|
||||
"任务统计[成功={}/{}, 失败={}], " +
|
||||
"数据统计[总量={}, 已处理={}, 成功={}, 失败={}]",
|
||||
parentTask.getId(),
|
||||
successTaskCount, eventTypes.length, failTaskCount,
|
||||
totalDataCount, processedDataCount, successDataCount, failDataCount);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus] 基础数据全量同步异常, parentTaskId={}", parentTask.getId(), e);
|
||||
parentTask.setStatus(FullTaskStatusEnum.FAILED.getStatus());
|
||||
parentTask.setEndTime(LocalDateTime.now());
|
||||
parentTask.setLastErrorMessage(e.getMessage());
|
||||
fullTaskMapper.updateById(parentTask);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 同步执行子任务(非异步)
|
||||
*/
|
||||
private void executeSubTaskSync(DatabusSyncFullTaskDO task,
|
||||
DatabusSyncSubscriptionDO subscription,
|
||||
DatabusSyncClientDO client,
|
||||
DatabusSyncEventDO event) {
|
||||
task.setStatus(FullTaskStatusEnum.RUNNING.getStatus());
|
||||
task.setStartTime(LocalDateTime.now());
|
||||
fullTaskMapper.updateById(task);
|
||||
|
||||
try {
|
||||
String providerType = event.getDataProviderMethod();
|
||||
if (providerType == null) {
|
||||
throw new RuntimeException("Event data provider type not configured");
|
||||
}
|
||||
|
||||
DataProvider<?> dataProvider = dataProviderRegistry.getProvider(providerType);
|
||||
if (dataProvider == null) {
|
||||
throw new RuntimeException("Data provider not found: " + providerType);
|
||||
}
|
||||
|
||||
executeGenericFullSync(task, subscription, client, event, dataProvider);
|
||||
|
||||
task.setStatus(FullTaskStatusEnum.COMPLETED.getStatus());
|
||||
task.setEndTime(LocalDateTime.now());
|
||||
fullTaskMapper.updateById(task);
|
||||
|
||||
} catch (Exception e) {
|
||||
task.setStatus(FullTaskStatusEnum.FAILED.getStatus());
|
||||
task.setEndTime(LocalDateTime.now());
|
||||
task.setLastErrorMessage(e.getMessage());
|
||||
fullTaskMapper.updateById(task);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user