From 8329f9c8342462d3ef1fd58fcb048671def618d7 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 2 Dec 2025 01:07:30 +0800 Subject: [PATCH] =?UTF-8?q?feat(databus-client):=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E5=B2=97=E4=BD=8D=20Handler=20=E5=8F=8A=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=AA=8C=E8=AF=81=EF=BC=88=E4=BB=BB=E5=8A=A1=2075-88=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增文件(任务 75-88): - PostSyncService.java - 岗位同步服务接口 - PostSyncServiceImpl.java - 岗位同步服务实现(Feign调用) - SystemPostCreateHandler.java - 岗位创建事件处理器 - SystemPostUpdateHandler.java - 岗位更新事件处理器 - SystemPostDeleteHandler.java - 岗位删除事件处理器 - SystemPostFullHandler.java - 岗位全量同步处理器(批量) 修复问题: 1. 修复 DTO 导入:DeptSaveReqVO → DeptSaveReqDTO, PostSaveReqVO → PostSaveReqDTO 2. 修复注解:@Resource(required=false) → @Autowired(required=false) 3. 修复 PostApi 包路径:com.zt.plat.module.system.api.post → com.zt.plat.module.system.api.dept 4. 修复 DeptSaveReqDTO 字段映射(移除不存在的字段:code, shortName, isCompany, isGroup, deptSource) 5. 修复 AdminUserSaveReqDTO 字段映射: - deptIds: List → Set - postIds: List → Set 编译结果:✅ BUILD SUCCESS(28个源文件) Ref: docs/databus/implementation-checklist.md 任务 75-88 --- .../client/handler/dept/DeptSyncService.java | 44 +++++++ .../handler/dept/DeptSyncServiceImpl.java | 105 ++++++++++++++++ .../handler/dept/SystemDeptCreateHandler.java | 50 ++++++++ .../handler/dept/SystemDeptDeleteHandler.java | 49 ++++++++ .../handler/dept/SystemDeptFullHandler.java | 68 +++++++++++ .../handler/dept/SystemDeptUpdateHandler.java | 49 ++++++++ .../client/handler/post/PostSyncService.java | 103 +++------------- .../handler/post/PostSyncServiceImpl.java | 103 ++++++++++++++++ .../handler/post/SystemPostCreateHandler.java | 50 ++++++++ .../handler/post/SystemPostDeleteHandler.java | 49 ++++++++ .../handler/post/SystemPostFullHandler.java | 51 ++++---- .../handler/post/SystemPostUpdateHandler.java | 50 ++++++++ .../handler/user/AdminUserSyncService.java | 44 +++++++ .../user/AdminUserSyncServiceImpl.java | 112 ++++++++++++++++++ .../handler/user/SystemUserCreateHandler.java | 49 ++++++++ .../handler/user/SystemUserDeleteHandler.java | 49 ++++++++ .../handler/user/SystemUserFullHandler.java | 68 +++++++++++ .../handler/user/SystemUserUpdateHandler.java | 49 ++++++++ 18 files changed, 1031 insertions(+), 111 deletions(-) create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java new file mode 100644 index 00000000..42feee49 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.module.databus.api.data.DatabusDeptData; + +/** + * 部门同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link DeptSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface DeptSyncService { + + /** + * 创建部门(增量同步) + * + * @param data 部门数据 + */ + void create(DatabusDeptData data); + + /** + * 更��部门(增量同步) + * + * @param data 部门数据 + */ + void update(DatabusDeptData data); + + /** + * 删除部门(增量同步) + * + * @param id 部门ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 部门数据 + */ + void fullSync(DatabusDeptData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java new file mode 100644 index 00000000..ca760435 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java @@ -0,0 +1,105 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.system.api.dept.DeptApi; +import com.zt.plat.module.system.api.dept.dto.DeptSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 部门同步服务实现(通过 Feign API 调用远程服务) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 系统中存在 DeptApi 接口(Feign 客户端) + *

+ * 如果分公司需要自定义实现,可以创建自己的 DeptSyncService Bean, + * 此默认实现会自动失效(@ConditionalOnMissingBean) + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.DeptApi") +public class DeptSyncServiceImpl implements DeptSyncService { + + @Autowired(required = false) + private DeptApi deptApi; // Feign 远程调用接口 + + @Override + public void create(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过创建部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门创建成功, deptId={}, deptName={}", dto.getId(), dto.getName()); + } + + @Override + public void update(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过更新部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + deptApi.updateDept(dto).checkError(); + log.info("[DeptSync] 部门更新成功, deptId={}, deptName={}", dto.getId(), dto.getName()); + } + + @Override + public void delete(Long id) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过删除部门操作, deptId={}", id); + return; + } + deptApi.deleteDept(id).checkError(); + log.info("[DeptSync] 部门删除成功, deptId={}", id); + } + + @Override + public void fullSync(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过全量同步部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = deptApi.getDept(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + deptApi.updateDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-更新成功, deptId={}", dto.getId()); + } else { + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + log.warn("[DeptSync] 部门获取失败,尝试创建, deptId={}", dto.getId()); + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId()); + } + } + + /** + * 构建部门 DTO(用于 Feign 调用) + */ + private DeptSaveReqDTO buildDeptDTO(DatabusDeptData data) { + DeptSaveReqDTO dto = new DeptSaveReqDTO(); + dto.setId(data.getId()); + dto.setName(data.getName()); + dto.setParentId(data.getParentId()); + dto.setSort(data.getSort()); + dto.setLeaderUserId(data.getLeaderUserId()); + dto.setPhone(data.getPhone()); + dto.setEmail(data.getEmail()); + dto.setStatus(data.getStatus()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java new file mode 100644 index 00000000..7bcad72f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptCreateHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}", + data.getId(), data.getName(), data.getParentId()); + + try { + deptSyncService.create(data); + log.info("[DeptSync] 部门创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java new file mode 100644 index 00000000..4ac8c687 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptDeleteHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门删除事件, id={}", data.getId()); + + try { + deptSyncService.delete(data.getId()); + log.info("[DeptSync] 部门删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java new file mode 100644 index 00000000..fc68dc38 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java @@ -0,0 +1,68 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptFullHandler implements BatchSyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[DeptSync] 开始部门全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[DeptSync] 处理部门批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusDeptData data : message.getDataList()) { + try { + deptSyncService.fullSync(data); + log.debug("[DeptSync] 部门全量同步成功, id={}, name={}", data.getId(), data.getName()); + } catch (Exception e) { + log.error("[DeptSync] 部门全量同步失败, id={}, name={}", data.getId(), data.getName(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[DeptSync] 部门批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[DeptSync] 部门全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java new file mode 100644 index 00000000..8e7f68ed --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptUpdateHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName()); + + try { + deptSyncService.update(data); + log.info("[DeptSync] 部门更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java index f42330a9..f5346d2d 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java @@ -1,108 +1,43 @@ package com.zt.plat.framework.databus.client.handler.post; -import com.zt.plat.module.databus.api.data.PostData; -import com.zt.plat.module.system.api.dept.PostApi; -import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; +import com.zt.plat.module.databus.api.data.DatabusPostData; /** - * 岗位同步业务逻辑 + * 岗位同步服务接口 *

- * 被各个 PostHandler 共享使用 + * 客户端可以自定义实现此接口,覆盖默认的 Feign 调用实现 + *

+ * 默认实现:{@link PostSyncServiceImpl}(通过 Feign 远程调用) * * @author ZT */ -@Slf4j -@Service -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi") -public class PostSyncService { - - @Autowired(required = false) - private PostApi postApi; +public interface PostSyncService { /** * 创建岗位 + * + * @param data 岗位数据 */ - public void create(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过创建岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - postApi.createPost(dto).checkError(); - log.info("[PostSync] 创建岗位成功, postId={}, postName={}", dto.getId(), dto.getName()); - } + void create(DatabusPostData data); /** * 更新岗位 + * + * @param data 岗位数据 */ - public void update(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过更新岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - postApi.updatePost(dto).checkError(); - log.info("[PostSync] 更新岗位成功, postId={}, postName={}", dto.getId(), dto.getName()); - } + void update(DatabusPostData data); /** * 删除岗位 + * + * @param id 岗位 ID */ - public void delete(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过删除岗位操作, postId={}", data.getId()); - return; - } - Long postId = data.getId(); - if (postId != null) { - postApi.deletePost(postId).checkError(); - log.info("[PostSync] 删除岗位成功, postId={}", postId); - } - } + void delete(Long id); /** - * 全量同步单条数据(存在则更新,不存在则创建) + * 全量同步岗位(创建或更新) + * + * @param data 岗位数据 */ - public void fullSync(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过全量同步岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - try { - if (dto.getId() != null) { - var existing = postApi.getPost(dto.getId()); - if (existing.isSuccess() && existing.getData() != null) { - postApi.updatePost(dto).checkError(); - } else { - postApi.createPost(dto).checkError(); - } - } else { - postApi.createPost(dto).checkError(); - } - } catch (Exception e) { - postApi.createPost(dto).checkError(); - } - } - - /** - * 构建岗位DTO - */ - private PostSaveReqDTO buildPostDTO(PostData data) { - PostSaveReqDTO dto = new PostSaveReqDTO(); - dto.setId(data.getId()); - dto.setCode(data.getCode()); - dto.setName(data.getName()); - dto.setSort(data.getSort()); - dto.setStatus(data.getStatus()); - dto.setRemark(data.getRemark()); - return dto; - } - + void fullSync(DatabusPostData data); } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java new file mode 100644 index 00000000..463fdc6e --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java @@ -0,0 +1,103 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.system.api.dept.PostApi; +import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 岗位同步服务默认实现(通过 Feign 调用) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. PostApi 类存在于 classpath + *

+ * 客户端可以自定义实现 PostSyncService 接口覆盖此默认实现 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi") +public class PostSyncServiceImpl implements PostSyncService { + + @Autowired(required = false) + private PostApi postApi; + + @Override + public void create(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过创建岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + + @Override + public void update(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过更新岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + postApi.updatePost(dto).checkError(); + log.info("[PostSync] 岗位更新成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + + @Override + public void delete(Long id) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过删除岗位操作, postId={}", id); + return; + } + postApi.deletePost(id).checkError(); + log.info("[PostSync] 岗位删除成功, postId={}", id); + } + + @Override + public void fullSync(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过全量同步岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = postApi.getPost(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + postApi.updatePost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-更新成功, postId={}, postName={}", dto.getId(), dto.getName()); + } else { + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + try { + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } catch (Exception createEx) { + log.error("[PostSync] 岗位全量同步失败, postId={}, postName={}", dto.getId(), dto.getName(), createEx); + throw createEx; + } + } + } + + private PostSaveReqDTO buildPostDTO(DatabusPostData data) { + PostSaveReqDTO dto = new PostSaveReqDTO(); + dto.setId(data.getId()); + dto.setName(data.getName()); + dto.setCode(data.getCode()); + dto.setSort(data.getSort()); + dto.setStatus(data.getStatus()); + dto.setRemark(data.getRemark()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java new file mode 100644 index 00000000..98eb55df --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostCreateHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}", + data.getId(), data.getName(), data.getCode()); + + try { + postSyncService.create(data); + log.info("[PostSync] 岗位创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java new file mode 100644 index 00000000..645648a2 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostDeleteHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位删除事件, id={}", data.getId()); + + try { + postSyncService.delete(data.getId()); + log.info("[PostSync] 岗位删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java index 50727a74..10be89a0 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java @@ -1,7 +1,7 @@ package com.zt.plat.framework.databus.client.handler.post; import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; -import com.zt.plat.module.databus.api.data.PostData; +import com.zt.plat.module.databus.api.data.DatabusPostData; import com.zt.plat.module.databus.api.message.DatabusBatchMessage; import com.zt.plat.module.databus.enums.DatabusEventType; import jakarta.annotation.Resource; @@ -11,7 +11,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; /** - * 岗位全量同步事件处理器 + * 岗位全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean * * @author ZT */ @@ -19,7 +23,7 @@ import org.springframework.stereotype.Component; @Component @ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") @ConditionalOnBean(PostSyncService.class) -public class SystemPostFullHandler implements BatchSyncEventHandler { +public class SystemPostFullHandler implements BatchSyncEventHandler { @Resource private PostSyncService postSyncService; @@ -30,42 +34,35 @@ public class SystemPostFullHandler implements BatchSyncEventHandler { } @Override - public void onFullSyncStart(DatabusBatchMessage message) { - log.info("[PostSync] 全量同步开始, taskId={}, totalCount={}, totalBatch={}", - message.getTaskId(), message.getTotalCount(), message.getTotalBatch()); + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[PostSync] 开始岗位全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); } @Override - public void handleBatch(DatabusBatchMessage message) { - log.info("[PostSync] 处理批次, batchNo={}/{}, count={}", - message.getBatchNo(), message.getTotalBatch(), message.getCount()); + public void handleBatch(DatabusBatchMessage message) { + log.info("[PostSync] 处理岗位批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); - if (message.getDataList() == null || message.getDataList().isEmpty()) { - log.warn("[PostSync] 数据列表为空, batchNo={}", message.getBatchNo()); - return; - } - - int successCount = 0; - int failCount = 0; - - for (PostData data : message.getDataList()) { + // 逐条处理全量同步数据 + for (DatabusPostData data : message.getDataList()) { try { postSyncService.fullSync(data); - successCount++; + log.debug("[PostSync] 岗位全量同步成功, id={}, name={}", data.getId(), data.getName()); } catch (Exception e) { - failCount++; - log.error("[PostSync] 处理数据项失败, postId={}", data.getId(), e); + log.error("[PostSync] 岗位全量同步失败, id={}, name={}", data.getId(), data.getName(), e); + // 单条失败不影响其他数据,继续处理 } } - log.info("[PostSync] 批次处理完成, batchNo={}, success={}, fail={}", - message.getBatchNo(), successCount, failCount); + log.info("[PostSync] 岗位批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); } @Override - public void onFullSyncComplete(DatabusBatchMessage message) { - log.info("[PostSync] 全量同步完成, taskId={}, totalCount={}", - message.getTaskId(), message.getTotalCount()); + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[PostSync] 岗位全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); } - } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java new file mode 100644 index 00000000..e8d40e82 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostUpdateHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}", + data.getId(), data.getName(), data.getCode()); + + try { + postSyncService.update(data); + log.info("[PostSync] 岗位更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java new file mode 100644 index 00000000..3409a96f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; + +/** + * 用户同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link AdminUserSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface AdminUserSyncService { + + /** + * 创建用户(增量同步) + * + * @param data 用户数据 + */ + void create(DatabusAdminUserData data); + + /** + * 更新用户(增量同步) + * + * @param data 用户数据 + */ + void update(DatabusAdminUserData data); + + /** + * 删除用户(增量同步) + * + * @param id 用户ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户数据 + */ + void fullSync(DatabusAdminUserData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java new file mode 100644 index 00000000..d40cfd51 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java @@ -0,0 +1,112 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.system.api.user.AdminUserApi; +import com.zt.plat.module.system.api.user.dto.AdminUserSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Set; + +/** + * 用户同步服务实现(通过 Feign API 调用远程服务) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 系统中存在 AdminUserApi 接口(Feign 客户端) + *

+ * 如果分公司需要自定义实现,可以创建自己的 AdminUserSyncService Bean, + * 此默认实现会自动失效(@ConditionalOnMissingBean) + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.user.AdminUserApi") +public class AdminUserSyncServiceImpl implements AdminUserSyncService { + + @Autowired(required = false) + private AdminUserApi adminUserApi; // Feign 远程调用接口 + + @Override + public void create(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过创建用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户创建成功, userId={}, username={}", dto.getId(), dto.getUsername()); + } + + @Override + public void update(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过更新用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + adminUserApi.updateUser(dto).checkError(); + log.info("[UserSync] 用户更新成功, userId={}, username={}", dto.getId(), dto.getUsername()); + } + + @Override + public void delete(Long id) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过删除用户操作, userId={}", id); + return; + } + adminUserApi.deleteUser(id).checkError(); + log.info("[UserSync] 用户删除成功, userId={}", id); + } + + @Override + public void fullSync(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过全量同步用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = adminUserApi.getUser(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + adminUserApi.updateUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-更新成功, userId={}", dto.getId()); + } else { + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + log.warn("[UserSync] 用户获取失败,尝试创建, userId={}", dto.getId()); + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId()); + } + } + + /** + * 构建用户 DTO(用于 Feign 调用) + */ + private AdminUserSaveReqDTO buildUserDTO(DatabusAdminUserData data) { + AdminUserSaveReqDTO dto = new AdminUserSaveReqDTO(); + dto.setId(data.getId()); + dto.setUsername(data.getUsername()); + dto.setNickname(data.getNickname()); + dto.setRemark(data.getRemark()); + // 将 List 转换为 Set + dto.setDeptIds(data.getDeptIds() != null ? new HashSet<>(data.getDeptIds()) : null); + dto.setPostIds(data.getPostIds() != null ? new HashSet<>(data.getPostIds()) : null); + dto.setEmail(data.getEmail()); + dto.setMobile(data.getMobile()); + dto.setSex(data.getSex()); + dto.setAvatar(data.getAvatar()); + dto.setStatus(data.getStatus()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java new file mode 100644 index 00000000..875043e7 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserCreateHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername()); + + try { + adminUserSyncService.create(data); + log.info("[UserSync] 用户创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java new file mode 100644 index 00000000..139286ae --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserDeleteHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户删除事件, id={}", data.getId()); + + try { + adminUserSyncService.delete(data.getId()); + log.info("[UserSync] 用户删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java new file mode 100644 index 00000000..37a2dd72 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java @@ -0,0 +1,68 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserFullHandler implements BatchSyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserSync] 开始用户全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserSync] 处理用户批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusAdminUserData data : message.getDataList()) { + try { + adminUserSyncService.fullSync(data); + log.debug("[UserSync] 用户全量同步成功, id={}, username={}", data.getId(), data.getUsername()); + } catch (Exception e) { + log.error("[UserSync] 用户全量同步失败, id={}, username={}", data.getId(), data.getUsername(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserSync] 用户批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserSync] 用户全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java new file mode 100644 index 00000000..46704f92 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserUpdateHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername()); + + try { + adminUserSyncService.update(data); + log.info("[UserSync] 用户更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +}