From a57d05ccd6f23d05ca507b0a2f349131ea65a760 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Mon, 15 Dec 2025 20:04:43 +0800 Subject: [PATCH 1/6] =?UTF-8?q?update=EF=BC=9A=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E7=BB=9F=E4=B8=80=E4=BD=BF=E7=94=A8=E6=9E=9A=E4=B8=BE?= =?UTF-8?q?=E7=B1=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../databus/client/core/listener/DatabusConsumerRegistry.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java index bf1ba313..a38a9fe9 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java @@ -106,7 +106,7 @@ public class DatabusConsumerRegistry { for (MessageExt msg : msgs) { try { String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8); - log.debug("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}", + log.info("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}", msg.getTopic(), msg.getMsgId(), eventType.name()); messageProcessor.process(messageBody, eventType); From 8782631eaa1e84fba53295a40449dca5532eda51 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 16 Dec 2025 12:03:15 +0800 Subject: [PATCH 2/6] =?UTF-8?q?update=EF=BC=9A=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=88=86=E5=8F=91=E6=9C=BA=E6=9E=84=E5=B2=97=E4=BD=8D?= =?UTF-8?q?=E7=BB=91=E5=AE=9A=E5=85=B3=E7=B3=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/DatabusConsumerRegistry.java | 148 -------------- .../client/core/registry/HandlerRegistry.java | 12 +- .../client/handler/SyncEventHandler.java | 31 ++- .../handler/dept/SystemDeptCreateHandler.java | 50 ----- .../handler/dept/SystemDeptDeleteHandler.java | 49 ----- .../handler/dept/SystemDeptHandler.java | 81 ++++++++ .../handler/dept/SystemDeptUpdateHandler.java | 49 ----- .../handler/post/SystemPostCreateHandler.java | 50 ----- .../handler/post/SystemPostDeleteHandler.java | 49 ----- .../handler/post/SystemPostHandler.java | 81 ++++++++ .../handler/post/SystemPostUpdateHandler.java | 50 ----- .../handler/user/SystemUserCreateHandler.java | 49 ----- .../handler/user/SystemUserDeleteHandler.java | 49 ----- .../handler/user/SystemUserHandler.java | 81 ++++++++ .../handler/user/SystemUserUpdateHandler.java | 49 ----- .../userdept/SystemUserDeptFullHandler.java | 70 +++++++ .../userdept/SystemUserDeptHandler.java | 87 +++++++++ .../handler/userdept/UserDeptSyncService.java | 44 +++++ .../userdept/UserDeptSyncServiceImpl.java | 56 ++++++ .../userpost/SystemUserPostFullHandler.java | 70 +++++++ .../userpost/SystemUserPostHandler.java | 87 +++++++++ .../handler/userpost/UserPostSyncService.java | 44 +++++ .../userpost/UserPostSyncServiceImpl.java | 56 ++++++ .../databus/api/data/DatabusUserDeptData.java | 48 +++++ .../databus/api/data/DatabusUserPostData.java | 38 ++++ .../databus/enums/DatabusEventType.java | 183 ++++++++++++------ .../rpc/config/RpcConfiguration.java | 7 +- .../DatabusUserDeptChangeProducer.java | 110 +++++++++++ .../DatabusUserPostChangeProducer.java | 108 +++++++++++ .../service/user/AdminUserServiceImpl.java | 19 +- .../service/userdept/UserDeptServiceImpl.java | 42 ++++ 31 files changed, 1278 insertions(+), 669 deletions(-) delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java delete mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java create mode 100644 zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java create mode 100644 zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java create mode 100644 zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java create mode 100644 zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java deleted file mode 100644 index a38a9fe9..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/listener/DatabusConsumerRegistry.java +++ /dev/null @@ -1,148 +0,0 @@ -package com.zt.plat.framework.databus.client.core.listener; - -import com.zt.plat.framework.databus.client.config.DatabusSyncClientProperties; -import com.zt.plat.framework.databus.client.core.processor.MessageProcessor; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.PostConstruct; -import jakarta.annotation.PreDestroy; -import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; -import org.apache.rocketmq.client.exception.MQClientException; -import org.apache.rocketmq.common.message.MessageExt; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; - -/** - * Databus Consumer 注册器 - *

- * 根据 DatabusEventType 枚举自动为所有事件类型创建消费者 - * Topic格式: {topicBase}-{module}-{entity}-{action}-{clientCode} - * 例如: databus-sync-system-user-create-company-a - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client.mq", name = "enabled", havingValue = "true", matchIfMissing = true) -public class DatabusConsumerRegistry { - - private final MessageProcessor messageProcessor; - private final DatabusSyncClientProperties properties; - - - /** - * 管理的消费者列表 - */ - private final List consumers = new ArrayList<>(); - - public DatabusConsumerRegistry(MessageProcessor messageProcessor, DatabusSyncClientProperties properties) { - this.messageProcessor = messageProcessor; - this.properties = properties; - } - - @PostConstruct - public void init() { - if (!Boolean.TRUE.equals(properties.getEnabled())) { - log.info("[Databus Client] 客户端未启用,跳过初始化"); - return; - } - - String nameServer = properties.getMq().getNameServer(); - if (nameServer == null || nameServer.isEmpty()) { - log.warn("[Databus Client] RocketMQ nameServer未配置,跳过MQ消费者初始化"); - return; - } - - String clientCode = properties.getClientCode(); - if (clientCode == null || clientCode.isEmpty()) { - log.error("[Databus Client] clientCode未配置,无法订阅Topic"); - return; - } - - String topicBase = properties.getMq().getTopicBase(); - String consumerGroupPrefix = properties.getMq().getConsumerGroupPrefix(); - if (consumerGroupPrefix == null || consumerGroupPrefix.isEmpty()) { - consumerGroupPrefix = "databus-client-" + clientCode; - } - - // 为每个事件类型创建独立的消费者 - for (DatabusEventType eventType : DatabusEventType.values()) { - try { - createConsumer(eventType, topicBase, clientCode, consumerGroupPrefix, nameServer); - } catch (Exception e) { - log.error("[Databus Client] 创建消费者失败, eventType={}", eventType.name(), e); - } - } - - log.info("[Databus Client] 消费者注册完成,共创建 {} 个消费者", consumers.size()); - } - - /** - * 为指定事件类型创建消费者 - */ - private void createConsumer(DatabusEventType eventType, String topicBase, String clientCode, String consumerGroupPrefix, String nameServer) throws MQClientException { - // Topic: databus-sync-system-user-create-company-a - String topic = eventType.getTopic(topicBase, clientCode); - // ConsumerGroup: databus-client-company-a-system-user-create - String consumerGroup = String.format("%s-%s", consumerGroupPrefix, eventType.getTopicSuffix()); - - DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); - consumer.setNamesrvAddr(nameServer); - consumer.setConsumeThreadMin(properties.getMq().getConsumeThreadMin()); - consumer.setConsumeThreadMax(properties.getMq().getConsumeThreadMax()); - consumer.setMaxReconsumeTimes(properties.getMq().getMaxReconsumeTimes()); - - // 订阅Topic - consumer.subscribe(topic, "*"); - - // 设置消息监听器 - consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> { - for (MessageExt msg : msgs) { - try { - String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8); - log.info("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}", - msg.getTopic(), msg.getMsgId(), eventType.name()); - - messageProcessor.process(messageBody, eventType); - - } catch (Exception e) { - log.error("[Databus Client] 消息处理失败, topic={}, msgId={}, reconsumeTimes={}", - msg.getTopic(), msg.getMsgId(), msg.getReconsumeTimes(), e); - - if (msg.getReconsumeTimes() >= properties.getMq().getMaxReconsumeTimes()) { - log.error("[Databus Client] 重试次数已达上限,放弃处理, msgId={}", msg.getMsgId()); - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - } - return ConsumeConcurrentlyStatus.RECONSUME_LATER; - } - } - return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; - }); - - consumer.start(); - consumers.add(consumer); - - log.info("[Databus Client] 消费者启动成功, topic={}, consumerGroup={}, event={}", - topic, consumerGroup, eventType.getName()); - } - - @PreDestroy - public void destroy() { - for (DefaultMQPushConsumer consumer : consumers) { - try { - consumer.shutdown(); - } catch (Exception e) { - log.error("[Databus Client] 关闭消费者失败", e); - } - } - consumers.clear(); - log.info("[Databus Client] 所有消费者已关闭"); - } - -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java index a1baa90d..1a6ad276 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java @@ -66,13 +66,15 @@ public class HandlerRegistry { public void init() { log.info("[HandlerRegistry] 开始初始化 Handler 注册表..."); - // 注册增量同步 Handler + // 注册增量同步 Handler(支持一个Handler注册多个事件类型) if (syncEventHandlers != null) { for (SyncEventHandler handler : syncEventHandlers) { - DatabusEventType eventType = handler.getSupportedEventType(); - incrementalHandlers.put(eventType, handler); - log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", - eventType, handler.getClass().getSimpleName()); + List eventTypes = handler.getSupportedEventTypes(); + for (DatabusEventType eventType : eventTypes) { + incrementalHandlers.put(eventType, handler); + log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", + eventType, handler.getClass().getSimpleName()); + } } } diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java index f89906d0..573853da 100644 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/SyncEventHandler.java @@ -5,22 +5,49 @@ import com.zt.plat.module.databus.enums.DatabusEventType; import java.lang.reflect.ParameterizedType; import java.lang.reflect.Type; +import java.util.Collections; +import java.util.List; /** * 同步事件处理器接口 *

* 业务系统需要实现此接口来处理增量数据同步 + *

+ * 支持单事件和多事件处理: + * - 单事件:实现 getSupportedEventType() + * - 多事件:实现 getSupportedEventTypes()(推荐,一个Handler处理多个操作) * * @author ZT */ public interface SyncEventHandler { /** - * 获取支持的事件类型 + * 获取支持的事件类型(单个) + *

+ * 默认实现:返回 getSupportedEventTypes() 的第一个元素 + *

+ * 建议:新Handler优先实现 getSupportedEventTypes() * * @return 事件类型枚举 */ - DatabusEventType getSupportedEventType(); + default DatabusEventType getSupportedEventType() { + List types = getSupportedEventTypes(); + return types.isEmpty() ? null : types.get(0); + } + + /** + * 获取支持的事件类型列表(多个) + *

+ * 默认实现:返回包含 getSupportedEventType() 的单元素列表 + *

+ * 子类可以覆盖此方法返回多个事件类型,从而一个Handler处理多个操作 + * + * @return 事件类型列表 + */ + default List getSupportedEventTypes() { + DatabusEventType type = getSupportedEventType(); + return type == null ? Collections.emptyList() : Collections.singletonList(type); + } /** * 处理同步消息 diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java deleted file mode 100644 index 7bcad72f..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptCreateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptCreateHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}", - data.getId(), data.getName(), data.getParentId()); - - try { - deptSyncService.create(data); - log.info("[DeptSync] 部门创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java deleted file mode 100644 index 4ac8c687..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptDeleteHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门删除事件, id={}", data.getId()); - - try { - deptSyncService.delete(data.getId()); - log.info("[DeptSync] 部门删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java new file mode 100644 index 00000000..cc1e8ab6 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.dept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 部门增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_DEPT_CREATE:部门创建 + * - SYSTEM_DEPT_UPDATE:部门更新 + * - SYSTEM_DEPT_DELETE:部门删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 DeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(DeptSyncService.class) +public class SystemDeptHandler implements SyncEventHandler { + + @Resource + private DeptSyncService deptSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_DEPT_CREATE, + DatabusEventType.SYSTEM_DEPT_UPDATE, + DatabusEventType.SYSTEM_DEPT_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusDeptData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[DeptSync] 收到部门{}事件, id={}, name={}, parentId={}", + eventType.getAction(), data.getId(), data.getName(), data.getParentId()); + + try { + switch (eventType) { + case SYSTEM_DEPT_CREATE: + deptSyncService.create(data); + log.info("[DeptSync] 部门创建成功, id={}", data.getId()); + break; + + case SYSTEM_DEPT_UPDATE: + deptSyncService.update(data); + log.info("[DeptSync] 部门更新成功, id={}", data.getId()); + break; + + case SYSTEM_DEPT_DELETE: + deptSyncService.delete(data.getId()); + log.info("[DeptSync] 部门删除成功, id={}", data.getId()); + break; + + default: + log.warn("[DeptSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[DeptSync] 部门{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java deleted file mode 100644 index 8e7f68ed..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/dept/SystemDeptUpdateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.dept; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusDeptData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 部门更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 DeptSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(DeptSyncService.class) -public class SystemDeptUpdateHandler implements SyncEventHandler { - - @Resource - private DeptSyncService deptSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_DEPT_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusDeptData data = message.getData(); - log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName()); - - try { - deptSyncService.update(data); - log.info("[DeptSync] 部门更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java deleted file mode 100644 index 98eb55df..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostCreateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostCreateHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}", - data.getId(), data.getName(), data.getCode()); - - try { - postSyncService.create(data); - log.info("[PostSync] 岗位创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java deleted file mode 100644 index 645648a2..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostDeleteHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位删除事件, id={}", data.getId()); - - try { - postSyncService.delete(data.getId()); - log.info("[PostSync] 岗位删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java new file mode 100644 index 00000000..c7a4e787 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.post; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 岗位增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_POST_CREATE:岗位创建 + * - SYSTEM_POST_UPDATE:岗位更新 + * - SYSTEM_POST_DELETE:岗位删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 PostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(PostSyncService.class) +public class SystemPostHandler implements SyncEventHandler { + + @Resource + private PostSyncService postSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_POST_CREATE, + DatabusEventType.SYSTEM_POST_UPDATE, + DatabusEventType.SYSTEM_POST_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusPostData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[PostSync] 收到岗位{}事件, id={}, name={}, code={}", + eventType.getAction(), data.getId(), data.getName(), data.getCode()); + + try { + switch (eventType) { + case SYSTEM_POST_CREATE: + postSyncService.create(data); + log.info("[PostSync] 岗位创建成功, id={}", data.getId()); + break; + + case SYSTEM_POST_UPDATE: + postSyncService.update(data); + log.info("[PostSync] 岗位更新成功, id={}", data.getId()); + break; + + case SYSTEM_POST_DELETE: + postSyncService.delete(data.getId()); + log.info("[PostSync] 岗位删除成功, id={}", data.getId()); + break; + + default: + log.warn("[PostSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[PostSync] 岗位{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java deleted file mode 100644 index e8d40e82..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/post/SystemPostUpdateHandler.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.post; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusPostData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 岗位更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 PostSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(PostSyncService.class) -public class SystemPostUpdateHandler implements SyncEventHandler { - - @Resource - private PostSyncService postSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_POST_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusPostData data = message.getData(); - log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}", - data.getId(), data.getName(), data.getCode()); - - try { - postSyncService.update(data); - log.info("[PostSync] 岗位更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java deleted file mode 100644 index 875043e7..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserCreateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户创建事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserCreateHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_CREATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername()); - - try { - adminUserSyncService.create(data); - log.info("[UserSync] 用户创建成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户创建失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java deleted file mode 100644 index 139286ae..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserDeleteHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户删除事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserDeleteHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_DELETE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户删除事件, id={}", data.getId()); - - try { - adminUserSyncService.delete(data.getId()); - log.info("[UserSync] 用户删除成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户删除失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java new file mode 100644 index 00000000..3af1fd2a --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserHandler.java @@ -0,0 +1,81 @@ +package com.zt.plat.framework.databus.client.handler.user; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_CREATE:用户创建 + * - SYSTEM_USER_UPDATE:用户更新 + * - SYSTEM_USER_DELETE:用户删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 AdminUserSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(AdminUserSyncService.class) +public class SystemUserHandler implements SyncEventHandler { + + @Resource + private AdminUserSyncService adminUserSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_CREATE, + DatabusEventType.SYSTEM_USER_UPDATE, + DatabusEventType.SYSTEM_USER_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusAdminUserData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserSync] 收到用户{}事件, id={}, username={}, nickname={}", + eventType.getAction(), data.getId(), data.getUsername(), data.getNickname()); + + try { + switch (eventType) { + case SYSTEM_USER_CREATE: + adminUserSyncService.create(data); + log.info("[UserSync] 用户创建成功, id={}", data.getId()); + break; + + case SYSTEM_USER_UPDATE: + adminUserSyncService.update(data); + log.info("[UserSync] 用户更新成功, id={}", data.getId()); + break; + + case SYSTEM_USER_DELETE: + adminUserSyncService.delete(data.getId()); + log.info("[UserSync] 用户删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserSync] 用户{}失败, id={}", eventType.getAction(), data.getId(), e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java deleted file mode 100644 index 46704f92..00000000 --- a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/user/SystemUserUpdateHandler.java +++ /dev/null @@ -1,49 +0,0 @@ -package com.zt.plat.framework.databus.client.handler.user; - -import com.zt.plat.framework.databus.client.handler.SyncEventHandler; -import com.zt.plat.module.databus.api.data.DatabusAdminUserData; -import com.zt.plat.module.databus.api.message.DatabusMessage; -import com.zt.plat.module.databus.enums.DatabusEventType; -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * 用户更新事件处理器 - *

- * 使用条件: - * 1. zt.databus.sync.client.enabled=true - * 2. 存在 AdminUserSyncService Bean - * - * @author ZT - */ -@Slf4j -@Component -@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") -@ConditionalOnBean(AdminUserSyncService.class) -public class SystemUserUpdateHandler implements SyncEventHandler { - - @Resource - private AdminUserSyncService adminUserSyncService; - - @Override - public DatabusEventType getSupportedEventType() { - return DatabusEventType.SYSTEM_USER_UPDATE; - } - - @Override - public void handle(DatabusMessage message) { - DatabusAdminUserData data = message.getData(); - log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername()); - - try { - adminUserSyncService.update(data); - log.info("[UserSync] 用户更新成功, id={}", data.getId()); - } catch (Exception e) { - log.error("[UserSync] 用户更新失败, id={}", data.getId(), e); - throw e; // 抛出异常触发重试 - } - } -} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java new file mode 100644 index 00000000..42934904 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptFullHandler.java @@ -0,0 +1,70 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户-部门关系全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserDeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserDeptSyncService.class) +public class SystemUserDeptFullHandler implements BatchSyncEventHandler { + + @Resource + private UserDeptSyncService userDeptSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_DEPT_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserDeptSync] 开始用户-部门关系全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserDeptSync] 处理用户-部门关系批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusUserDeptData data : message.getDataList()) { + try { + userDeptSyncService.fullSync(data); + log.debug("[UserDeptSync] 用户-部门关系全量同步成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + } catch (Exception e) { + log.error("[UserDeptSync] 用户-部门关系全量同步失败, userId={}, deptId={}", + data.getUserId(), data.getDeptId(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserDeptSync] 用户-部门关系批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserDeptSync] 用户-部门关系全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java new file mode 100644 index 00000000..9a81c18b --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/SystemUserDeptHandler.java @@ -0,0 +1,87 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户-部门关系增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_DEPT_CREATE:用户-部门关系创建 + * - SYSTEM_USER_DEPT_UPDATE:用户-部门关系更新 + * - SYSTEM_USER_DEPT_DELETE:用户-部门关系删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserDeptSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserDeptSyncService.class) +public class SystemUserDeptHandler implements SyncEventHandler { + + @Resource + private UserDeptSyncService userDeptSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_DEPT_CREATE, + DatabusEventType.SYSTEM_USER_DEPT_UPDATE, + DatabusEventType.SYSTEM_USER_DEPT_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusUserDeptData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserDeptSync] 收到用户-部门关系{}事件, userId={}, deptId={}", + eventType.getAction(), data != null ? data.getUserId() : null, + data != null ? data.getDeptId() : null); + + try { + switch (eventType) { + case SYSTEM_USER_DEPT_CREATE: + userDeptSyncService.create(data); + log.info("[UserDeptSync] 用户-部门关系创建成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + break; + + case SYSTEM_USER_DEPT_UPDATE: + userDeptSyncService.update(data); + log.info("[UserDeptSync] 用户-部门关系更新成功, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + break; + + case SYSTEM_USER_DEPT_DELETE: + userDeptSyncService.delete(data.getId()); + log.info("[UserDeptSync] 用户-部门关系删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserDeptSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserDeptSync] 用户-部门关系{}失败, userId={}, deptId={}", + eventType.getAction(), + data != null ? data.getUserId() : null, + data != null ? data.getDeptId() : null, e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java new file mode 100644 index 00000000..26023a40 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; + +/** + * 用户-部门关系同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link UserDeptSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface UserDeptSyncService { + + /** + * 创建用户-部门关系(增量同步) + * + * @param data 用户-部门关系数据 + */ + void create(DatabusUserDeptData data); + + /** + * 更新用户-部门关系(增量同步) + * + * @param data 用户-部门关系数据 + */ + void update(DatabusUserDeptData data); + + /** + * 删除用户-部门关系(增量同步) + * + * @param id 关系ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户-部门关系数据 + */ + void fullSync(DatabusUserDeptData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java new file mode 100644 index 00000000..9027e379 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userdept/UserDeptSyncServiceImpl.java @@ -0,0 +1,56 @@ +package com.zt.plat.framework.databus.client.handler.userdept; + +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 用户-部门关系同步服务实现 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + *

+ * 注意:由于用户-部门关系通常集成在用户管理中,此实现为占位符。 + * 分公司可以根据实际情况: + * 1. 自定义实现此接口,直接操作本地数据库 + * 2. 或者通过用户管理 API 间接处理关联关系 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +public class UserDeptSyncServiceImpl implements UserDeptSyncService { + + @Override + public void create(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到创建用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步 + } + + @Override + public void update(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到更新用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void delete(Long id) { + log.info("[UserDeptSync] 收到删除用户-部门关系请求, id={}", id); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void fullSync(DatabusUserDeptData data) { + log.info("[UserDeptSync] 收到全量同步用户-部门关系请求, userId={}, deptId={}", + data.getUserId(), data.getDeptId()); + log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入 + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java new file mode 100644 index 00000000..486c1a8f --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostFullHandler.java @@ -0,0 +1,70 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * 用户-岗位关系全量同步事件处理器(批量处理) + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserPostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserPostSyncService.class) +public class SystemUserPostFullHandler implements BatchSyncEventHandler { + + @Resource + private UserPostSyncService userPostSyncService; + + @Override + public DatabusEventType getSupportedEventType() { + return DatabusEventType.SYSTEM_USER_POST_FULL; + } + + @Override + public void onFullSyncStart(DatabusBatchMessage message) { + log.info("[UserPostSync] 开始用户-岗位关系全量同步, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } + + @Override + public void handleBatch(DatabusBatchMessage message) { + log.info("[UserPostSync] 处理用户-岗位关系批次数据, taskId={}, batchNo={}/{}, size={}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch(), + message.getDataList().size()); + + // 逐条处理全量同步数据 + for (DatabusUserPostData data : message.getDataList()) { + try { + userPostSyncService.fullSync(data); + log.debug("[UserPostSync] 用户-岗位关系全量同步成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + } catch (Exception e) { + log.error("[UserPostSync] 用户-岗位关系全量同步失败, userId={}, postId={}", + data.getUserId(), data.getPostId(), e); + // 单条失败不影响其他数据,继续处理 + } + } + + log.info("[UserPostSync] 用户-岗位关系批次处理完成, taskId={}, batchNo={}/{}", + message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + @Override + public void onFullSyncComplete(DatabusBatchMessage message) { + log.info("[UserPostSync] 用户-岗位关系全量同步完成, taskId={}, totalBatch={}", + message.getTaskId(), message.getTotalBatch()); + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java new file mode 100644 index 00000000..cfdcc8d8 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/SystemUserPostHandler.java @@ -0,0 +1,87 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.Arrays; +import java.util.List; + +/** + * 用户-岗位关系增量同步事件处理器(合并版) + *

+ * 处理事件类型: + * - SYSTEM_USER_POST_CREATE:用户-岗位关系创建 + * - SYSTEM_USER_POST_UPDATE:用户-岗位关系更新 + * - SYSTEM_USER_POST_DELETE:用户-岗位关系删除 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + * 2. 存在 UserPostSyncService Bean + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@ConditionalOnBean(UserPostSyncService.class) +public class SystemUserPostHandler implements SyncEventHandler { + + @Resource + private UserPostSyncService userPostSyncService; + + @Override + public List getSupportedEventTypes() { + return Arrays.asList( + DatabusEventType.SYSTEM_USER_POST_CREATE, + DatabusEventType.SYSTEM_USER_POST_UPDATE, + DatabusEventType.SYSTEM_USER_POST_DELETE + ); + } + + @Override + public void handle(DatabusMessage message) { + DatabusUserPostData data = message.getData(); + DatabusEventType eventType = message.getEventType(); + + log.info("[UserPostSync] 收到用户-岗位关系{}事件, userId={}, postId={}", + eventType.getAction(), data != null ? data.getUserId() : null, + data != null ? data.getPostId() : null); + + try { + switch (eventType) { + case SYSTEM_USER_POST_CREATE: + userPostSyncService.create(data); + log.info("[UserPostSync] 用户-岗位关系创建成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + break; + + case SYSTEM_USER_POST_UPDATE: + userPostSyncService.update(data); + log.info("[UserPostSync] 用户-岗位关系更新成功, userId={}, postId={}", + data.getUserId(), data.getPostId()); + break; + + case SYSTEM_USER_POST_DELETE: + userPostSyncService.delete(data.getId()); + log.info("[UserPostSync] 用户-岗位关系删除成功, id={}", data.getId()); + break; + + default: + log.warn("[UserPostSync] 未知事件类型: {}", eventType); + } + } catch (Exception e) { + log.error("[UserPostSync] 用户-岗位关系{}失败, userId={}, postId={}", + eventType.getAction(), + data != null ? data.getUserId() : null, + data != null ? data.getPostId() : null, e); + throw e; // 抛出异常触发重试 + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java new file mode 100644 index 00000000..f6d6b780 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncService.java @@ -0,0 +1,44 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.module.databus.api.data.DatabusUserPostData; + +/** + * 用户-岗位关系同步服务接口 + *

+ * 分公司需要实现此接口,完成数据的本地持久化 + * 或通过默认实现 {@link UserPostSyncServiceImpl} 使用 Feign 调用远程 API + * + * @author ZT + */ +public interface UserPostSyncService { + + /** + * 创建用户-岗位关系(增量同步) + * + * @param data 用户-岗位关系数据 + */ + void create(DatabusUserPostData data); + + /** + * 更新用户-岗位关系(增量同步) + * + * @param data 用户-岗位关系数据 + */ + void update(DatabusUserPostData data); + + /** + * 删除用户-岗位关系(增量同步) + * + * @param id 关系ID + */ + void delete(Long id); + + /** + * 全量同步单条数据 + *

+ * 逻辑:存在则更新,不存在则插入 + * + * @param data 用户-岗位关系数据 + */ + void fullSync(DatabusUserPostData data); +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java new file mode 100644 index 00000000..b41d6ac8 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/handler/userpost/UserPostSyncServiceImpl.java @@ -0,0 +1,56 @@ +package com.zt.plat.framework.databus.client.handler.userpost; + +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Service; + +/** + * 用户-岗位关系同步服务实现 + *

+ * 使用条件: + * 1. zt.databus.sync.client.enabled=true + *

+ * 注意:由于用户-岗位关系通常集成在用户管理中,此实现为占位符。 + * 分公司可以根据实际情况: + * 1. 自定义实现此接口,直接操作本地数据库 + * 2. 或者通过用户管理 API 间接处理关联关系 + * + * @author ZT + */ +@Slf4j +@Service +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +public class UserPostSyncServiceImpl implements UserPostSyncService { + + @Override + public void create(DatabusUserPostData data) { + log.info("[UserPostSync] 收到创建用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步 + } + + @Override + public void update(DatabusUserPostData data) { + log.info("[UserPostSync] 收到更新用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void delete(Long id) { + log.info("[UserPostSync] 收到删除用户-岗位关系请求, id={}", id); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法 + } + + @Override + public void fullSync(DatabusUserPostData data) { + log.info("[UserPostSync] 收到全量同步用户-岗位关系请求, userId={}, postId={}", + data.getUserId(), data.getPostId()); + log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现"); + // TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入 + } +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java new file mode 100644 index 00000000..b5409176 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserDeptData.java @@ -0,0 +1,48 @@ +package com.zt.plat.module.databus.api.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 用户与组织机构关联关系数据对象 + *

+ * 对应表:system_user_dept + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabusUserDeptData implements Serializable { + + /** + * 关联关系ID + */ + private Long id; + + /** + * 用户ID + */ + private Long userId; + + /** + * 部门ID + */ + private Long deptId; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 备注 + */ + private String remark; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java new file mode 100644 index 00000000..b27ac361 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusUserPostData.java @@ -0,0 +1,38 @@ +package com.zt.plat.module.databus.api.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; + +/** + * 用户与岗位关联关系数据对象 + *

+ * 对应表:system_user_post + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabusUserPostData implements Serializable { + + /** + * 关联关系ID + */ + private Long id; + + /** + * 用户ID + */ + private Long userId; + + /** + * 岗位ID + */ + private Long postId; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java index bb853bfe..80886ad3 100644 --- a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/enums/DatabusEventType.java @@ -41,49 +41,45 @@ public enum DatabusEventType { SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"), /** - * 部门-创建 + * 组织机构-创建 + */ + SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"), + + /** + * 组织机构-更新 + */ + SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"), + + /** + * 组织机构-删除 + */ + SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"), + + /** + * 组织机构-全量同步 + */ + SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"), + + /** + * 部门-创建(新命名,推荐使用) */ SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"), /** - * 部门-更新 + * 部门-更新(新命名,推荐使用) */ SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"), /** - * 部门-删除 + * 部门-删除(新命名,推荐使用) */ SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"), /** - * 部门-全量同步 + * 部门-全量同步(新命名,推荐使用) */ SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"), - /** - * 组织机构-创建(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"), - - /** - * 组织机构-更新(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"), - - /** - * 组织机构-删除(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"), - - /** - * 组织机构-全量同步(兼容老代码,保留但不推荐使用) - */ - @Deprecated - SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"), - /** * 岗位-创建 */ @@ -104,6 +100,84 @@ public enum DatabusEventType { */ SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"), + /** + * 用户-部门关系-创建 + */ + SYSTEM_USER_DEPT_CREATE("system", "user-dept", "create", "用户-部门关系创建"), + + /** + * 用户-部门关系-更新 + */ + SYSTEM_USER_DEPT_UPDATE("system", "user-dept", "update", "用户-部门关系更新"), + + /** + * 用户-部门关系-删除 + */ + SYSTEM_USER_DEPT_DELETE("system", "user-dept", "delete", "用户-部门关系删除"), + + /** + * 用户-部门关系-全量同步 + */ + SYSTEM_USER_DEPT_FULL("system", "user-dept", "full", "用户-部门关系全量同步"), + + /** + * 用户-岗位关系-创建 + */ + SYSTEM_USER_POST_CREATE("system", "user-post", "create", "用户-岗位关系创建"), + + /** + * 用户-岗位关系-更新 + */ + SYSTEM_USER_POST_UPDATE("system", "user-post", "update", "用户-岗位关系更新"), + + /** + * 用户-岗位关系-删除 + */ + SYSTEM_USER_POST_DELETE("system", "user-post", "delete", "用户-岗位关系删除"), + + /** + * 用户-岗位关系-全量同步 + */ + SYSTEM_USER_POST_FULL("system", "user-post", "full", "用户-岗位关系全量同步"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_CREATE("system", "user-relation", "create", "用户关联创建"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_UPDATE("system", "user-relation", "update", "用户关联更新"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_DELETE("system", "user-relation", "delete", "用户关联删除"), + + /** + * @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容 + */ + @Deprecated + SYSTEM_USER_RELATION_FULL("system", "user-relation", "full", "用户关联全量同步"), + + /** + * 基础数据-全量同步(组合编排事件) + *

+ * 按依赖顺序触发以下全量同步: + * 1. 组织机构(SYSTEM_DEPT_FULL) + * 2. 岗位(SYSTEM_POST_FULL) + * 3. 用户(SYSTEM_USER_FULL) + * 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL) + * 5. 用户-岗位关系(SYSTEM_USER_POST_FULL) + *

+ * 注意:本事件只负责触发,实际数据同步由5个独立的全量同步任务完成 + */ + SYSTEM_BASE_DATA_FULL("system", "base-data", "full", "基础数据全量同步"), + /** * 角色-创建 */ @@ -167,7 +241,7 @@ public enum DatabusEventType { BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"), /** - * 供应��-创建 + * 供应商-创建 */ BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"), @@ -238,13 +312,10 @@ public enum DatabusEventType { /** * 获取完整Topic名称(服务端转发用) - * 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic) - * 示例: databus-sync-branch-001 - * - * 注意:不再为每个事件创建独立 Topic,而是通过消息体中的 eventType 字段路由 + * 格式: {topicBase}-{module}-{entity}-{action}-{clientCode} */ public String getTopic(String topicBase, String clientCode) { - return String.format("%s-%s", topicBase, clientCode); + return String.format("%s-%s-%s-%s-%s", topicBase, module, entity, action, clientCode); } /** @@ -273,31 +344,6 @@ public enum DatabusEventType { return null; } - /** - * 根据事件类型字符串获取枚举(支持大写下划线格式) - * 例如:SYSTEM_POST_FULL → DatabusEventType.SYSTEM_POST_FULL - * - * @param eventType 事件类型字符串(格式: MODULE_ENTITY_ACTION) - * @return 枚举值,未找到返回null - */ - public static DatabusEventType getByEventType(String eventType) { - if (eventType == null) { - return null; - } - // 先尝试直接匹配枚举名称 - try { - return DatabusEventType.valueOf(eventType.toUpperCase()); - } catch (IllegalArgumentException e) { - // 如果失败,尝试转换格式后匹配(如 system-post-full → SYSTEM_POST_FULL) - String normalized = eventType.toUpperCase().replace("-", "_"); - try { - return DatabusEventType.valueOf(normalized); - } catch (IllegalArgumentException ex) { - return null; - } - } - } - /** * 根据模块、实体、操作获取枚举 * @@ -317,6 +363,25 @@ public enum DatabusEventType { return null; } + /** + * 根据事件类型名称获取枚举 + *

+ * 支持大写下划线格式,如: SYSTEM_POST_FULL + * + * @param eventType 事件类型名称(枚举常量名) + * @return 枚举值,未找到返回null + */ + public static DatabusEventType getByEventType(String eventType) { + if (eventType == null || eventType.isEmpty()) { + return null; + } + try { + return DatabusEventType.valueOf(eventType); + } catch (IllegalArgumentException e) { + return null; + } + } + /** * 判断是否为全量同步事件 */ diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java index 5d974e23..32e911e1 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java +++ b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/rpc/config/RpcConfiguration.java @@ -4,6 +4,8 @@ import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi; import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi; import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi; import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi; +import com.zt.plat.module.system.api.dept.DeptApi; +import com.zt.plat.module.system.api.dept.PostApi; import com.zt.plat.module.system.api.user.AdminUserApi; import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.context.annotation.Configuration; @@ -18,7 +20,10 @@ import org.springframework.context.annotation.Configuration; // DataBus 数据提供者 Feign 客户端 DatabusDeptProviderApi.class, DatabusUserProviderApi.class, - DatabusPostProviderApi.class + DatabusPostProviderApi.class, + PostApi.class, + DeptApi.class, + AdminUserApi.class, }) public class RpcConfiguration { } diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java new file mode 100644 index 00000000..d5f75058 --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java @@ -0,0 +1,110 @@ +package com.zt.plat.module.system.mq.producer.databus; + +import cn.hutool.core.util.IdUtil; +import com.zt.plat.module.databus.api.data.DatabusUserDeptData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 用户-部门关系变更消息 Producer + *

+ * 负责发送用户与部门的关联关系变更事件 + * + * @author ZT + */ +@Slf4j +@Component +public class DatabusUserDeptChangeProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + @Value("${zt.databus.change.topic-prefix:databus-change}") + private String topicPrefix; + + private static final String TOPIC_SUFFIX = "-system-user-dept"; + + /** + * 发送用户-部门关系创建消息 + * + * @param userDept 用户-部门关系对象 + */ + public void sendUserDeptCreatedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_CREATE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系创建消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系创建消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 发送用户-部门关系更新消息 + * + * @param userDept 用户-部门关系对象 + */ + public void sendUserDeptUpdatedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_UPDATE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系更新消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系更新消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 发送用户-部门关系删除消息 + * + * @param userDept 用户-部门关系对象(删除前查询的) + */ + public void sendUserDeptDeletedMessage(UserDeptDO userDept) { + try { + DatabusUserDeptData data = buildUserDeptData(userDept); + sendMessage(DatabusEventType.SYSTEM_USER_DEPT_DELETE, data); + log.info("[DatabusUserDeptChange] 发送用户-部门关系删除消息, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId()); + } catch (Exception e) { + log.error("[DatabusUserDeptChange] 发送用户-部门关系删除消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } + + /** + * 构建用户-部门关系数据 + */ + private DatabusUserDeptData buildUserDeptData(UserDeptDO userDept) { + return DatabusUserDeptData.builder() + .id(userDept.getId()) + .userId(userDept.getUserId()) + .deptId(userDept.getDeptId()) + .tenantId(userDept.getTenantId()) + .remark(userDept.getRemark()) + .build(); + } + + /** + * 发送消息到 MQ + */ + private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) { + DatabusMessage message = new DatabusMessage<>(); + message.setEventType(eventType); + message.setData(data); + message.setTimestamp(java.time.LocalDateTime.now()); + message.setMessageId(IdUtil.fastSimpleUUID()); + + String topic = topicPrefix + TOPIC_SUFFIX; + rocketMQTemplate.syncSend(topic, message); + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java new file mode 100644 index 00000000..b6201f5d --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java @@ -0,0 +1,108 @@ +package com.zt.plat.module.system.mq.producer.databus; + +import cn.hutool.core.util.IdUtil; +import com.zt.plat.module.databus.api.data.DatabusUserPostData; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import com.zt.plat.module.system.dal.dataobject.dept.UserPostDO; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +/** + * 用户-岗位关系变更消息 Producer + *

+ * 负责发送用户与岗位的关联关系变更事件 + * + * @author ZT + */ +@Slf4j +@Component +public class DatabusUserPostChangeProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + @Value("${zt.databus.change.topic-prefix:databus-change}") + private String topicPrefix; + + private static final String TOPIC_SUFFIX = "-system-user-post"; + + /** + * 发送用户-岗位关系创建消息 + * + * @param userPost 用户-岗位关系对象 + */ + public void sendUserPostCreatedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_CREATE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系创建消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系创建消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 发送用户-岗位关系更新消息 + * + * @param userPost 用户-岗位关系对象 + */ + public void sendUserPostUpdatedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_UPDATE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系更新消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系更新消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 发送用户-岗位关系删除消息 + * + * @param userPost 用户-岗位关系对象(删除前查询的) + */ + public void sendUserPostDeletedMessage(UserPostDO userPost) { + try { + DatabusUserPostData data = buildUserPostData(userPost); + sendMessage(DatabusEventType.SYSTEM_USER_POST_DELETE, data); + log.info("[DatabusUserPostChange] 发送用户-岗位关系删除消息, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId()); + } catch (Exception e) { + log.error("[DatabusUserPostChange] 发送用户-岗位关系删除消息失败, userId={}, postId={}", + userPost.getUserId(), userPost.getPostId(), e); + } + } + + /** + * 构建用户-岗位关系数据 + */ + private DatabusUserPostData buildUserPostData(UserPostDO userPost) { + return DatabusUserPostData.builder() + .id(userPost.getId()) + .userId(userPost.getUserId()) + .postId(userPost.getPostId()) + .build(); + } + + /** + * 发送消息到 MQ + */ + private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) { + DatabusMessage message = new DatabusMessage<>(); + message.setEventType(eventType); + message.setData(data); + message.setTimestamp(java.time.LocalDateTime.now()); + message.setMessageId(IdUtil.fastSimpleUUID()); + + String topic = topicPrefix + TOPIC_SUFFIX; + rocketMQTemplate.syncSend(topic, message); + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java index 91bf4b7d..cbec4789 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java @@ -29,6 +29,7 @@ import com.zt.plat.module.system.dal.mysql.dept.UserPostMapper; import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper; import com.zt.plat.module.system.enums.user.PasswordStrategyEnum; import com.zt.plat.module.system.enums.user.UserSourceEnum; +import com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer; import com.zt.plat.module.system.service.dept.DeptService; import com.zt.plat.module.system.service.dept.PostService; import com.zt.plat.module.system.service.permission.PermissionService; @@ -93,11 +94,10 @@ public class AdminUserServiceImpl implements AdminUserService { @Resource private ConfigApi configApi; - - @Resource - private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer; @Resource private UserDeptService userDeptService; + @Resource + private DatabusChangeProducer databusChangeProducer; @Override @GlobalTransactional(rollbackFor = Exception.class) @@ -128,7 +128,6 @@ public class AdminUserServiceImpl implements AdminUserService { if (user.getUserSource() == null) { user.setUserSource(UserSourceEnum.EXTERNAL.getSource()); } - user.setWorkcode(normalizeWorkcode(createReqVO.getWorkcode())); PasswordStrategyEnum passwordStrategy = determinePasswordStrategy(user.getUserSource()); user.setAvatar(normalizeAvatarValue(createReqVO.getAvatar())); user.setPassword(encodePassword(createReqVO.getPassword(), passwordStrategy)); @@ -144,10 +143,10 @@ public class AdminUserServiceImpl implements AdminUserService { postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId))); } - // 2.4 发布用户创建事件 + // 3. 发送用户创建消息到MQ(供Databus消费转发) databusChangeProducer.sendUserCreatedMessage(user); - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable("user", user); return user.getId(); } @@ -238,13 +237,13 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.3 更新岗位 updateUserPost(updateReqVO, updateObj); - // 2.4 发布用户更新事件(重新查询获取完整数据) + // 3. 发送用户更新消息到MQ(供Databus消费转发) AdminUserDO updatedUser = userMapper.selectById(updateObj.getId()); if (updatedUser != null) { databusChangeProducer.sendUserUpdatedMessage(updatedUser); } - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class)); LogRecordContext.putVariable("user", oldUser); } @@ -333,10 +332,10 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.2 删除用户岗位 userPostMapper.deleteByUserId(id); - // 2.3 发布用户删除事件 + // 3. 发送用户删除消息到MQ(供Databus消费转发) databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId()); - // 3. 记录操作日志上下文 + // 4. 记录操作日志上下文 LogRecordContext.putVariable("user", user); } diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java index f7e4943f..3b699c91 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/userdept/UserDeptServiceImpl.java @@ -6,6 +6,7 @@ import com.zt.plat.framework.security.core.LoginUser; import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper; import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; @@ -23,18 +24,29 @@ import static com.zt.plat.module.system.enums.ErrorCodeConstants.USER_DEPT_NOT_E * * @author 管理员 */ +@Slf4j @Service @Validated public class UserDeptServiceImpl implements UserDeptService { @Resource private UserDeptMapper userDeptMapper; + @Resource + private com.zt.plat.module.system.mq.producer.databus.DatabusUserDeptChangeProducer databusUserDeptChangeProducer; @Override public Long createUserDept(UserDeptDO createReqVO) { // 插入 UserDeptDO userDept = BeanUtils.toBean(createReqVO, UserDeptDO.class); userDeptMapper.insert(userDept); + + // 发送用户-部门关系创建消息 + try { + databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept); + } catch (Exception e) { + log.error("[createUserDept] 发送用户-部门关系创建消息失败", e); + } + // 返回 return userDept.getId(); } @@ -46,14 +58,34 @@ public class UserDeptServiceImpl implements UserDeptService { // 更新 UserDeptDO updateObj = BeanUtils.toBean(updateReqVO, UserDeptDO.class); userDeptMapper.updateById(updateObj); + + // 发送用户-部门关系更新消息 + try { + databusUserDeptChangeProducer.sendUserDeptUpdatedMessage(updateObj); + } catch (Exception e) { + log.error("[updateUserDept] 发送用户-部门关系更新消息失败", e); + } } @Override public void deleteUserDept(Long id) { // 校验存在 validateUserDeptExists(id); + + // 查询完整对象(删除前查询,删除后无法查询) + UserDeptDO userDept = userDeptMapper.selectById(id); + // 删除 userDeptMapper.deleteById(id); + + // 发送用户-部门关系删除消息 + if (userDept != null) { + try { + databusUserDeptChangeProducer.sendUserDeptDeletedMessage(userDept); + } catch (Exception e) { + log.error("[deleteUserDept] 发送用户-部门关系删除消息失败", e); + } + } } @Override @@ -111,6 +143,16 @@ public class UserDeptServiceImpl implements UserDeptService { Long tenantId = Optional.ofNullable(getLoginUser()).orElse(new LoginUser()).getTenantId(); list.forEach(item -> item.setTenantId(tenantId)); userDeptMapper.insertBatch(list); + + // 发送用户-部门关系创建消息(为每个关系发送) + for (UserDeptDO userDept : list) { + try { + databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept); + } catch (Exception e) { + log.error("[batchCreateUserDept] 发送用户-部门关系创建消息失败, userId={}, deptId={}", + userDept.getUserId(), userDept.getDeptId(), e); + } + } } } \ No newline at end of file From 72fe903447b447f53a88ff375169173fa0c524f7 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 16 Dec 2025 14:34:13 +0800 Subject: [PATCH 3/6] =?UTF-8?q?update=EF=BC=9A=E6=95=B0=E6=8D=AE=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=88=86=E5=8F=91=E6=9C=BA=E6=9E=84=E5=B2=97=E4=BD=8D?= =?UTF-8?q?=E7=BB=91=E5=AE=9A=E5=85=B3=E7=B3=BB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../core/sync/DatabusFullSyncService.java | 2 + .../impl/DatabusFullSyncServiceImpl.java | 200 ++++++++++++++++++ 2 files changed, 202 insertions(+) diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java index 62d00b16..ee18651c 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/sync/DatabusFullSyncService.java @@ -2,6 +2,8 @@ package com.zt.plat.framework.databus.server.core.sync; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncFullTaskDO; +import java.util.List; + /** * Databus 全量同步服务接口 * diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java index 3d13c7f8..d2f277fb 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusFullSyncServiceImpl.java @@ -113,6 +113,13 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService { DatabusSyncClientDO client = clientMapper.selectById(subscription.getClientId()); DatabusSyncEventDO event = eventMapper.selectById(subscription.getEventId()); + // 检查是否为基础数据全量同步事件(组合事件) + if ("SYSTEM_BASE_DATA_FULL".equals(event.getEventType())) { + log.info("[Databus] 检测到基础数据全量同步事件,开始按依赖顺序执行子任务, taskId={}", taskId); + executeBaseDataFullSync(task, client); + return; + } + String providerType = event.getDataProviderMethod(); if (providerType == null) { throw new RuntimeException("Event data provider type not configured"); @@ -325,4 +332,197 @@ public class DatabusFullSyncServiceImpl implements DatabusFullSyncService { return fullTaskMapper.selectByTaskNo(taskNo); } + /** + * 执行基础数据全量同步(组合事件) + *

+ * 按依赖顺序同步执行5个子任务: + * 1. 组织机构(SYSTEM_DEPT_FULL) + * 2. 岗位(SYSTEM_POST_FULL) + * 3. 用户(SYSTEM_USER_FULL) + * 4. 用户-部门关系(SYSTEM_USER_DEPT_FULL) + * 5. 用户-岗位关系(SYSTEM_USER_POST_FULL) + * + * @param parentTask 父任务(SYSTEM_BASE_DATA_FULL) + * @param client 客户端配置 + */ + private void executeBaseDataFullSync(DatabusSyncFullTaskDO parentTask, DatabusSyncClientDO client) { + // 按依赖顺序定义事件类型 + String[] eventTypes = { + "SYSTEM_DEPT_FULL", // 1. 组织机构 + "SYSTEM_POST_FULL", // 2. 岗位 + "SYSTEM_USER_FULL", // 3. 用户 + "SYSTEM_USER_DEPT_FULL", // 4. 用户-部门关系 + "SYSTEM_USER_POST_FULL" // 5. 用户-岗位关系 + }; + + int successTaskCount = 0; // 成功的任务数 + int failTaskCount = 0; // 失败的任务数 + StringBuilder errorMessages = new StringBuilder(); + + // 用于聚合所有子任务的进度 + long totalDataCount = 0L; // 总数据量 + long processedDataCount = 0L; // 已处理数据量 + long successDataCount = 0L; // 成功数据量 + long failDataCount = 0L; // 失败数据量 + int totalBatchCount = 0; // 总批次数 + + List subTaskIds = new ArrayList<>(); + + try { + // 顺序执行每个子任务 + for (int i = 0; i < eventTypes.length; i++) { + String eventType = eventTypes[i]; + try { + log.info("[Databus] 开始执行子任务 {}/{}: eventType={}, parentTaskId={}", + i + 1, eventTypes.length, eventType, parentTask.getId()); + + // 查询事件定义 + DatabusSyncEventDO event = eventMapper.selectByEventType(eventType); + if (event == null) { + log.warn("[Databus] 事件定义不存在,跳过: eventType={}", eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 事件定义不存在] ", eventType)); + continue; + } + if (event.getEnabled() != 1) { + log.warn("[Databus] 事件未启用,跳过: eventType={}", eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 事件未启用] ", eventType)); + continue; + } + + // 查询订阅关系 + DatabusSyncSubscriptionDO subscription = subscriptionMapper.selectByClientIdAndEventId( + client.getId(), event.getId()); + if (subscription == null) { + log.warn("[Databus] 订阅关系不存在,跳过: clientCode={}, eventType={}", + client.getClientCode(), eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 订阅关系不存在] ", eventType)); + continue; + } + if (subscription.getEnabled() != 1) { + log.warn("[Databus] 订阅关系未启用,跳过: clientCode={}, eventType={}", + client.getClientCode(), eventType); + failTaskCount++; + errorMessages.append(String.format("[%s: 订阅关系未启用] ", eventType)); + continue; + } + + // 创建子任务(同步执行,非异步) + String taskRemark = String.format("[基础数据全量同步-子任务 %d/%d] %s", + i + 1, eventTypes.length, event.getEventName()); + Long subTaskId = createFullSyncTask(subscription.getId(), taskRemark); + subTaskIds.add(subTaskId); + + // 同步执行子任务(注意:这里不用异步,确保按顺序执行) + DatabusSyncFullTaskDO subTask = fullTaskMapper.selectById(subTaskId); + executeSubTaskSync(subTask, subscription, client, event); + + // 重新查询子任务获取最新统计数据 + subTask = fullTaskMapper.selectById(subTaskId); + + // 聚合子任务进度到父任务 + totalDataCount += (subTask.getTotalCount() != null ? subTask.getTotalCount() : 0L); + processedDataCount += (subTask.getProcessedCount() != null ? subTask.getProcessedCount() : 0L); + successDataCount += (subTask.getSuccessCount() != null ? subTask.getSuccessCount() : 0L); + failDataCount += (subTask.getFailCount() != null ? subTask.getFailCount() : 0L); + totalBatchCount += (subTask.getTotalBatch() != null ? subTask.getTotalBatch() : 0); + + if (FullTaskStatusEnum.isCompleted(subTask.getStatus())) { + successTaskCount++; + } else { + failTaskCount++; + } + + // 实时更新父任务进度(每完成一个子任务就更新一次) + parentTask.setTotalCount(totalDataCount); + parentTask.setProcessedCount(processedDataCount); + parentTask.setSuccessCount(successDataCount); + parentTask.setFailCount(failDataCount); + parentTask.setTotalBatch(totalBatchCount); + parentTask.setCurrentBatch(i + 1); // 当前完成的子任务数 + fullTaskMapper.updateById(parentTask); + + log.info("[Databus] 子任务执行完成 {}/{}: eventType={}, subTaskId={}, status={}, " + + "数据统计[total={}, success={}, fail={}]", + i + 1, eventTypes.length, eventType, subTaskId, subTask.getStatus(), + subTask.getTotalCount(), subTask.getSuccessCount(), subTask.getFailCount()); + + } catch (Exception e) { + log.error("[Databus] 子任务执行失败 {}/{}: eventType={}", i + 1, eventTypes.length, eventType, e); + failTaskCount++; + errorMessages.append(String.format("[%s: %s] ", eventType, e.getMessage())); + // 继续执行下一个任务 + } + } + + // 最终更新父任务状态 + parentTask.setStatus(failTaskCount == 0 ? FullTaskStatusEnum.COMPLETED.getStatus() + : FullTaskStatusEnum.FAILED.getStatus()); + parentTask.setEndTime(LocalDateTime.now()); + parentTask.setTotalCount(totalDataCount); + parentTask.setProcessedCount(processedDataCount); + parentTask.setSuccessCount(successDataCount); + parentTask.setFailCount(failDataCount); + parentTask.setTotalBatch(totalBatchCount); + parentTask.setCurrentBatch(eventTypes.length); + if (errorMessages.length() > 0) { + parentTask.setLastErrorMessage(errorMessages.toString()); + } + fullTaskMapper.updateById(parentTask); + + log.info("[Databus] 基础数据全量同步完成, parentTaskId={}, " + + "任务统计[成功={}/{}, 失败={}], " + + "数据统计[总量={}, 已处理={}, 成功={}, 失败={}]", + parentTask.getId(), + successTaskCount, eventTypes.length, failTaskCount, + totalDataCount, processedDataCount, successDataCount, failDataCount); + + } catch (Exception e) { + log.error("[Databus] 基础数据全量同步异常, parentTaskId={}", parentTask.getId(), e); + parentTask.setStatus(FullTaskStatusEnum.FAILED.getStatus()); + parentTask.setEndTime(LocalDateTime.now()); + parentTask.setLastErrorMessage(e.getMessage()); + fullTaskMapper.updateById(parentTask); + } + } + + /** + * 同步执行子任务(非异步) + */ + private void executeSubTaskSync(DatabusSyncFullTaskDO task, + DatabusSyncSubscriptionDO subscription, + DatabusSyncClientDO client, + DatabusSyncEventDO event) { + task.setStatus(FullTaskStatusEnum.RUNNING.getStatus()); + task.setStartTime(LocalDateTime.now()); + fullTaskMapper.updateById(task); + + try { + String providerType = event.getDataProviderMethod(); + if (providerType == null) { + throw new RuntimeException("Event data provider type not configured"); + } + + DataProvider dataProvider = dataProviderRegistry.getProvider(providerType); + if (dataProvider == null) { + throw new RuntimeException("Data provider not found: " + providerType); + } + + executeGenericFullSync(task, subscription, client, event, dataProvider); + + task.setStatus(FullTaskStatusEnum.COMPLETED.getStatus()); + task.setEndTime(LocalDateTime.now()); + fullTaskMapper.updateById(task); + + } catch (Exception e) { + task.setStatus(FullTaskStatusEnum.FAILED.getStatus()); + task.setEndTime(LocalDateTime.now()); + task.setLastErrorMessage(e.getMessage()); + fullTaskMapper.updateById(task); + throw e; + } + } + } From 13403ea027e91605ce97463b230df5355c8c6834 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 16 Dec 2025 18:43:08 +0800 Subject: [PATCH 4/6] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8D=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=90=8C=E6=AD=A5=E9=85=8D=E7=BD=AE=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E7=BC=BA=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../DatabusUserDeptChangeProducer.java | 19 +++++++++++++++++++ .../DatabusUserPostChangeProducer.java | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java index d5f75058..a1f5007a 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java @@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; * 用户-部门关系变更消息 Producer *

* 负责发送用户与部门的关联关系变更事件 + *

+ * 注意:客户端系统(分公司)应该禁用此功能,避免形成消息循环 + * 配置项:zt.databus.change.producer.enabled=false * * @author ZT */ @@ -25,6 +28,16 @@ public class DatabusUserDeptChangeProducer { @Resource private RocketMQTemplate rocketMQTemplate; + /** + * 是否启用变更消息发送 + *

+ * 默认值:false(安全优先,避免未配置时导致消息循环) + * 集团侧(数据源):必须显式设置为 true,发送变更消息 + * 分公司侧(客户端):保持 false 或不配置,禁用变更消息,避免循环 + */ + @Value("${zt.databus.change.producer.enabled:false}") + private boolean enabled; + @Value("${zt.databus.change.topic-prefix:databus-change}") private String topicPrefix; @@ -98,6 +111,12 @@ public class DatabusUserDeptChangeProducer { * 发送消息到 MQ */ private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过用户-部门关系变更消息, eventType={}, userId={}, deptId={}", + eventType, data.getUserId(), data.getDeptId()); + return; + } + DatabusMessage message = new DatabusMessage<>(); message.setEventType(eventType); message.setData(data); diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java index b6201f5d..e275fe62 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java @@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; * 用户-岗位关系变更消息 Producer *

* 负责发送用户与岗位的关联关系变更事件 + *

+ * 注意:客户端系统(分公司)应该禁用此功能,避免形成消息循环 + * 配置项:zt.databus.change.producer.enabled=false * * @author ZT */ @@ -25,6 +28,16 @@ public class DatabusUserPostChangeProducer { @Resource private RocketMQTemplate rocketMQTemplate; + /** + * 是否启用变更消息发送 + *

+ * 默认值:false(安全优先,避免未配置时导致消息循环) + * 集团侧(数据源):必须显式设置为 true,发送变更消息 + * 分公司侧(客户端):保持 false 或不配置,禁用变更消息,避免循环 + */ + @Value("${zt.databus.change.producer.enabled:false}") + private boolean enabled; + @Value("${zt.databus.change.topic-prefix:databus-change}") private String topicPrefix; @@ -96,6 +109,12 @@ public class DatabusUserPostChangeProducer { * 发送消息到 MQ */ private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过用户-岗位关系变更消息, eventType={}, userId={}, postId={}", + eventType, data.getUserId(), data.getPostId()); + return; + } + DatabusMessage message = new DatabusMessage<>(); message.setEventType(eventType); message.setData(data); From 7e25d6d1060083896a7efcdca7136b5941e36962 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 16 Dec 2025 18:43:08 +0800 Subject: [PATCH 5/6] =?UTF-8?q?fix=EF=BC=9A=E4=BF=AE=E5=A4=8D=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=90=8C=E6=AD=A5=E9=85=8D=E7=BD=AE=E6=A0=A1=E9=AA=8C?= =?UTF-8?q?=E7=BC=BA=E5=A4=B1=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit (cherry picked from commit 13403ea027e91605ce97463b230df5355c8c6834) --- .../DatabusUserDeptChangeProducer.java | 19 +++++++++++++++++++ .../DatabusUserPostChangeProducer.java | 19 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java index d5f75058..a1f5007a 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserDeptChangeProducer.java @@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; * 用户-部门关系变更消息 Producer *

* 负责发送用户与部门的关联关系变更事件 + *

+ * 注意:客户端系统(分公司)应该禁用此功能,避免形成消息循环 + * 配置项:zt.databus.change.producer.enabled=false * * @author ZT */ @@ -25,6 +28,16 @@ public class DatabusUserDeptChangeProducer { @Resource private RocketMQTemplate rocketMQTemplate; + /** + * 是否启用变更消息发送 + *

+ * 默认值:false(安全优先,避免未配置时导致消息循环) + * 集团侧(数据源):必须显式设置为 true,发送变更消息 + * 分公司侧(客户端):保持 false 或不配置,禁用变更消息,避免循环 + */ + @Value("${zt.databus.change.producer.enabled:false}") + private boolean enabled; + @Value("${zt.databus.change.topic-prefix:databus-change}") private String topicPrefix; @@ -98,6 +111,12 @@ public class DatabusUserDeptChangeProducer { * 发送消息到 MQ */ private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过用户-部门关系变更消息, eventType={}, userId={}, deptId={}", + eventType, data.getUserId(), data.getDeptId()); + return; + } + DatabusMessage message = new DatabusMessage<>(); message.setEventType(eventType); message.setData(data); diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java index b6201f5d..e275fe62 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusUserPostChangeProducer.java @@ -15,6 +15,9 @@ import org.springframework.stereotype.Component; * 用户-岗位关系变更消息 Producer *

* 负责发送用户与岗位的关联关系变更事件 + *

+ * 注意:客户端系统(分公司)应该禁用此功能,避免形成消息循环 + * 配置项:zt.databus.change.producer.enabled=false * * @author ZT */ @@ -25,6 +28,16 @@ public class DatabusUserPostChangeProducer { @Resource private RocketMQTemplate rocketMQTemplate; + /** + * 是否启用变更消息发送 + *

+ * 默认值:false(安全优先,避免未配置时导致消息循环) + * 集团侧(数据源):必须显式设置为 true,发送变更消息 + * 分公司侧(客户端):保持 false 或不配置,禁用变更消息,避免循环 + */ + @Value("${zt.databus.change.producer.enabled:false}") + private boolean enabled; + @Value("${zt.databus.change.topic-prefix:databus-change}") private String topicPrefix; @@ -96,6 +109,12 @@ public class DatabusUserPostChangeProducer { * 发送消息到 MQ */ private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) { + if (!enabled) { + log.debug("[Databus] 变更消息发送已禁用, 跳过用户-岗位关系变更消息, eventType={}, userId={}, postId={}", + eventType, data.getUserId(), data.getPostId()); + return; + } + DatabusMessage message = new DatabusMessage<>(); message.setEventType(eventType); message.setData(data); From 12157d5dcbbda99b86b8c0f6cfd7ca194e4eea44 Mon Sep 17 00:00:00 2001 From: chenbowen Date: Tue, 16 Dec 2025 21:34:58 +0800 Subject: [PATCH 6/6] =?UTF-8?q?1.=20=E4=BF=AE=E5=A4=8D=20databus=20?= =?UTF-8?q?=E5=9C=A8=E5=A4=9A=E5=B1=82=E5=B5=8C=E5=A5=97=E7=9A=84=20json?= =?UTF-8?q?=20=E6=8A=A5=E6=96=87=EF=BC=8C=E7=AD=BE=E5=90=8D=E5=AD=98?= =?UTF-8?q?=E5=9C=A8=E5=BC=82=E5=B8=B8=E7=9A=84=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/util/security/CryptoSignatureUtils.java | 7 ++++++- .../gateway/security/GatewaySecurityFilter.java | 3 +-- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/security/CryptoSignatureUtils.java b/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/security/CryptoSignatureUtils.java index 6fb4808c..b0d9be83 100644 --- a/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/security/CryptoSignatureUtils.java +++ b/zt-framework/zt-common/src/main/java/com/zt/plat/framework/common/util/security/CryptoSignatureUtils.java @@ -1,6 +1,7 @@ package com.zt.plat.framework.common.util.security; import cn.hutool.crypto.SecureUtil; +import com.zt.plat.framework.common.util.json.JsonUtils; import javax.crypto.Cipher; import javax.crypto.KeyGenerator; @@ -126,7 +127,11 @@ public final class CryptoSignatureUtils { continue; } sb.append(key).append('='); - sb.append(value); + if (value instanceof String || value instanceof Number || value instanceof Boolean) { + sb.append(value); + } else { + sb.append(JsonUtils.toJsonString(value)); + } sb.append('&'); } if (sb.length() > 0) { diff --git a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/integration/gateway/security/GatewaySecurityFilter.java b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/integration/gateway/security/GatewaySecurityFilter.java index 6692f40d..48c2a627 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/integration/gateway/security/GatewaySecurityFilter.java +++ b/zt-module-databus/zt-module-databus-server/src/main/java/com/zt/plat/module/databus/framework/integration/gateway/security/GatewaySecurityFilter.java @@ -286,8 +286,7 @@ public class GatewaySecurityFilter extends OncePerRequestFilter { try { boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType); if (!valid) { - log.error("[API-PORTAL] 签名校验失败"); - return; + throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败"); } } catch (IllegalArgumentException ex) { throw new SecurityValidationException(HttpStatus.INTERNAL_SERVER_ERROR, "签名算法配置异常");