feat(databus-client): 完成岗位 Handler 及编译验证(任务 75-88)

新增文件(任务 75-88):
- PostSyncService.java - 岗位同步服务接口
- PostSyncServiceImpl.java - 岗位同步服务实现(Feign调用)
- SystemPostCreateHandler.java - 岗位创建事件处理器
- SystemPostUpdateHandler.java - 岗位更新事件处理器
- SystemPostDeleteHandler.java - 岗位删除事件处理器
- SystemPostFullHandler.java - 岗位全量同步处理器(批量)

修复问题:
1. 修复 DTO 导入:DeptSaveReqVO → DeptSaveReqDTO, PostSaveReqVO → PostSaveReqDTO
2. 修复注解:@Resource(required=false) → @Autowired(required=false)
3. 修复 PostApi 包路径:com.zt.plat.module.system.api.post → com.zt.plat.module.system.api.dept
4. 修复 DeptSaveReqDTO 字段映射(移除不存在的字段:code, shortName, isCompany, isGroup, deptSource)
5. 修复 AdminUserSaveReqDTO 字段映射:
   - deptIds: List<Long> → Set<Long>
   - postIds: List<Long> → Set<Long>

编译结果: BUILD SUCCESS(28个源文件)

Ref: docs/databus/implementation-checklist.md 任务 75-88
This commit is contained in:
hewencai
2025-12-02 01:07:30 +08:00
parent 94ab320fa8
commit 8329f9c834
18 changed files with 1031 additions and 111 deletions

View File

@@ -0,0 +1,44 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
/**
* 部门同步服务接口
* <p>
* 分公司需要实现此接口,完成数据的本地持久化
* 或通过默认实现 {@link DeptSyncServiceImpl} 使用 Feign 调用远程 API
*
* @author ZT
*/
public interface DeptSyncService {
/**
* 创建部门(增量同步)
*
* @param data 部门数据
*/
void create(DatabusDeptData data);
/**
* 更<><E69BB4>部门增量同步
*
* @param data 部门数据
*/
void update(DatabusDeptData data);
/**
* 删除部门(增量同步)
*
* @param id 部门ID
*/
void delete(Long id);
/**
* 全量同步单条数据
* <p>
* 逻辑:存在则更新,不存在则插入
*
* @param data 部门数据
*/
void fullSync(DatabusDeptData data);
}

View File

@@ -0,0 +1,105 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.system.api.dept.DeptApi;
import com.zt.plat.module.system.api.dept.dto.DeptSaveReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* 部门同步服务实现(通过 Feign API 调用远程服务)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 系统中存在 DeptApi 接口Feign 客户端)
* <p>
* 如果分公司需要自定义实现,可以创建自己的 DeptSyncService Bean
* 此默认实现会自动失效(@ConditionalOnMissingBean
*
* @author ZT
*/
@Slf4j
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.DeptApi")
public class DeptSyncServiceImpl implements DeptSyncService {
@Autowired(required = false)
private DeptApi deptApi; // Feign 远程调用接口
@Override
public void create(DatabusDeptData data) {
if (deptApi == null) {
log.warn("[DeptSync] DeptApi未注入跳过创建部门操作, deptId={}", data.getId());
return;
}
DeptSaveReqDTO dto = buildDeptDTO(data);
deptApi.createDept(dto).checkError();
log.info("[DeptSync] 部门创建成功, deptId={}, deptName={}", dto.getId(), dto.getName());
}
@Override
public void update(DatabusDeptData data) {
if (deptApi == null) {
log.warn("[DeptSync] DeptApi未注入跳过更新部门操作, deptId={}", data.getId());
return;
}
DeptSaveReqDTO dto = buildDeptDTO(data);
deptApi.updateDept(dto).checkError();
log.info("[DeptSync] 部门更新成功, deptId={}, deptName={}", dto.getId(), dto.getName());
}
@Override
public void delete(Long id) {
if (deptApi == null) {
log.warn("[DeptSync] DeptApi未注入跳过删除部门操作, deptId={}", id);
return;
}
deptApi.deleteDept(id).checkError();
log.info("[DeptSync] 部门删除成功, deptId={}", id);
}
@Override
public void fullSync(DatabusDeptData data) {
if (deptApi == null) {
log.warn("[DeptSync] DeptApi未注入跳过全量同步部门操作, deptId={}", data.getId());
return;
}
DeptSaveReqDTO dto = buildDeptDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = deptApi.getDept(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
deptApi.updateDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-更新成功, deptId={}", dto.getId());
} else {
deptApi.createDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId());
}
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[DeptSync] 部门获取失败,尝试创建, deptId={}", dto.getId());
deptApi.createDept(dto).checkError();
log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId());
}
}
/**
* 构建部门 DTO用于 Feign 调用)
*/
private DeptSaveReqDTO buildDeptDTO(DatabusDeptData data) {
DeptSaveReqDTO dto = new DeptSaveReqDTO();
dto.setId(data.getId());
dto.setName(data.getName());
dto.setParentId(data.getParentId());
dto.setSort(data.getSort());
dto.setLeaderUserId(data.getLeaderUserId());
dto.setPhone(data.getPhone());
dto.setEmail(data.getEmail());
dto.setStatus(data.getStatus());
return dto;
}
}

View File

@@ -0,0 +1,50 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptCreateHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}",
data.getId(), data.getName(), data.getParentId());
try {
deptSyncService.create(data);
log.info("[DeptSync] 部门创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptDeleteHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门删除事件, id={}", data.getId());
try {
deptSyncService.delete(data.getId());
log.info("[DeptSync] 部门删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,68 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门全量同步事件处理器(批量处理)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptFullHandler implements BatchSyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_FULL;
}
@Override
public void onFullSyncStart(DatabusBatchMessage<DatabusDeptData> message) {
log.info("[DeptSync] 开始部门全量同步, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
@Override
public void handleBatch(DatabusBatchMessage<DatabusDeptData> message) {
log.info("[DeptSync] 处理部门批次数据, taskId={}, batchNo={}/{}, size={}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch(),
message.getDataList().size());
// 逐条处理全量同步数据
for (DatabusDeptData data : message.getDataList()) {
try {
deptSyncService.fullSync(data);
log.debug("[DeptSync] 部门全量同步成功, id={}, name={}", data.getId(), data.getName());
} catch (Exception e) {
log.error("[DeptSync] 部门全量同步失败, id={}, name={}", data.getId(), data.getName(), e);
// 单条失败不影响其他数据,继续处理
}
}
log.info("[DeptSync] 部门批次处理完成, taskId={}, batchNo={}/{}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
}
@Override
public void onFullSyncComplete(DatabusBatchMessage<DatabusDeptData> message) {
log.info("[DeptSync] 部门全量同步完成, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptUpdateHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName());
try {
deptSyncService.update(data);
log.info("[DeptSync] 部门更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,108 +1,43 @@
package com.zt.plat.framework.databus.client.handler.post; package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.module.databus.api.data.PostData; import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.system.api.dept.PostApi;
import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/** /**
* 岗位同步业务逻辑 * 岗位同步服务接口
* <p> * <p>
* 被各个 PostHandler 共享使用 * 客户端可以自定义实现此接口,覆盖默认的 Feign 调用实现
* <p>
* 默认实现:{@link PostSyncServiceImpl}(通过 Feign 远程调用)
* *
* @author ZT * @author ZT
*/ */
@Slf4j public interface PostSyncService {
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi")
public class PostSyncService {
@Autowired(required = false)
private PostApi postApi;
/** /**
* 创建岗位 * 创建岗位
*
* @param data 岗位数据
*/ */
public void create(PostData data) { void create(DatabusPostData data);
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过创建岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
postApi.createPost(dto).checkError();
log.info("[PostSync] 创建岗位成功, postId={}, postName={}", dto.getId(), dto.getName());
}
/** /**
* 更新岗位 * 更新岗位
*
* @param data 岗位数据
*/ */
public void update(PostData data) { void update(DatabusPostData data);
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过更新岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
postApi.updatePost(dto).checkError();
log.info("[PostSync] 更新岗位成功, postId={}, postName={}", dto.getId(), dto.getName());
}
/** /**
* 删除岗位 * 删除岗位
*
* @param id 岗位 ID
*/ */
public void delete(PostData data) { void delete(Long id);
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过删除岗位操作, postId={}", data.getId());
return;
}
Long postId = data.getId();
if (postId != null) {
postApi.deletePost(postId).checkError();
log.info("[PostSync] 删除岗位成功, postId={}", postId);
}
}
/** /**
* 全量同步单条数据(存在则更新,不存在则创建 * 全量同步岗位(创建或更新
*
* @param data 岗位数据
*/ */
public void fullSync(PostData data) { void fullSync(DatabusPostData data);
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过全量同步岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
try {
if (dto.getId() != null) {
var existing = postApi.getPost(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
postApi.updatePost(dto).checkError();
} else {
postApi.createPost(dto).checkError();
}
} else {
postApi.createPost(dto).checkError();
}
} catch (Exception e) {
postApi.createPost(dto).checkError();
}
}
/**
* 构建岗位DTO
*/
private PostSaveReqDTO buildPostDTO(PostData data) {
PostSaveReqDTO dto = new PostSaveReqDTO();
dto.setId(data.getId());
dto.setCode(data.getCode());
dto.setName(data.getName());
dto.setSort(data.getSort());
dto.setStatus(data.getStatus());
dto.setRemark(data.getRemark());
return dto;
}
} }

View File

@@ -0,0 +1,103 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.system.api.dept.PostApi;
import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* 岗位同步服务默认实现(通过 Feign 调用)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. PostApi 类存在于 classpath
* <p>
* 客户端可以自定义实现 PostSyncService 接口覆盖此默认实现
*
* @author ZT
*/
@Slf4j
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi")
public class PostSyncServiceImpl implements PostSyncService {
@Autowired(required = false)
private PostApi postApi;
@Override
public void create(DatabusPostData data) {
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过创建岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
postApi.createPost(dto).checkError();
log.info("[PostSync] 岗位创建成功, postId={}, postName={}", dto.getId(), dto.getName());
}
@Override
public void update(DatabusPostData data) {
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过更新岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
postApi.updatePost(dto).checkError();
log.info("[PostSync] 岗位更新成功, postId={}, postName={}", dto.getId(), dto.getName());
}
@Override
public void delete(Long id) {
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过删除岗位操作, postId={}", id);
return;
}
postApi.deletePost(id).checkError();
log.info("[PostSync] 岗位删除成功, postId={}", id);
}
@Override
public void fullSync(DatabusPostData data) {
if (postApi == null) {
log.warn("[PostSync] PostApi未注入跳过全量同步岗位操作, postId={}", data.getId());
return;
}
PostSaveReqDTO dto = buildPostDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = postApi.getPost(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
postApi.updatePost(dto).checkError();
log.info("[PostSync] 岗位全量同步-更新成功, postId={}, postName={}", dto.getId(), dto.getName());
} else {
postApi.createPost(dto).checkError();
log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName());
}
} catch (Exception e) {
// 获取失败,尝试创建
try {
postApi.createPost(dto).checkError();
log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName());
} catch (Exception createEx) {
log.error("[PostSync] 岗位全量同步失败, postId={}, postName={}", dto.getId(), dto.getName(), createEx);
throw createEx;
}
}
}
private PostSaveReqDTO buildPostDTO(DatabusPostData data) {
PostSaveReqDTO dto = new PostSaveReqDTO();
dto.setId(data.getId());
dto.setName(data.getName());
dto.setCode(data.getCode());
dto.setSort(data.getSort());
dto.setStatus(data.getStatus());
dto.setRemark(data.getRemark());
return dto;
}
}

View File

@@ -0,0 +1,50 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostCreateHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}",
data.getId(), data.getName(), data.getCode());
try {
postSyncService.create(data);
log.info("[PostSync] 岗位创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostDeleteHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位删除事件, id={}", data.getId());
try {
postSyncService.delete(data.getId());
log.info("[PostSync] 岗位删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,7 +1,7 @@
package com.zt.plat.framework.databus.client.handler.post; package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.module.databus.api.data.PostData; import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage; import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.enums.DatabusEventType; import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
@@ -11,7 +11,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
/** /**
* 岗位全量同步事件处理器 * 岗位全量同步事件处理器(批量处理)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
* *
* @author ZT * @author ZT
*/ */
@@ -19,7 +23,7 @@ import org.springframework.stereotype.Component;
@Component @Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") @ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class) @ConditionalOnBean(PostSyncService.class)
public class SystemPostFullHandler implements BatchSyncEventHandler<PostData> { public class SystemPostFullHandler implements BatchSyncEventHandler<DatabusPostData> {
@Resource @Resource
private PostSyncService postSyncService; private PostSyncService postSyncService;
@@ -30,42 +34,35 @@ public class SystemPostFullHandler implements BatchSyncEventHandler<PostData> {
} }
@Override @Override
public void onFullSyncStart(DatabusBatchMessage<PostData> message) { public void onFullSyncStart(DatabusBatchMessage<DatabusPostData> message) {
log.info("[PostSync] 全量同步开始, taskId={}, totalCount={}, totalBatch={}", log.info("[PostSync] 开始岗位全量同步, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalCount(), message.getTotalBatch()); message.getTaskId(), message.getTotalBatch());
} }
@Override @Override
public void handleBatch(DatabusBatchMessage<PostData> message) { public void handleBatch(DatabusBatchMessage<DatabusPostData> message) {
log.info("[PostSync] 处理批次, batchNo={}/{}, count={}", log.info("[PostSync] 处理岗位批次数据, taskId={}, batchNo={}/{}, size={}",
message.getBatchNo(), message.getTotalBatch(), message.getCount()); message.getTaskId(), message.getBatchNo(), message.getTotalBatch(),
message.getDataList().size());
if (message.getDataList() == null || message.getDataList().isEmpty()) { // 逐条处理全量同步数据
log.warn("[PostSync] 数据列表为空, batchNo={}", message.getBatchNo()); for (DatabusPostData data : message.getDataList()) {
return;
}
int successCount = 0;
int failCount = 0;
for (PostData data : message.getDataList()) {
try { try {
postSyncService.fullSync(data); postSyncService.fullSync(data);
successCount++; log.debug("[PostSync] 岗位全量同步成功, id={}, name={}", data.getId(), data.getName());
} catch (Exception e) { } catch (Exception e) {
failCount++; log.error("[PostSync] 岗位全量同步失败, id={}, name={}", data.getId(), data.getName(), e);
log.error("[PostSync] 处理数据项失败, postId={}", data.getId(), e); // 单条失败不影响其他数据,继续处理
} }
} }
log.info("[PostSync] 批次处理完成, batchNo={}, success={}, fail={}", log.info("[PostSync] 岗位批次处理完成, taskId={}, batchNo={}/{}",
message.getBatchNo(), successCount, failCount); message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
} }
@Override @Override
public void onFullSyncComplete(DatabusBatchMessage<PostData> message) { public void onFullSyncComplete(DatabusBatchMessage<DatabusPostData> message) {
log.info("[PostSync] 全量同步完成, taskId={}, totalCount={}", log.info("[PostSync] 岗位全量同步完成, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalCount()); message.getTaskId(), message.getTotalBatch());
} }
} }

View File

@@ -0,0 +1,50 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostUpdateHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}",
data.getId(), data.getName(), data.getCode());
try {
postSyncService.update(data);
log.info("[PostSync] 岗位更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,44 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
/**
* 用户同步服务接口
* <p>
* 分公司需要实现此接口,完成数据的本地持久化
* 或通过默认实现 {@link AdminUserSyncServiceImpl} 使用 Feign 调用远程 API
*
* @author ZT
*/
public interface AdminUserSyncService {
/**
* 创建用户(增量同步)
*
* @param data 用户数据
*/
void create(DatabusAdminUserData data);
/**
* 更新用户(增量同步)
*
* @param data 用户数据
*/
void update(DatabusAdminUserData data);
/**
* 删除用户(增量同步)
*
* @param id 用户ID
*/
void delete(Long id);
/**
* 全量同步单条数据
* <p>
* 逻辑:存在则更新,不存在则插入
*
* @param data 用户数据
*/
void fullSync(DatabusAdminUserData data);
}

View File

@@ -0,0 +1,112 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.system.api.user.AdminUserApi;
import com.zt.plat.module.system.api.user.dto.AdminUserSaveReqDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
import java.util.HashSet;
import java.util.Set;
/**
* 用户同步服务实现(通过 Feign API 调用远程服务)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 系统中存在 AdminUserApi 接口Feign 客户端)
* <p>
* 如果分公司需要自定义实现,可以创建自己的 AdminUserSyncService Bean
* 此默认实现会自动失效(@ConditionalOnMissingBean
*
* @author ZT
*/
@Slf4j
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnClass(name = "com.zt.plat.module.system.api.user.AdminUserApi")
public class AdminUserSyncServiceImpl implements AdminUserSyncService {
@Autowired(required = false)
private AdminUserApi adminUserApi; // Feign 远程调用接口
@Override
public void create(DatabusAdminUserData data) {
if (adminUserApi == null) {
log.warn("[UserSync] AdminUserApi未注入跳过创建用户操作, userId={}", data.getId());
return;
}
AdminUserSaveReqDTO dto = buildUserDTO(data);
adminUserApi.createUser(dto).checkError();
log.info("[UserSync] 用户创建成功, userId={}, username={}", dto.getId(), dto.getUsername());
}
@Override
public void update(DatabusAdminUserData data) {
if (adminUserApi == null) {
log.warn("[UserSync] AdminUserApi未注入跳过更新用户操作, userId={}", data.getId());
return;
}
AdminUserSaveReqDTO dto = buildUserDTO(data);
adminUserApi.updateUser(dto).checkError();
log.info("[UserSync] 用户更新成功, userId={}, username={}", dto.getId(), dto.getUsername());
}
@Override
public void delete(Long id) {
if (adminUserApi == null) {
log.warn("[UserSync] AdminUserApi未注入跳过删除用户操作, userId={}", id);
return;
}
adminUserApi.deleteUser(id).checkError();
log.info("[UserSync] 用户删除成功, userId={}", id);
}
@Override
public void fullSync(DatabusAdminUserData data) {
if (adminUserApi == null) {
log.warn("[UserSync] AdminUserApi未注入跳过全量同步用户操作, userId={}", data.getId());
return;
}
AdminUserSaveReqDTO dto = buildUserDTO(data);
try {
// 尝试获取,存在则更新,不存在则创建
var existing = adminUserApi.getUser(dto.getId());
if (existing.isSuccess() && existing.getData() != null) {
adminUserApi.updateUser(dto).checkError();
log.info("[UserSync] 用户全量同步-更新成功, userId={}", dto.getId());
} else {
adminUserApi.createUser(dto).checkError();
log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId());
}
} catch (Exception e) {
// 获取失败,尝试创建
log.warn("[UserSync] 用户获取失败,尝试创建, userId={}", dto.getId());
adminUserApi.createUser(dto).checkError();
log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId());
}
}
/**
* 构建用户 DTO用于 Feign 调用)
*/
private AdminUserSaveReqDTO buildUserDTO(DatabusAdminUserData data) {
AdminUserSaveReqDTO dto = new AdminUserSaveReqDTO();
dto.setId(data.getId());
dto.setUsername(data.getUsername());
dto.setNickname(data.getNickname());
dto.setRemark(data.getRemark());
// 将 List<Long> 转换为 Set<Long>
dto.setDeptIds(data.getDeptIds() != null ? new HashSet<>(data.getDeptIds()) : null);
dto.setPostIds(data.getPostIds() != null ? new HashSet<>(data.getPostIds()) : null);
dto.setEmail(data.getEmail());
dto.setMobile(data.getMobile());
dto.setSex(data.getSex());
dto.setAvatar(data.getAvatar());
dto.setStatus(data.getStatus());
return dto;
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserCreateHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername());
try {
adminUserSyncService.create(data);
log.info("[UserSync] 用户创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserDeleteHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户删除事件, id={}", data.getId());
try {
adminUserSyncService.delete(data.getId());
log.info("[UserSync] 用户删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,68 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户全量同步事件处理器(批量处理)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserFullHandler implements BatchSyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_FULL;
}
@Override
public void onFullSyncStart(DatabusBatchMessage<DatabusAdminUserData> message) {
log.info("[UserSync] 开始用户全量同步, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
@Override
public void handleBatch(DatabusBatchMessage<DatabusAdminUserData> message) {
log.info("[UserSync] 处理用户批次数据, taskId={}, batchNo={}/{}, size={}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch(),
message.getDataList().size());
// 逐条处理全量同步数据
for (DatabusAdminUserData data : message.getDataList()) {
try {
adminUserSyncService.fullSync(data);
log.debug("[UserSync] 用户全量同步成功, id={}, username={}", data.getId(), data.getUsername());
} catch (Exception e) {
log.error("[UserSync] 用户全量同步失败, id={}, username={}", data.getId(), data.getUsername(), e);
// 单条失败不影响其他数据,继续处理
}
}
log.info("[UserSync] 用户批次处理完成, taskId={}, batchNo={}/{}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
}
@Override
public void onFullSyncComplete(DatabusBatchMessage<DatabusAdminUserData> message) {
log.info("[UserSync] 用户全量同步完成, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
}

View File

@@ -0,0 +1,49 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserUpdateHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername());
try {
adminUserSyncService.update(data);
log.info("[UserSync] 用户更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}