diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java new file mode 100644 index 00000000..42feee49 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.module.databus.api.data.DatabusDeptData; + +/** + * 部门同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link DeptSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface DeptSyncService { + + /** + * 创建部门(增量同步) + * + * @param data 部门数据 + */ + void create(DatabusDeptData data); + + /** + * 更��部门(增量同步) + * + * @param data 部门数据 + */ + void update(DatabusDeptData data); + + /** + * 删除部门(增量同步) + * + * @param id 部门ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 部门数据 + */ + void fullSync(DatabusDeptData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java new file mode 100644 index 00000000..ca760435 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/DeptSyncServiceImpl.java @@ -0,0 +1,105 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.system.api.dept.DeptApi; +import com.zt.plat.module.system.api.dept.dto.DeptSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 部门同步服务实现(通过 Feign API 调用远程服务) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 系统中存在 DeptApi 接口(Feign 客户端) + *

+ * 如果分公司需要自定义实现,可以创建自己的 DeptSyncService Bean, + * 此默认实现会自动失效(@ConditionalOnMissingBean) + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.DeptApi") +public class DeptSyncServiceImpl implements DeptSyncService { + + @Autowired(required = false) + private DeptApi deptApi; // Feign 远程调用接口 + + @Override + public void create(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过创建部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门创建成功, deptId={}, deptName={}", dto.getId(), dto.getName()); + } + + @Override + public void update(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过更新部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + deptApi.updateDept(dto).checkError(); + log.info("[DeptSync] 部门更新成功, deptId={}, deptName={}", dto.getId(), dto.getName()); + } + + @Override + public void delete(Long id) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过删除部门操作, deptId={}", id); + return; + } + deptApi.deleteDept(id).checkError(); + log.info("[DeptSync] 部门删除成功, deptId={}", id); + } + + @Override + public void fullSync(DatabusDeptData data) { + if (deptApi == null) { + log.warn("[DeptSync] DeptApi未注入,跳过全量同步部门操作, deptId={}", data.getId()); + return; + } + DeptSaveReqDTO dto = buildDeptDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = deptApi.getDept(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + deptApi.updateDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-更新成功, deptId={}", dto.getId()); + } else { + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + log.warn("[DeptSync] 部门获取失败,尝试创建, deptId={}", dto.getId()); + deptApi.createDept(dto).checkError(); + log.info("[DeptSync] 部门全量同步-创建成功, deptId={}", dto.getId()); + } + } + + /** + * 构建部门 DTO(用于 Feign 调用) + */ + private DeptSaveReqDTO buildDeptDTO(DatabusDeptData data) { + DeptSaveReqDTO dto = new DeptSaveReqDTO(); + dto.setId(data.getId()); + dto.setName(data.getName()); + dto.setParentId(data.getParentId()); + dto.setSort(data.getSort()); + dto.setLeaderUserId(data.getLeaderUserId()); + dto.setPhone(data.getPhone()); + dto.setEmail(data.getEmail()); + dto.setStatus(data.getStatus()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java new file mode 100644 index 00000000..7bcad72f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptCreateHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}", + data.getId(), data.getName(), data.getParentId()); + + try { + deptSyncService.create(data); + log.info("[DeptSync] 部门创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java new file mode 100644 index 00000000..4ac8c687 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptDeleteHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门删除事件, id={}", data.getId()); + + try { + deptSyncService.delete(data.getId()); + log.info("[DeptSync] 部门删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java new file mode 100644 index 00000000..fc68dc38 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptFullHandler.java @@ -0,0 +1,68 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptFullHandler implements BatchSyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[DeptSync] 开始部门全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[DeptSync] 处理部门批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusDeptData data : message.getDataList()) { + try { + deptSyncService.fullSync(data); + log.debug("[DeptSync] 部门全量同步成功, id={}, name={}", data.getId(), data.getName()); + } catch (Exception e) { + log.error("[DeptSync] 部门全量同步失败, id={}, name={}", data.getId(), data.getName(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[DeptSync] 部门批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[DeptSync] 部门全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java new file mode 100644 index 00000000..8e7f68ed --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 部门更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptUpdateHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_DEPT_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName()); + + try { + deptSyncService.update(data); + log.info("[DeptSync] 部门更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java index f42330a9..f5346d2d 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncService.java @@ -1,108 +1,43 @@ package com.zt.plat.framework.databus.client.handler.post; -import com.zt.plat.module.databus.api.data.PostData; -import com.zt.plat.module.system.api.dept.PostApi; -import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO; -import lombok.extern.slf4j.Slf4j; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Service; +import com.zt.plat.module.databus.api.data.DatabusPostData; /** - * 岗位同步业务逻辑 + * 岗位同步服务接口 *

- * 被各个 PostHandler 共享使用 + * 客户端可以自定义实现此接口,覆盖默认的 Feign 调用实现 + *

+ * 默认实现:{@link PostSyncServiceImpl}(通过 Feign 远程调用) * * @author ZT */ -@Slf4j -@Service -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi") -public class PostSyncService { - - @Autowired(required = false) - private PostApi postApi; +public interface PostSyncService { /** * 创建岗位 + * + * @param data 岗位数据 */ - public void create(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过创建岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - postApi.createPost(dto).checkError(); - log.info("[PostSync] 创建岗位成功, postId={}, postName={}", dto.getId(), dto.getName()); - } + void create(DatabusPostData data); /** * 更新岗位 + * + * @param data 岗位数据 */ - public void update(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过更新岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - postApi.updatePost(dto).checkError(); - log.info("[PostSync] 更新岗位成功, postId={}, postName={}", dto.getId(), dto.getName()); - } + void update(DatabusPostData data); /** * 删除岗位 + * + * @param id 岗位 ID */ - public void delete(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过删除岗位操作, postId={}", data.getId()); - return; - } - Long postId = data.getId(); - if (postId != null) { - postApi.deletePost(postId).checkError(); - log.info("[PostSync] 删除岗位成功, postId={}", postId); - } - } + void delete(Long id); /** - * 全量同步单条数据(存在则更新,不存在则创建) + * 全量同步岗位(创建或更新) + * + * @param data 岗位数据 */ - public void fullSync(PostData data) { - if (postApi == null) { - log.warn("[PostSync] PostApi未注入,跳过全量同步岗位操作, postId={}", data.getId()); - return; - } - PostSaveReqDTO dto = buildPostDTO(data); - try { - if (dto.getId() != null) { - var existing = postApi.getPost(dto.getId()); - if (existing.isSuccess() && existing.getData() != null) { - postApi.updatePost(dto).checkError(); - } else { - postApi.createPost(dto).checkError(); - } - } else { - postApi.createPost(dto).checkError(); - } - } catch (Exception e) { - postApi.createPost(dto).checkError(); - } - } - - /** - * 构建岗位DTO - */ - private PostSaveReqDTO buildPostDTO(PostData data) { - PostSaveReqDTO dto = new PostSaveReqDTO(); - dto.setId(data.getId()); - dto.setCode(data.getCode()); - dto.setName(data.getName()); - dto.setSort(data.getSort()); - dto.setStatus(data.getStatus()); - dto.setRemark(data.getRemark()); - return dto; - } - + void fullSync(DatabusPostData data); } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java new file mode 100644 index 00000000..463fdc6e --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/PostSyncServiceImpl.java @@ -0,0 +1,103 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.system.api.dept.PostApi; +import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 岗位同步服务默认实现(通过 Feign 调用) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. PostApi 类存在于 classpath + *

+ * 客户端可以自定义实现 PostSyncService 接口覆盖此默认实现 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi") +public class PostSyncServiceImpl implements PostSyncService { + + @Autowired(required = false) + private PostApi postApi; + + @Override + public void create(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过创建岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + + @Override + public void update(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过更新岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + postApi.updatePost(dto).checkError(); + log.info("[PostSync] 岗位更新成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + + @Override + public void delete(Long id) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过删除岗位操作, postId={}", id); + return; + } + postApi.deletePost(id).checkError(); + log.info("[PostSync] 岗位删除成功, postId={}", id); + } + + @Override + public void fullSync(DatabusPostData data) { + if (postApi == null) { + log.warn("[PostSync] PostApi未注入,跳过全量同步岗位操作, postId={}", data.getId()); + return; + } + PostSaveReqDTO dto = buildPostDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = postApi.getPost(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + postApi.updatePost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-更新成功, postId={}, postName={}", dto.getId(), dto.getName()); + } else { + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + try { + postApi.createPost(dto).checkError(); + log.info("[PostSync] 岗位全量同步-创建成功, postId={}, postName={}", dto.getId(), dto.getName()); + } catch (Exception createEx) { + log.error("[PostSync] 岗位全量同步失败, postId={}, postName={}", dto.getId(), dto.getName(), createEx); + throw createEx; + } + } + } + + private PostSaveReqDTO buildPostDTO(DatabusPostData data) { + PostSaveReqDTO dto = new PostSaveReqDTO(); + dto.setId(data.getId()); + dto.setName(data.getName()); + dto.setCode(data.getCode()); + dto.setSort(data.getSort()); + dto.setStatus(data.getStatus()); + dto.setRemark(data.getRemark()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java new file mode 100644 index 00000000..98eb55df --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostCreateHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}", + data.getId(), data.getName(), data.getCode()); + + try { + postSyncService.create(data); + log.info("[PostSync] 岗位创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java new file mode 100644 index 00000000..645648a2 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostDeleteHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位删除事件, id={}", data.getId()); + + try { + postSyncService.delete(data.getId()); + log.info("[PostSync] 岗位删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java index 50727a74..10be89a0 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostFullHandler.java @@ -1,7 +1,7 @@ package com.zt.plat.framework.databus.client.handler.post; import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; -import com.zt.plat.module.databus.api.data.PostData; +import com.zt.plat.module.databus.api.data.DatabusPostData; import com.zt.plat.module.databus.api.message.DatabusBatchMessage; import com.zt.plat.module.databus.enums.DatabusEventType; import jakarta.annotation.Resource; @@ -11,7 +11,11 @@ import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.stereotype.Component; /** - * 岗位全量同步事件处理器 + * 岗位全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean * * @author ZT */ @@ -19,7 +23,7 @@ import org.springframework.stereotype.Component; @Component @ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") @ConditionalOnBean(PostSyncService.class) -public class SystemPostFullHandler implements BatchSyncEventHandler { +public class SystemPostFullHandler implements BatchSyncEventHandler { @Resource private PostSyncService postSyncService; @@ -30,42 +34,35 @@ public class SystemPostFullHandler implements BatchSyncEventHandler { } @Override - public void onFullSyncStart(DatabusBatchMessage message) { - log.info("[PostSync] 全量同步开始, taskId={}, totalCount={}, totalBatch={}", - message.getTaskId(), message.getTotalCount(), message.getTotalBatch()); + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[PostSync] 开始岗位全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); } @Override - public void handleBatch(DatabusBatchMessage message) { - log.info("[PostSync] 处理批次, batchNo={}/{}, count={}", - message.getBatchNo(), message.getTotalBatch(), message.getCount()); + public void handleBatch(DatabusBatchMessage message) { + log.info("[PostSync] 处理岗位批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); - if (message.getDataList() == null || message.getDataList().isEmpty()) { - log.warn("[PostSync] 数据列表为空, batchNo={}", message.getBatchNo()); - return; - } - - int successCount = 0; - int failCount = 0; - - for (PostData data : message.getDataList()) { + // 逐条处理全量同步数据 + for (DatabusPostData data : message.getDataList()) { try { postSyncService.fullSync(data); - successCount++; + log.debug("[PostSync] 岗位全量同步成功, id={}, name={}", data.getId(), data.getName()); } catch (Exception e) { - failCount++; - log.error("[PostSync] 处理数据项失败, postId={}", data.getId(), e); + log.error("[PostSync] 岗位全量同步失败, id={}, name={}", data.getId(), data.getName(), e); + // 单条失败不影响其他数据,继续处理 } } - log.info("[PostSync] 批次处理完成, batchNo={}, success={}, fail={}", - message.getBatchNo(), successCount, failCount); + log.info("[PostSync] 岗位批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); } @Override - public void onFullSyncComplete(DatabusBatchMessage message) { - log.info("[PostSync] 全量同步完成, taskId={}, totalCount={}", - message.getTaskId(), message.getTotalCount()); + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[PostSync] 岗位全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); } - } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java new file mode 100644 index 00000000..e8d40e82 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java @@ -0,0 +1,50 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 岗位更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostUpdateHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_POST_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}", + data.getId(), data.getName(), data.getCode()); + + try { + postSyncService.update(data); + log.info("[PostSync] 岗位更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java new file mode 100644 index 00000000..3409a96f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; + +/** + * 用户同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link AdminUserSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface AdminUserSyncService { + + /** + * 创建用户(增量同步) + * + * @param data 用户数据 + */ + void create(DatabusAdminUserData data); + + /** + * 更新用户(增量同步) + * + * @param data 用户数据 + */ + void update(DatabusAdminUserData data); + + /** + * 删除用户(增量同步) + * + * @param id 用户ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户数据 + */ + void fullSync(DatabusAdminUserData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java new file mode 100644 index 00000000..d40cfd51 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/AdminUserSyncServiceImpl.java @@ -0,0 +1,112 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.system.api.user.AdminUserApi; +import com.zt.plat.module.system.api.user.dto.AdminUserSaveReqDTO; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +import java.util.HashSet; +import java.util.Set; + +/** + * 用户同步服务实现(通过 Feign API 调用远程服务) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 系统中存在 AdminUserApi 接口(Feign 客户端) + *

+ * 如果分公司需要自定义实现,可以创建自己的 AdminUserSyncService Bean, + * 此默认实现会自动失效(@ConditionalOnMissingBean) + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnClass(name = "com.zt.plat.module.system.api.user.AdminUserApi") +public class AdminUserSyncServiceImpl implements AdminUserSyncService { + + @Autowired(required = false) + private AdminUserApi adminUserApi; // Feign 远程调用接口 + + @Override + public void create(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过创建用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户创建成功, userId={}, username={}", dto.getId(), dto.getUsername()); + } + + @Override + public void update(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过更新用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + adminUserApi.updateUser(dto).checkError(); + log.info("[UserSync] 用户更新成功, userId={}, username={}", dto.getId(), dto.getUsername()); + } + + @Override + public void delete(Long id) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过删除用户操作, userId={}", id); + return; + } + adminUserApi.deleteUser(id).checkError(); + log.info("[UserSync] 用户删除成功, userId={}", id); + } + + @Override + public void fullSync(DatabusAdminUserData data) { + if (adminUserApi == null) { + log.warn("[UserSync] AdminUserApi未注入,跳过全量同步用户操作, userId={}", data.getId()); + return; + } + AdminUserSaveReqDTO dto = buildUserDTO(data); + try { + // 尝试获取,存在则更新,不存在则创建 + var existing = adminUserApi.getUser(dto.getId()); + if (existing.isSuccess() && existing.getData() != null) { + adminUserApi.updateUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-更新成功, userId={}", dto.getId()); + } else { + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId()); + } + } catch (Exception e) { + // 获取失败,尝试创建 + log.warn("[UserSync] 用户获取失败,尝试创建, userId={}", dto.getId()); + adminUserApi.createUser(dto).checkError(); + log.info("[UserSync] 用户全量同步-创建成功, userId={}", dto.getId()); + } + } + + /** + * 构建用户 DTO(用于 Feign 调用) + */ + private AdminUserSaveReqDTO buildUserDTO(DatabusAdminUserData data) { + AdminUserSaveReqDTO dto = new AdminUserSaveReqDTO(); + dto.setId(data.getId()); + dto.setUsername(data.getUsername()); + dto.setNickname(data.getNickname()); + dto.setRemark(data.getRemark()); + // 将 List 转换为 Set + dto.setDeptIds(data.getDeptIds() != null ? new HashSet<>(data.getDeptIds()) : null); + dto.setPostIds(data.getPostIds() != null ? new HashSet<>(data.getPostIds()) : null); + dto.setEmail(data.getEmail()); + dto.setMobile(data.getMobile()); + dto.setSex(data.getSex()); + dto.setAvatar(data.getAvatar()); + dto.setStatus(data.getStatus()); + return dto; + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java new file mode 100644 index 00000000..875043e7 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户创建事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserCreateHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_CREATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername()); + + try { + adminUserSyncService.create(data); + log.info("[UserSync] 用户创建成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户创建失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java new file mode 100644 index 00000000..139286ae --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户删除事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserDeleteHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_DELETE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户删除事件, id={}", data.getId()); + + try { + adminUserSyncService.delete(data.getId()); + log.info("[UserSync] 用户删除成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户删除失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java new file mode 100644 index 00000000..37a2dd72 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserFullHandler.java @@ -0,0 +1,68 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserFullHandler implements BatchSyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserSync] 开始用户全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserSync] 处理用户批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusAdminUserData data : message.getDataList()) { + try { + adminUserSyncService.fullSync(data); + log.debug("[UserSync] 用户全量同步成功, id={}, username={}", data.getId(), data.getUsername()); + } catch (Exception e) { + log.error("[UserSync] 用户全量同步失败, id={}, username={}", data.getId(), data.getUsername(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserSync] 用户批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserSync] 用户全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java new file mode 100644 index 00000000..46704f92 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java @@ -0,0 +1,49 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户更新事件处理器 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserUpdateHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_UPDATE; + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername()); + + try { + adminUserSyncService.update(data); + log.info("[UserSync] 用户更新成功, id={}", data.getId()); + } catch (Exception e) { + log.error("[UserSync] 用户更新失败, id={}", data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +}