From dfca38feb721ffd7dcc424322bb815eb109171d4 Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Tue, 2 Dec 2025 01:18:46 +0800 Subject: [PATCH] =?UTF-8?q?feat(databus-client):=20=E5=AE=8C=E6=88=90?= =?UTF-8?q?=E6=A0=B8=E5=BF=83=E7=BB=84=E4=BB=B6=E5=8F=8A=E7=BC=96=E8=AF=91?= =?UTF-8?q?=E9=AA=8C=E8=AF=81=EF=BC=88=E4=BB=BB=E5=8A=A1=2089-92=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增核心组件(任务 89-90): 1. HandlerRegistry.java - Handler 策略注册表 - 自动注册所有 SyncEventHandler 和 BatchSyncEventHandler - 根据 eventType 路由到对应 Handler - 提供 getIncrementalHandler/getBatchHandler 方法 - 支持 hasIncrementalHandler/hasBatchHandler 检查 2. DatabusClientConsumer.java - 统一消费者 - 监听 databus-sync-{clientCode} Topic(简化版) - 根据消息字段判断增量/批量消息 - 调用 HandlerRegistry 路由到具体 Handler - 支持全量同步生命周期回调(onFullSyncStart/onFullSyncComplete) 已存在接口(任务 91-92): 1. SyncEventHandler.java - 增量同步 Handler 接口 2. BatchSyncEventHandler.java - 全量同步 Handler 接口 架构设计: - 策略模式:通过 HandlerRegistry 动态路由 - Topic 简化:databus-sync-{clientCode}(所有事件共用) - 消息路由:通过 eventType 字段区分事件类型 - 条件装配:@ConditionalOnProperty 灵活启用/禁用 编译结果:✅ BUILD SUCCESS(30个源文件) Ref: docs/databus/implementation-checklist.md 任务 89-92 --- .../core/consumer/DatabusClientConsumer.java | 157 ++++++++++++++++++ .../client/core/registry/HandlerRegistry.java | 136 +++++++++++++++ 2 files changed, 293 insertions(+) create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java create mode 100644 zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java new file mode 100644 index 00000000..087dbf63 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java @@ -0,0 +1,157 @@ +package com.zt.plat.framework.databus.client.core.consumer; + +import com.alibaba.fastjson2.JSON; +import com.zt.plat.framework.databus.client.core.registry.HandlerRegistry; +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * DataBus 客户端统一消费者 + *
+ * 架构设计: + * 1. 监听客户端专属 Topic: databus-sync-{clientCode} + * 2. 消息体包含 eventType 字段,用于路由到具体 Handler + * 3. 根据 eventType 从 HandlerRegistry 获取对应 Handler + * 4. 区分增量消息和批量消息,调用不同的 Handler + *
+ * Topic 简化前后对比:
+ * - 旧格式:databus-sync-system-dept-create-branch-001(每个事件一个 Topic)
+ * - 新格式:databus-sync-branch-001(所有事件共用一个 Topic)
+ *
+ * @author ZT
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
+@RocketMQMessageListener(
+ topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}",
+ consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}"
+)
+public class DatabusClientConsumer implements RocketMQListener
+ * 负责管理所有 Handler 实例,根�� eventType 路由到对应的 Handler
+ *
+ * 架构设计:
+ * - 增量同步:使用 SyncEventHandler 处理单条消息
+ * - 全量同步:使用 BatchSyncEventHandler 处理批量消息
+ *
+ * @author ZT
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
+public class HandlerRegistry {
+
+ /**
+ * 增量同步 Handler 映射表
+ *
+ * Key: DatabusEventType(事件类型)
+ * Value: SyncEventHandler(增量同步处理器)
+ */
+ private final Map
+ * Key: DatabusEventType(事件类型)
+ * Value: BatchSyncEventHandler(批量同步处理器)
+ */
+ private final Map
+ * 在 Bean 创建后自动调用,注册所有 Handler
+ */
+ @PostConstruct
+ public void init() {
+ log.info("[HandlerRegistry] 开始初始化 Handler 注册表...");
+
+ // 注册增量同步 Handler
+ if (syncEventHandlers != null) {
+ for (SyncEventHandler> handler : syncEventHandlers) {
+ DatabusEventType eventType = handler.getSupportedEventType();
+ incrementalHandlers.put(eventType, handler);
+ log.info("[HandlerRegistry] 注册增量Handler: {} -> {}",
+ eventType, handler.getClass().getSimpleName());
+ }
+ }
+
+ // 注册全量同步 Handler
+ if (batchSyncEventHandlers != null) {
+ for (BatchSyncEventHandler> handler : batchSyncEventHandlers) {
+ DatabusEventType eventType = handler.getSupportedEventType();
+ batchHandlers.put(eventType, handler);
+ log.info("[HandlerRegistry] 注册全量Handler: {} -> {}",
+ eventType, handler.getClass().getSimpleName());
+ }
+ }
+
+ log.info("[HandlerRegistry] 初始化完成, 增量Handler={}个, 全量Handler={}个",
+ incrementalHandlers.size(), batchHandlers.size());
+ }
+
+ /**
+ * 获取增量同步 Handler
+ *
+ * @param eventType 事件类型
+ * @param