update:调整数据同步用户-部门,用户-岗位同步逻辑

This commit is contained in:
hewencai
2025-12-24 10:36:00 +08:00
parent 1ac2f1e2cc
commit 9485d94574
28 changed files with 453 additions and 78 deletions

View File

@@ -13,7 +13,7 @@ import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import com.zt.plat.framework.tenant.core.context.TenantContextHolder;
/**
* DataBus 客户端统一消费者
* <p>
@@ -33,8 +33,8 @@ import org.springframework.stereotype.Component;
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@RocketMQMessageListener(
topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}",
consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}"
topic = "${zt.databus.sync.client.mq.topic-base:databus-sync}-${zt.databus.sync.client.client-code}",
consumerGroup = "${zt.databus.sync.client.mq.consumer-group-prefix:databus-client-consumer}-${zt.databus.sync.client.client-code}"
)
public class DatabusClientConsumer implements RocketMQListener<String> {
@@ -46,6 +46,7 @@ public class DatabusClientConsumer implements RocketMQListener<String> {
log.debug("[DatabusClient] 收到消息, body={}", body);
try {
TenantContextHolder.setTenantId(1L);
// 1. 解析消息获取 eventType
DatabusEventType eventType = parseEventType(body);
if (eventType == null) {

View File

@@ -69,21 +69,15 @@ public class DeptSyncServiceImpl implements DeptSyncService {
return;
}
DeptSaveReqDTO dto = buildDeptDTO(data);
// 使用专用同步接口,跳过业务校验,直接 upsert
try {
// 尝试获取,存在则更新,不存在则创建
var existing = deptApi.getDept(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
deptApi.updateDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-更新成功, deptId={}", dto.getId());
} else {
deptApi.createDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId());
}
deptApi.syncDept(dto).checkError();
log.info("[DeptSync] 部门全量同步成功, deptId={}, deptName={}", dto.getId(), dto.getName());
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[DeptSync] 部门获取失败,尝试创建, deptId={}", dto.getId());
deptApi.createDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId());
log.error("[DeptSync] 部门全量同步失败, deptId={}, deptName={}, parentId={}, code={}, error={}",
dto.getId(), dto.getName(), dto.getParentId(), dto.getCode(), e.getMessage());
throw e;
}
}
@@ -93,13 +87,18 @@ public class DeptSyncServiceImpl implements DeptSyncService {
private DeptSaveReqDTO buildDeptDTO(DatabusDeptData data) {
DeptSaveReqDTO dto = new DeptSaveReqDTO();
dto.setId(data.getId());
dto.setCode(data.getCode()); // ⚠️ 重要:传递编码,保持一致
dto.setName(data.getName());
dto.setShortName(data.getShortName());
dto.setParentId(data.getParentId());
dto.setSort(data.getSort());
dto.setLeaderUserId(data.getLeaderUserId());
dto.setPhone(data.getPhone());
dto.setEmail(data.getEmail());
dto.setStatus(data.getStatus());
dto.setIsGroup(data.getIsGroup());
dto.setIsCompany(data.getIsCompany());
dto.setDeptSource(data.getDeptSource());
return dto;
}
}

View File

@@ -68,25 +68,14 @@ public class PostSyncServiceImpl implements PostSyncService {
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = postApi.getPost(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
postApi.updatePost(dto).checkError();
log.info("[PostSync] 岗位全量同步-更新成功, postId={}, postName={}", dto.getId(), dto.getName());
} else {
postApi.createPost(dto).checkError();
log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName());
}
postApi.syncPost(dto).checkError();
log.info("[PostSync] 岗位全量同步成功, postId={}, postName={}", dto.getId(), dto.getName());
} catch (Exception e) {
// 获取失败,尝试创建
try {
postApi.createPost(dto).checkError();
log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName());
} catch (Exception createEx) {
log.error("[PostSync] 岗位全量同步失败, postId={}, postName={}", dto.getId(), dto.getName(), createEx);
throw createEx;
}
log.error("[PostSync] 岗位全量同步失败, postId={}, postName={}, code={}, error={}",
dto.getId(), dto.getName(), dto.getCode(), e.getMessage());
throw e;
}
}

View File

@@ -72,21 +72,14 @@ public class AdminUserSyncServiceImpl implements AdminUserSyncService {
return;
}
AdminUserSaveReqDTO dto = buildUserDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = adminUserApi.getUser(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
adminUserApi.updateUser(dto).checkError();
log.info("[UserSync] 用户全量同步-更新成功, userId={}", dto.getId());
} else {
adminUserApi.createUser(dto).checkError();
log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId());
}
adminUserApi.syncUser(dto).checkError();
log.info("[UserSync] 用户全量同步成功, userId={}, username={}", dto.getId(), dto.getUsername());
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[UserSync] 用户获取失败,尝试创建, userId={}", dto.getId());
adminUserApi.createUser(dto).checkError();
log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId());
log.error("[UserSync] 用户全量同步失败, userId={}, username={}, error={}",
dto.getId(), dto.getUsername(), e.getMessage());
throw e;
}
}

View File

@@ -70,20 +70,13 @@ public class UserDeptSyncServiceImpl implements UserDeptSyncService {
}
UserDeptSaveReqDTO dto = buildUserDeptDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = userDeptApi.getUserDept(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
userDeptApi.updateUserDept(dto).checkError();
log.info("[UserDeptSync] 用户-部门关系全量同步-更新成功, id={}", dto.getId());
} else {
userDeptApi.createUserDept(dto).checkError();
log.info("[UserDeptSync] 用户-部门关系全量同步-创建成功, id={}", dto.getId());
}
userDeptApi.syncUserDept(dto).checkError();
log.info("[UserDeptSync] 用户-部门关系全量同步成功, id={}, userId={}, deptId={}",
dto.getId(), dto.getUserId(), dto.getDeptId());
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[UserDeptSync] 用户-部门关系获取失败,尝试创建, id={}", dto.getId());
userDeptApi.createUserDept(dto).checkError();
log.info("[UserDeptSync] 用户-部门关系全量同步-创建成功, id={}", dto.getId());
log.error("[UserDeptSync] 用户-部门关系全量同步失败, id={}, userId={}, deptId={}, error={}",
dto.getId(), dto.getUserId(), dto.getDeptId(), e.getMessage());
throw e;
}
}

View File

@@ -70,20 +70,13 @@ public class UserPostSyncServiceImpl implements UserPostSyncService {
}
UserPostSaveReqDTO dto = buildUserPostDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = userPostApi.getUserPost(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
userPostApi.updateUserPost(dto).checkError();
log.info("[UserPostSync] 用户-岗位关系全量同步-更新成功, id={}", dto.getId());
} else {
userPostApi.createUserPost(dto).checkError();
log.info("[UserPostSync] 用户-岗位关系全量同步-创建成功, id={}", dto.getId());
}
userPostApi.syncUserPost(dto).checkError();
log.info("[UserPostSync] 用户-岗位关系全量同步成功, id={}, userId={}, postId={}",
dto.getId(), dto.getUserId(), dto.getPostId());
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[UserPostSync] 用户-岗位关系获取失败,尝试创建, id={}", dto.getId());
userPostApi.createUserPost(dto).checkError();
log.info("[UserPostSync] 用户-岗位关系全量同步-创建成功, id={}", dto.getId());
log.error("[UserPostSync] 用户-岗位关系全量同步失败, id={}, userId={}, postId={}, error={}",
dto.getId(), dto.getUserId(), dto.getPostId(), e.getMessage());
throw e;
}
}