diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java new file mode 100644 index 00000000..087dbf63 --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java @@ -0,0 +1,157 @@ +package com.zt.plat.framework.databus.client.core.consumer; + +import com.alibaba.fastjson2.JSON; +import com.zt.plat.framework.databus.client.core.registry.HandlerRegistry; +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.api.message.DatabusBatchMessage; +import com.zt.plat.module.databus.api.message.DatabusMessage; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; +import org.apache.rocketmq.spring.core.RocketMQListener; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * DataBus 客户端统一消费者 + *

+ * 架构设计: + * 1. 监听客户端专属 Topic: databus-sync-{clientCode} + * 2. 消息体包含 eventType 字段,用于路由到具体 Handler + * 3. 根据 eventType 从 HandlerRegistry 获取对应 Handler + * 4. 区分增量消息和批量消息,调用不同的 Handler + *

+ * Topic 简化前后对比: + * - 旧格式:databus-sync-system-dept-create-branch-001(每个事件一个 Topic) + * - 新格式:databus-sync-branch-001(所有事件共用一个 Topic) + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +@RocketMQMessageListener( + topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}", + consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}" +) +public class DatabusClientConsumer implements RocketMQListener { + + @Resource + private HandlerRegistry handlerRegistry; + + @Override + public void onMessage(String body) { + log.debug("[DatabusClient] 收到消息, body={}", body); + + try { + // 1. 解析消息获取 eventType + DatabusEventType eventType = parseEventType(body); + if (eventType == null) { + log.error("[DatabusClient] 无法解析 eventType, body={}", body); + return; + } + + log.info("[DatabusClient] 收到消息, eventType={}", eventType); + + // 2. 根据 eventType 判断消息类型并分发 + if (isBatchMessage(body)) { + // 批量消息(全量同步) + handleBatchMessage(body, eventType); + } else { + // 增量消息 + handleIncrementalMessage(body, eventType); + } + + } catch (Exception e) { + log.error("[DatabusClient] 消息处理失败, body={}", body, e); + throw e; // 抛出异常触发重试 + } + } + + /** + * 处理批量消息(全量同步) + */ + @SuppressWarnings("unchecked") + private void handleBatchMessage(String body, DatabusEventType eventType) { + // 1. 获取 BatchHandler + BatchSyncEventHandler handler = handlerRegistry.getBatchHandler(eventType); + if (handler == null) { + log.warn("[DatabusClient] 未找到全量Handler, eventType={}", eventType); + return; + } + + // 2. 解析批量消息 + DatabusBatchMessage message = JSON.parseObject(body, DatabusBatchMessage.class); + + // 3. 全量同步开始回调(第一批) + if (message.getBatchNo() == 1) { + handler.onFullSyncStart(message); + } + + // 4. 处理批次数据 + handler.handleBatch(message); + + // 5. 全量同步完成回调(最后一批) + if (message.getBatchNo().equals(message.getTotalBatch())) { + handler.onFullSyncComplete(message); + } + + log.info("[DatabusClient] 批量消息处理完成, eventType={}, taskId={}, batchNo={}/{}", + eventType, message.getTaskId(), message.getBatchNo(), message.getTotalBatch()); + } + + /** + * 处理增量消息 + */ + @SuppressWarnings("unchecked") + private void handleIncrementalMessage(String body, DatabusEventType eventType) { + // 1. 获取 SyncEventHandler + SyncEventHandler handler = handlerRegistry.getIncrementalHandler(eventType); + if (handler == null) { + log.warn("[DatabusClient] 未找到增量Handler, eventType={}", eventType); + return; + } + + // 2. 解析增量消息 + DatabusMessage message = JSON.parseObject(body, DatabusMessage.class); + + // 3. 处理消息 + handler.handle(message); + + log.info("[DatabusClient] 增量消息处理完成, eventType={}, messageId={}", + eventType, message.getMessageId()); + } + + /** + * 解析 eventType + */ + private DatabusEventType parseEventType(String body) { + try { + // 先尝试从 JSON 中提取 eventType 字段 + String eventTypeStr = JSON.parseObject(body).getString("eventType"); + if (eventTypeStr != null) { + return DatabusEventType.valueOf(eventTypeStr); + } + } catch (Exception e) { + log.error("[DatabusClient] 解析 eventType 失败", e); + } + return null; + } + + /** + * 判断是否为批量消息 + */ + private boolean isBatchMessage(String body) { + try { + // 批量消息包含 taskId, batchNo, totalBatch 字段 + var json = JSON.parseObject(body); + return json.containsKey("taskId") + && json.containsKey("batchNo") + && json.containsKey("totalBatch"); + } catch (Exception e) { + return false; + } + } +} diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java new file mode 100644 index 00000000..a1baa90d --- /dev/null +++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/registry/HandlerRegistry.java @@ -0,0 +1,136 @@ +package com.zt.plat.framework.databus.client.core.registry; + +import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler; +import com.zt.plat.framework.databus.client.handler.SyncEventHandler; +import com.zt.plat.module.databus.enums.DatabusEventType; +import jakarta.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Handler 策略注册表 + *

+ * 负责管理所有 Handler 实例,根�� eventType 路由到对应的 Handler + *

+ * 架构设计: + * - 增量同步:使用 SyncEventHandler 处理单条消息 + * - 全量同步:使用 BatchSyncEventHandler 处理批量消息 + * + * @author ZT + */ +@Slf4j +@Component +@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true") +public class HandlerRegistry { + + /** + * 增量同步 Handler 映射表 + *

+ * Key: DatabusEventType(事件类型) + * Value: SyncEventHandler(增量同步处理器) + */ + private final Map> incrementalHandlers = new HashMap<>(); + + /** + * 全量同步 Handler 映射表 + *

+ * Key: DatabusEventType(事件类型) + * Value: BatchSyncEventHandler(批量同步处理器) + */ + private final Map> batchHandlers = new HashMap<>(); + + /** + * 自动注入所有 SyncEventHandler Bean + */ + @Autowired(required = false) + private List> syncEventHandlers; + + /** + * 自动注入所有 BatchSyncEventHandler Bean + */ + @Autowired(required = false) + private List> batchSyncEventHandlers; + + /** + * 初始化注册表 + *

+ * 在 Bean 创建后自动调用,注册所有 Handler + */ + @PostConstruct + public void init() { + log.info("[HandlerRegistry] 开始初始化 Handler 注册表..."); + + // 注册增量同步 Handler + if (syncEventHandlers != null) { + for (SyncEventHandler handler : syncEventHandlers) { + DatabusEventType eventType = handler.getSupportedEventType(); + incrementalHandlers.put(eventType, handler); + log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", + eventType, handler.getClass().getSimpleName()); + } + } + + // 注册全量同步 Handler + if (batchSyncEventHandlers != null) { + for (BatchSyncEventHandler handler : batchSyncEventHandlers) { + DatabusEventType eventType = handler.getSupportedEventType(); + batchHandlers.put(eventType, handler); + log.info("[HandlerRegistry] 注册全量Handler: {} -> {}", + eventType, handler.getClass().getSimpleName()); + } + } + + log.info("[HandlerRegistry] 初始化完成, 增量Handler={}个, 全量Handler={}个", + incrementalHandlers.size(), batchHandlers.size()); + } + + /** + * 获取增量同步 Handler + * + * @param eventType 事件类型 + * @param 数据类型 + * @return 对应的 Handler,不存在则返回 null + */ + @SuppressWarnings("unchecked") + public SyncEventHandler getIncrementalHandler(DatabusEventType eventType) { + return (SyncEventHandler) incrementalHandlers.get(eventType); + } + + /** + * 获取全量同步 Handler + * + * @param eventType 事件类型 + * @param 数据类型 + * @return 对应的 Handler,不存在则返回 null + */ + @SuppressWarnings("unchecked") + public BatchSyncEventHandler getBatchHandler(DatabusEventType eventType) { + return (BatchSyncEventHandler) batchHandlers.get(eventType); + } + + /** + * 检查是否存在增量同步 Handler + * + * @param eventType 事件类型 + * @return 是否存在 + */ + public boolean hasIncrementalHandler(DatabusEventType eventType) { + return incrementalHandlers.containsKey(eventType); + } + + /** + * 检查是否存在全量同步 Handler + * + * @param eventType 事件类型 + * @return 是否存在 + */ + public boolean hasBatchHandler(DatabusEventType eventType) { + return batchHandlers.containsKey(eventType); + } +}