diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java deleted file mode 100644 index a38a9fe9..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java +++ /dev/null @@ -1,148 +0,0 @@ -package com.zt.plat.framework.databus.client.core.listener; - -import com.zt.plat.framework.databus.client.config.DatabusSyncClientProperties; -import com.zt.plat.framework.databus.client.core.processor.MessageProcessor; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -/** - * Databus Consumer 注册器 - *

- * 根据 DatabusEventType 枚举自动为所有事件类型创建消费者 - * Topic格式: {topicBase}-{module}-{entity}-{action}-{clientCode} - * 例如: databus-sync-system-user-create-company-a - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client.mq", name = "enabled", havingValue = "true", matchIfMissing = true) -public class DatabusConsumerRegistry { - - private final MessageProcessor messageProcessor; - private final DatabusSyncClientProperties properties; - - - /** - * 管理的消费者列表 - */ - private final List consumers = new ArrayList<>(); - - public DatabusConsumerRegistry(MessageProcessor messageProcessor, DatabusSyncClientProperties properties) { - this.messageProcessor = messageProcessor; - this.properties = properties; - } - - @PostConstruct - public void init() { - if (!Boolean.TRUE.equals(properties.getEnabled())) { - log.info("[Databus Client] 客户端未启用,跳过初始化"); - return; - } - - String nameServer = properties.getMq().getNameServer(); - if (nameServer == null || nameServer.isEmpty()) { - log.warn("[Databus Client] RocketMQ nameServer未配置,跳过MQ消费者初始化"); - return; - } - - String clientCode = properties.getClientCode(); - if (clientCode == null || clientCode.isEmpty()) { - log.error("[Databus Client] clientCode未配置,无法订阅Topic"); - return; - } - - String topicBase = properties.getMq().getTopicBase(); - String consumerGroupPrefix = properties.getMq().getConsumerGroupPrefix(); - if (consumerGroupPrefix == null || consumerGroupPrefix.isEmpty()) { - consumerGroupPrefix = "databus-client-" + clientCode; - } - - // 为每个事件类型创建独立的消费者 - for (DatabusEventType eventType : DatabusEventType.values()) { - try { - createConsumer(eventType, topicBase, clientCode, consumerGroupPrefix, nameServer); - } catch (Exception e) { - log.error("[Databus Client] 创建消费者失败, eventType={}", eventType.name(), e); - } - } - - log.info("[Databus Client] 消费者注册完成,共创建 {} 个消费者", consumers.size()); - } - - /** - * 为指定事件类型创建消费者 - */ - private void createConsumer(DatabusEventType eventType, String topicBase, String clientCode, String consumerGroupPrefix, String nameServer) throws MQClientException { - // Topic: databus-sync-system-user-create-company-a - String topic = eventType.getTopic(topicBase, clientCode); - // ConsumerGroup: databus-client-company-a-system-user-create - String consumerGroup = String.format("%s-%s", consumerGroupPrefix, eventType.getTopicSuffix()); - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); - consumer.setNamesrvAddr(nameServer); - consumer.setConsumeThreadMin(properties.getMq().getConsumeThreadMin()); - consumer.setConsumeThreadMax(properties.getMq().getConsumeThreadMax()); - consumer.setMaxReconsumeTimes(properties.getMq().getMaxReconsumeTimes()); - - // 订阅Topic - consumer.subscribe(topic, "*"); - - // 设置消息监听器 - consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { - for (MessageExt msg : msgs) { - try { - String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8); - log.info("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}", - msg.getTopic(), msg.getMsgId(), eventType.name()); - - messageProcessor.process(messageBody, eventType); - - } catch (Exception e) { - log.error("[Databus Client] 消息处理失败, topic={}, msgId={}, reconsumeTimes={}", - msg.getTopic(), msg.getMsgId(), msg.getReconsumeTimes(), e); - - if (msg.getReconsumeTimes() >= properties.getMq().getMaxReconsumeTimes()) { - log.error("[Databus Client] 重试次数已达上限,放弃处理, msgId={}", msg.getMsgId()); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - }); - - consumer.start(); - consumers.add(consumer); - - log.info("[Databus Client] 消费者启动成功, topic={}, consumerGroup={}, event={}", - topic, consumerGroup, eventType.getName()); - } - - @PreDestroy - public void destroy() { - for (DefaultMQPushConsumer consumer : consumers) { - try { - consumer.shutdown(); - } catch (Exception e) { - log.error("[Databus Client] 关闭消费者失败", e); - } - } - consumers.clear(); - log.info("[Databus Client] 所有消费者已关闭"); - } - -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java index a1baa90d..1a6ad276 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java @@ -66,13 +66,15 @@ public class HandlerRegistry { public void init() { log.info("[HandlerRegistry] 开始初始化 Handler 注册表..."); - // 注册增量同步 Handler + // 注册增量同步 Handler(支持一个Handler注册多个事件类型) if (syncEventHandlers != null) { for (SyncEventHandler handler : syncEventHandlers) { - DatabusEventType eventType = handler.getSupportedEventType(); - incrementalHandlers.put(eventType, handler); - log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", - eventType, handler.getClass().getSimpleName()); + List eventTypes = handler.getSupportedEventTypes(); + for (DatabusEventType eventType : eventTypes) { + incrementalHandlers.put(eventType, handler); + log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", + eventType, handler.getClass().getSimpleName()); + } } } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java index f89906d0..573853da 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java @@ -5,22 +5,49 @@ import com.zt.plat.module.databus.enums.DatabusEventType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Collections; +import java.util.List; /** * 同步事件处理器接口 *

* 业务系统需要实现此接口来处理增量数据同步 + *

+ * 支持单事件和多事件处理: + * - 单事件:实现 getSupportedEventType() + * - 多事件:实现 getSupportedEventTypes()(推荐,一个Handler处理多个操作) * * @author ZT */ public interface SyncEventHandler { /** - * 获取支持的事件类型 + * 获取支持的事件类型(单个) + *

+ * 默认实现:返回 getSupportedEventTypes() 的第一个元素 + *

+ * 建议:新Handler优先实现 getSupportedEventTypes() * * @return 事件类型枚举 */ - DatabusEventType getSupportedEventType(); + default DatabusEventType getSupportedEventType() { + List types = getSupportedEventTypes(); + return types.isEmpty() ? null : types.get(0); + } + + /** + * 获取支持的事件类型列表(多个) + *

+ * 默认实现:返回包含 getSupportedEventType() 的单元素列表 + *

+ * 子类可以覆盖此方法返回多个事件类型,从而一个Handler处理多个操作 + * + * @return 事件类型列表 + */ + default List getSupportedEventTypes() { + DatabusEventType type = getSupportedEventType(); + return type == null ? Collections.emptyList() : Collections.singletonList(type); + } /** * 处理同步消息 diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java deleted file mode 100644 index 7bcad72f..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptCreateHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}", - data.getId(), data.getName(), data.getParentId()); - - try { - deptSyncService.create(data); - log.info("[DeptSync] 部门创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java deleted file mode 100644 index 4ac8c687..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptDeleteHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门删除事件, id={}", data.getId()); - - try { - deptSyncService.delete(data.getId()); - log.info("[DeptSync] 部门删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java new file mode 100644 index 00000000..cc1e8ab6 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 部门增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_DEPT_CREATE:部门创建 + * - SYSTEM_DEPT_UPDATE:部门更新 + * - SYSTEM_DEPT_DELETE:部门删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_DEPT_CREATE, + DatabusEventType.SYSTEM_DEPT_UPDATE, + DatabusEventType.SYSTEM_DEPT_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[DeptSync] 收到部门{}事件, id={}, name={}, parentId={}", + eventType.getAction(), data.getId(), data.getName(), data.getParentId()); + + try { + switch (eventType) { + case SYSTEM_DEPT_CREATE: + deptSyncService.create(data); + log.info("[DeptSync] 部门创建成功, id={}", data.getId()); + break; + + case SYSTEM_DEPT_UPDATE: + deptSyncService.update(data); + log.info("[DeptSync] 部门更新成功, id={}", data.getId()); + break; + + case SYSTEM_DEPT_DELETE: + deptSyncService.delete(data.getId()); + log.info("[DeptSync] 部门删除成功, id={}", data.getId()); + break; + + default: + log.warn("[DeptSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[DeptSync] 部门{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java deleted file mode 100644 index 8e7f68ed..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptUpdateHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName()); - - try { - deptSyncService.update(data); - log.info("[DeptSync] 部门更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java deleted file mode 100644 index 98eb55df..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostCreateHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}", - data.getId(), data.getName(), data.getCode()); - - try { - postSyncService.create(data); - log.info("[PostSync] 岗位创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java deleted file mode 100644 index 645648a2..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostDeleteHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位删除事件, id={}", data.getId()); - - try { - postSyncService.delete(data.getId()); - log.info("[PostSync] 岗位删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java new file mode 100644 index 00000000..c7a4e787 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 岗位增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_POST_CREATE:岗位创建 + * - SYSTEM_POST_UPDATE:岗位更新 + * - SYSTEM_POST_DELETE:岗位删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_POST_CREATE, + DatabusEventType.SYSTEM_POST_UPDATE, + DatabusEventType.SYSTEM_POST_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[PostSync] 收到岗位{}事件, id={}, name={}, code={}", + eventType.getAction(), data.getId(), data.getName(), data.getCode()); + + try { + switch (eventType) { + case SYSTEM_POST_CREATE: + postSyncService.create(data); + log.info("[PostSync] 岗位创建成功, id={}", data.getId()); + break; + + case SYSTEM_POST_UPDATE: + postSyncService.update(data); + log.info("[PostSync] 岗位更新成功, id={}", data.getId()); + break; + + case SYSTEM_POST_DELETE: + postSyncService.delete(data.getId()); + log.info("[PostSync] 岗位删除成功, id={}", data.getId()); + break; + + default: + log.warn("[PostSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[PostSync] 岗位{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java deleted file mode 100644 index e8d40e82..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostUpdateHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}", - data.getId(), data.getName(), data.getCode()); - - try { - postSyncService.update(data); - log.info("[PostSync] 岗位更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java deleted file mode 100644 index 875043e7..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserCreateHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername()); - - try { - adminUserSyncService.create(data); - log.info("[UserSync] 用户创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java deleted file mode 100644 index 139286ae..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserDeleteHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户删除事件, id={}", data.getId()); - - try { - adminUserSyncService.delete(data.getId()); - log.info("[UserSync] 用户删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java new file mode 100644 index 00000000..3af1fd2a --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_CREATE:用户创建 + * - SYSTEM_USER_UPDATE:用户更新 + * - SYSTEM_USER_DELETE:用户删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_CREATE, + DatabusEventType.SYSTEM_USER_UPDATE, + DatabusEventType.SYSTEM_USER_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserSync] 收到用户{}事件, id={}, username={}, nickname={}", + eventType.getAction(), data.getId(), data.getUsername(), data.getNickname()); + + try { + switch (eventType) { + case SYSTEM_USER_CREATE: + adminUserSyncService.create(data); + log.info("[UserSync] 用户创建成功, id={}", data.getId()); + break; + + case SYSTEM_USER_UPDATE: + adminUserSyncService.update(data); + log.info("[UserSync] 用户更新成功, id={}", data.getId()); + break; + + case SYSTEM_USER_DELETE: + adminUserSyncService.delete(data.getId()); + log.info("[UserSync] 用户删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserSync] 用户{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java deleted file mode 100644 index 46704f92..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserUpdateHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername()); - - try { - adminUserSyncService.update(data); - log.info("[UserSync] 用户更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java new file mode 100644 index 00000000..42934904 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java @@ -0,0 +1,70 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户-部门关系全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserDeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserDeptSyncService.class) +public class SystemUserDeptFullHandler implements BatchSyncEventHandler { + + @Resource + private UserDeptSyncService userDeptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_DEPT_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserDeptSync] 开始用户-部门关系全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserDeptSync] 处理用户-部门关系批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusUserDeptData data : message.getDataList()) { + try { + userDeptSyncService.fullSync(data); + log.debug("[UserDeptSync] 用户-部门关系全量同步成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + } catch (Exception e) { + log.error("[UserDeptSync] 用户-部门关系全量同步失败, userId={}, deptId={}", + data.getUserId(), data.getDeptId(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserDeptSync] 用户-部门关系批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserDeptSync] 用户-部门关系全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java new file mode 100644 index 00000000..9a81c18b --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java @@ -0,0 +1,87 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户-部门关系增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_DEPT_CREATE:用户-部门关系创建 + * - SYSTEM_USER_DEPT_UPDATE:用户-部门关系更新 + * - SYSTEM_USER_DEPT_DELETE:用户-部门关系删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserDeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserDeptSyncService.class) +public class SystemUserDeptHandler implements SyncEventHandler { + + @Resource + private UserDeptSyncService userDeptSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_DEPT_CREATE, + DatabusEventType.SYSTEM_USER_DEPT_UPDATE, + DatabusEventType.SYSTEM_USER_DEPT_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusUserDeptData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserDeptSync] 收到用户-部门关系{}事件, userId={}, deptId={}", + eventType.getAction(), data != null ? data.getUserId() : null, + data != null ? data.getDeptId() : null); + + try { + switch (eventType) { + case SYSTEM_USER_DEPT_CREATE: + userDeptSyncService.create(data); + log.info("[UserDeptSync] 用户-部门关系创建成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + break; + + case SYSTEM_USER_DEPT_UPDATE: + userDeptSyncService.update(data); + log.info("[UserDeptSync] 用户-部门关系更新成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + break; + + case SYSTEM_USER_DEPT_DELETE: + userDeptSyncService.delete(data.getId()); + log.info("[UserDeptSync] 用户-部门关系删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserDeptSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserDeptSync] 用户-部门关系{}失败, userId={}, deptId={}", + eventType.getAction(), + data != null ? data.getUserId() : null, + data != null ? data.getDeptId() : null, e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java new file mode 100644 index 00000000..26023a40 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; + +/** + * 用户-部门关系同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link UserDeptSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface UserDeptSyncService { + + /** + * 创建用户-部门关系(增量同步) + * + * @param data 用户-部门关系数据 + */ + void create(DatabusUserDeptData data); + + /** + * 更新用户-部门关系(增量同步) + * + * @param data 用户-部门关系数据 + */ + void update(DatabusUserDeptData data); + + /** + * 删除用户-部门关系(增量同步) + * + * @param id 关系ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户-部门关系数据 + */ + void fullSync(DatabusUserDeptData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java new file mode 100644 index 00000000..9027e379 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java @@ -0,0 +1,56 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 用户-部门关系同步服务实现 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + *

+ * 注意:由于用户-部门关系通常集成在用户管理中,此实现为占位符。 + * 分公司可以根据实际情况: + * 1. 自定义实现此接口,直接操作本地数据库 + * 2. 或者通过用户管理 API 间接处理关联关系 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +public class UserDeptSyncServiceImpl implements UserDeptSyncService { + + @Override + public void create(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到创建用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步 + } + + @Override + public void update(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到更新用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void delete(Long id) { + log.info("[UserDeptSync] 收到删除用户-部门关系请求, id={}", id); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void fullSync(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到全量同步用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入 + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java new file mode 100644 index 00000000..486c1a8f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java @@ -0,0 +1,70 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户-岗位关系全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserPostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserPostSyncService.class) +public class SystemUserPostFullHandler implements BatchSyncEventHandler { + + @Resource + private UserPostSyncService userPostSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_POST_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserPostSync] 开始用户-岗位关系全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserPostSync] 处理用户-岗位关系批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusUserPostData data : message.getDataList()) { + try { + userPostSyncService.fullSync(data); + log.debug("[UserPostSync] 用户-岗位关系全量同步成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + } catch (Exception e) { + log.error("[UserPostSync] 用户-岗位关系全量同步失败, userId={}, postId={}", + data.getUserId(), data.getPostId(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserPostSync] 用户-岗位关系批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserPostSync] 用户-岗位关系全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java new file mode 100644 index 00000000..cfdcc8d8 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java @@ -0,0 +1,87 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户-岗位关系增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_POST_CREATE:用户-岗位关系创建 + * - SYSTEM_USER_POST_UPDATE:用户-岗位关系更新 + * - SYSTEM_USER_POST_DELETE:用户-岗位关系删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserPostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserPostSyncService.class) +public class SystemUserPostHandler implements SyncEventHandler { + + @Resource + private UserPostSyncService userPostSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_POST_CREATE, + DatabusEventType.SYSTEM_USER_POST_UPDATE, + DatabusEventType.SYSTEM_USER_POST_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusUserPostData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserPostSync] 收到用户-岗位关系{}事件, userId={}, postId={}", + eventType.getAction(), data != null ? data.getUserId() : null, + data != null ? data.getPostId() : null); + + try { + switch (eventType) { + case SYSTEM_USER_POST_CREATE: + userPostSyncService.create(data); + log.info("[UserPostSync] 用户-岗位关系创建成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + break; + + case SYSTEM_USER_POST_UPDATE: + userPostSyncService.update(data); + log.info("[UserPostSync] 用户-岗位关系更新成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + break; + + case SYSTEM_USER_POST_DELETE: + userPostSyncService.delete(data.getId()); + log.info("[UserPostSync] 用户-岗位关系删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserPostSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserPostSync] 用户-岗位关系{}失败, userId={}, postId={}", + eventType.getAction(), + data != null ? data.getUserId() : null, + data != null ? data.getPostId() : null, e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java new file mode 100644 index 00000000..f6d6b780 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.module.databus.api.data.DatabusUserPostData; + +/** + * 用户-岗位关系同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link UserPostSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface UserPostSyncService { + + /** + * 创建用户-岗位关系(增量同步) + * + * @param data 用户-岗位关系数据 + */ + void create(DatabusUserPostData data); + + /** + * 更新用户-岗位关系(增量同步) + * + * @param data 用户-岗位关系数据 + */ + void update(DatabusUserPostData data); + + /** + * 删除用户-岗位关系(增量同步) + * + * @param id 关系ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户-岗位关系数据 + */ + void fullSync(DatabusUserPostData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java new file mode 100644 index 00000000..b41d6ac8 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java @@ -0,0 +1,56 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 用户-岗位关系同步服务实现 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + *

+ * 注意:由于用户-岗位关系通常集成在用户管理中,此实现为占位符。 + * 分公司可以根据实际情况: + * 1. 自定义实现此接口,直接操作本地数据库 + * 2. 或者通过用户管理 API 间接处理关联关系 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +public class UserPostSyncServiceImpl implements UserPostSyncService { + + @Override + public void create(DatabusUserPostData data) { + log.info("[UserPostSync] 收到创建用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步 + } + + @Override + public void update(DatabusUserPostData data) { + log.info("[UserPostSync] 收到更新用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void delete(Long id) { + log.info("[UserPostSync] 收到删除用户-岗位关系请求, id={}", id); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void fullSync(DatabusUserPostData data) { + log.info("[UserPostSync] 收到全量同步用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入 + } +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java new file mode 100644 index 00000000..b5409176 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java @@ -0,0 +1,48 @@ +package com.zt.plat.module.databus.api.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 用户与组织机构关联关系数据对象 + *

+ * 对应表:system_user_dept + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabusUserDeptData implements Serializable { + + /** + * 关联关系ID + */ + private Long id; + + /** + * 用户ID + */ + private Long userId; + + /** + * 部门ID + */ + private Long deptId; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 备注 + */ + private String remark; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java new file mode 100644 index 00000000..b27ac361 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java @@ -0,0 +1,38 @@ +package com.zt.plat.module.databus.api.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 用户与岗位关联关系数据对象 + *

+ * 对应表:system_user_post + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabusUserPostData implements Serializable { + + /** + * 关联关系ID + */ + private Long id; + + /** + * 用户ID + */ + private Long userId; + + /** + * 岗位ID + */ + private Long postId; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java index bb853bfe..80886ad3 100644 --- a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java @@ -41,49 +41,45 @@ public enum DatabusEventType { SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"), /** - * 部门-创建 + * 组织机构-创建 + */ + SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"), + + /** + * 组织机构-更新 + */ + SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"), + + /** + * 组织机构-删除 + */ + SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"), + + /** + * 组织机构-全量同步 + */ + SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"), + + /** + * 部门-创建(新命名,推荐使用) */ SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"), /** - * 部门-更新 + * 部门-更新(新命名,推荐使用) */ SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"), /** - * 部门-删除 + * 部门-删除(新命名,推荐使用) */ SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"), /** - * 部门-全量同步 + * 部门-全量同步(新命名,推荐使用) */ SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"), - /** - * 组织机构-创建(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"), - - /** - * 组织机构-更新(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"), - - /** - * 组织机构-删除(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"), - - /** - * 组织机构-全量同步(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"), - /** * 岗位-创建 */ @@ -104,6 +100,84 @@ public enum DatabusEventType { */ SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"), + /** + * 用户-部门关系-创建 + */ + SYSTEM_USER_DEPT_CREATE("system", "user-dept", "create", "用户-部门关系创建"), + + /** + * 用户-部门关系-更新 + */ + SYSTEM_USER_DEPT_UPDATE("system", "user-dept", "update", "用户-部门关系更新"), + + /** + * 用户-部门关系-删除 + */ + SYSTEM_USER_DEPT_DELETE("system", "user-dept", "delete", "用户-部门关系删除"), + + /** + * 用户-部门关系-全量同步 + */ + SYSTEM_USER_DEPT_FULL("system", "user-dept", "full", "用户-部门关系全量同步"), + + /** + * 用户-岗位关系-创建 + */ + SYSTEM_USER_POST_CREATE("system", "user-post", "create", "用户-岗位关系创建"), + + /** + * 用户-岗位关系-更新 + */ + SYSTEM_USER_POST_UPDATE("system", "user-post", "update", "用户-岗位关系更新"), + + /** + * 用户-岗位关系-删除 + */ + SYSTEM_USER_POST_DELETE("system", "user-post", "delete", "用户-岗位关系删除"), + + /** + * 用户-岗位关系-全量同步 + */ + SYSTEM_USER_POST_FULL("system", "user-post", "full", "用户-岗位关系全量同步"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_CREATE("system", "user-relation", "create", "用户关联创建"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_UPDATE("system", "user-relation", "update", "用户关联更新"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_DELETE("system", "user-relation", "delete", "用户关联删除"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_FULL("system", "user-relation", "full", "用户关联全量同步"), + + /** + * 基础数据-全量同步(组合编排事件) + *

+ * 按依赖顺序触发以下全量同步: + * 1. 组织机构(SYSTEM_DEPT_FULL) + * 2. 岗位(SYSTEM_POST_FULL) + * 3. 用户(SYSTEM_USER_FULL) + * 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL) + * 5. 用户-岗位关系(SYSTEM_USER_POST_FULL) + *

+ * 注意:本事件只负责触发,实际数据同步由5个独立的全量同步任务完成 + */ + SYSTEM_BASE_DATA_FULL("system", "base-data", "full", "基础数据全量同步"), + /** * 角色-创建 */ @@ -167,7 +241,7 @@ public enum DatabusEventType { BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"), /** - * 供应��-创建 + * 供应商-创建 */ BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"), @@ -238,13 +312,10 @@ public enum DatabusEventType { /** * 获取完整Topic名称(服务端转发用) - * 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic) - * 示例: databus-sync-branch-001 - * - * 注意:不再为每个事件创建独立 Topic,而是通过消息体中的 eventType 字段路由 + * 格式: {topicBase}-{module}-{entity}-{action}-{clientCode} */ public String getTopic(String topicBase, String clientCode) { - return String.format("%s-%s", topicBase, clientCode); + return String.format("%s-%s-%s-%s-%s", topicBase, module, entity, action, clientCode); } /** @@ -273,31 +344,6 @@ public enum DatabusEventType { return null; } - /** - * 根据事件类型字符串获取枚举(支持大写下划线格式) - * 例如:SYSTEM_POST_FULL → DatabusEventType.SYSTEM_POST_FULL - * - * @param eventType 事件类型字符串(格式: MODULE_ENTITY_ACTION) - * @return 枚举值,未找到返回null - */ - public static DatabusEventType getByEventType(String eventType) { - if (eventType == null) { - return null; - } - // 先尝试直接匹配枚举名称 - try { - return DatabusEventType.valueOf(eventType.toUpperCase()); - } catch (IllegalArgumentException e) { - // 如果失败,尝试转换格式后匹配(如 system-post-full → SYSTEM_POST_FULL) - String normalized = eventType.toUpperCase().replace("-", "_"); - try { - return DatabusEventType.valueOf(normalized); - } catch (IllegalArgumentException ex) { - return null; - } - } - } - /** * 根据模块、实体、操作获取枚举 * @@ -317,6 +363,25 @@ public enum DatabusEventType { return null; } + /** + * 根据事件类型名称获取枚举 + *

+ * 支持大写下划线格式,如: SYSTEM_POST_FULL + * + * @param eventType 事件类型名称(枚举常量名) + * @return 枚举值,未找到返回null + */ + public static DatabusEventType getByEventType(String eventType) { + if (eventType == null || eventType.isEmpty()) { + return null; + } + try { + return DatabusEventType.valueOf(eventType); + } catch (IllegalArgumentException e) { + return null; + } + } + /** * 判断是否为全量同步事件 */ diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java index 5d974e23..32e911e1 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java +++ b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java @@ -4,6 +4,8 @@ import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi; import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi; import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi; import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi; +import com.zt.plat.module.system.api.dept.DeptApi; +import com.zt.plat.module.system.api.dept.PostApi; import com.zt.plat.module.system.api.user.AdminUserApi; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.Configuration; @@ -18,7 +20,10 @@ import org.springframework.context.annotation.Configuration; // DataBus 数据提供者 Feign 客户端 DatabusDeptProviderApi.class, DatabusUserProviderApi.class, - DatabusPostProviderApi.class + DatabusPostProviderApi.class, + PostApi.class, + DeptApi.class, + AdminUserApi.class, }) public class RpcConfiguration { } diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java new file mode 100644 index 00000000..d5f75058 --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java @@ -0,0 +1,110 @@ +package com.zt.plat.module.system.mq.producer.databus; + +import cn.hutool.core.util.IdUtil; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 用户-部门关系变更消息 Producer + *

+ * 负责发送用户与部门的关联关系变更事件 + * + * @author ZT + */ +@Slf4j +@Component +public class DatabusUserDeptChangeProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + @Value("${zt.databus.change.topic-prefix:databus-change}") + private String topicPrefix; + + private static final String TOPIC_SUFFIX = "-system-user-dept"; + + /** + * 发送用户-部门关系创建消息 + * + * @param userDept 用户-部门关系对象 + */ + public void sendUserDeptCreatedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_CREATE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系创建消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系创建消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 发送用户-部门关系更新消息 + * + * @param userDept 用户-部门关系对象 + */ + public void sendUserDeptUpdatedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_UPDATE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系更新消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系更新消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 发送用户-部门关系删除消息 + * + * @param userDept 用户-部门关系对象(删除前查询的) + */ + public void sendUserDeptDeletedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_DELETE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系删除消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系删除消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 构建用户-部门关系数据 + */ + private DatabusUserDeptData buildUserDeptData(UserDeptDO userDept) { + return DatabusUserDeptData.builder() + .id(userDept.getId()) + .userId(userDept.getUserId()) + .deptId(userDept.getDeptId()) + .tenantId(userDept.getTenantId()) + .remark(userDept.getRemark()) + .build(); + } + + /** + * 发送消息到 MQ + */ + private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) { + DatabusMessage message = new DatabusMessage<>(); + message.setEventType(eventType); + message.setData(data); + message.setTimestamp(java.time.LocalDateTime.now()); + message.setMessageId(IdUtil.fastSimpleUUID()); + + String topic = topicPrefix + TOPIC_SUFFIX; + rocketMQTemplate.syncSend(topic, message); + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java new file mode 100644 index 00000000..b6201f5d --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java @@ -0,0 +1,108 @@ +package com.zt.plat.module.system.mq.producer.databus; + +import cn.hutool.core.util.IdUtil; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import com.zt.plat.module.system.dal.dataobject.dept.UserPostDO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 用户-岗位关系变更消息 Producer + *

+ * 负责发送用户与岗位的关联关系变更事件 + * + * @author ZT + */ +@Slf4j +@Component +public class DatabusUserPostChangeProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + @Value("${zt.databus.change.topic-prefix:databus-change}") + private String topicPrefix; + + private static final String TOPIC_SUFFIX = "-system-user-post"; + + /** + * 发送用户-岗位关系创建消息 + * + * @param userPost 用户-岗位关系对象 + */ + public void sendUserPostCreatedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_CREATE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系创建消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系创建消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 发送用户-岗位关系更新消息 + * + * @param userPost 用户-岗位关系对象 + */ + public void sendUserPostUpdatedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_UPDATE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系更新消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系更新消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 发送用户-岗位关系删除消息 + * + * @param userPost 用户-岗位关系对象(删除前查询的) + */ + public void sendUserPostDeletedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_DELETE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系删除消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系删除消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 构建用户-岗位关系数据 + */ + private DatabusUserPostData buildUserPostData(UserPostDO userPost) { + return DatabusUserPostData.builder() + .id(userPost.getId()) + .userId(userPost.getUserId()) + .postId(userPost.getPostId()) + .build(); + } + + /** + * 发送消息到 MQ + */ + private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) { + DatabusMessage message = new DatabusMessage<>(); + message.setEventType(eventType); + message.setData(data); + message.setTimestamp(java.time.LocalDateTime.now()); + message.setMessageId(IdUtil.fastSimpleUUID()); + + String topic = topicPrefix + TOPIC_SUFFIX; + rocketMQTemplate.syncSend(topic, message); + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java index 7d430fcd..b9126216 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java @@ -29,6 +29,7 @@ import com.zt.plat.module.system.dal.mysql.dept.UserPostMapper; import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper; import com.zt.plat.module.system.enums.user.PasswordStrategyEnum; import com.zt.plat.module.system.enums.user.UserSourceEnum; +import com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer; import com.zt.plat.module.system.service.dept.DeptService; import com.zt.plat.module.system.service.dept.PostService; import com.zt.plat.module.system.service.permission.PermissionService; @@ -93,11 +94,10 @@ public class AdminUserServiceImpl implements AdminUserService { @Resource private ConfigApi configApi; - - @Resource - private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer; @Resource private UserDeptService userDeptService; + @Resource + private DatabusChangeProducer databusChangeProducer; @Override @GlobalTransactional(rollbackFor = Exception.class) @@ -128,7 +128,6 @@ public class AdminUserServiceImpl implements AdminUserService { if (user.getUserSource() == null) { user.setUserSource(UserSourceEnum.EXTERNAL.getSource()); } - user.setWorkcode(normalizeWorkcode(createReqVO.getWorkcode())); PasswordStrategyEnum passwordStrategy = determinePasswordStrategy(user.getUserSource()); user.setAvatar(normalizeAvatarValue(createReqVO.getAvatar())); user.setPassword(encodePassword(createReqVO.getPassword(), passwordStrategy)); @@ -144,10 +143,10 @@ public class AdminUserServiceImpl implements AdminUserService { postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId))); } - // 2.4 发布用户创建事件 + // 3. 发送用户创建消息到MQ(供Databus消费转发) databusChangeProducer.sendUserCreatedMessage(user); - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable("user", user); return user.getId(); } @@ -237,13 +236,13 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.3 更新岗位 updateUserPost(updateReqVO, updateObj); - // 2.4 发布用户更新事件(重新查询获取完整数据) + // 3. 发送用户更新消息到MQ(供Databus消费转发) AdminUserDO updatedUser = userMapper.selectById(updateObj.getId()); if (updatedUser != null) { databusChangeProducer.sendUserUpdatedMessage(updatedUser); } - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class)); LogRecordContext.putVariable("user", oldUser); } @@ -332,10 +331,10 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.2 删除用户岗位 userPostMapper.deleteByUserId(id); - // 2.3 发布用户删除事件 + // 3. 发送用户删除消息到MQ(供Databus消费转发) databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId()); - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable("user", user); } diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java index f7e4943f..3b699c91 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java @@ -6,6 +6,7 @@ import com.zt.plat.framework.security.core.LoginUser; import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; @@ -23,18 +24,29 @@ import static com.zt.plat.module.system.enums.ErrorCodeConstants.USER_DEPT_NOT_E * * @author 管理员 */ +@Slf4j @Service @Validated public class UserDeptServiceImpl implements UserDeptService { @Resource private UserDeptMapper userDeptMapper; + @Resource + private com.zt.plat.module.system.mq.producer.databus.DatabusUserDeptChangeProducer databusUserDeptChangeProducer; @Override public Long createUserDept(UserDeptDO createReqVO) { // 插入 UserDeptDO userDept = BeanUtils.toBean(createReqVO, UserDeptDO.class); userDeptMapper.insert(userDept); + + // 发送用户-部门关系创建消息 + try { + databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept); + } catch (Exception e) { + log.error("[createUserDept] 发送用户-部门关系创建消息失败", e); + } + // 返回 return userDept.getId(); } @@ -46,14 +58,34 @@ public class UserDeptServiceImpl implements UserDeptService { // 更新 UserDeptDO updateObj = BeanUtils.toBean(updateReqVO, UserDeptDO.class); userDeptMapper.updateById(updateObj); + + // 发送用户-部门关系更新消息 + try { + databusUserDeptChangeProducer.sendUserDeptUpdatedMessage(updateObj); + } catch (Exception e) { + log.error("[updateUserDept] 发送用户-部门关系更新消息失败", e); + } } @Override public void deleteUserDept(Long id) { // 校验存在 validateUserDeptExists(id); + + // 查询完整对象(删除前查询,删除后无法查询) + UserDeptDO userDept = userDeptMapper.selectById(id); + // 删除 userDeptMapper.deleteById(id); + + // 发送用户-部门关系删除消息 + if (userDept != null) { + try { + databusUserDeptChangeProducer.sendUserDeptDeletedMessage(userDept); + } catch (Exception e) { + log.error("[deleteUserDept] 发送用户-部门关系删除消息失败", e); + } + } } @Override @@ -111,6 +143,16 @@ public class UserDeptServiceImpl implements UserDeptService { Long tenantId = Optional.ofNullable(getLoginUser()).orElse(new LoginUser()).getTenantId(); list.forEach(item -> item.setTenantId(tenantId)); userDeptMapper.insertBatch(list); + + // 发送用户-部门关系创建消息(为每个关系发送) + for (UserDeptDO userDept : list) { + try { + databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept); + } catch (Exception e) { + log.error("[batchCreateUserDept] 发送用户-部门关系创建消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } } } \ No newline at end of file