feat(databus-client): 完成核心组件及编译验证(任务 89-92)

新增核心组件(任务 89-90):
1. HandlerRegistry.java - Handler 策略注册表
   - 自动注册所有 SyncEventHandler 和 BatchSyncEventHandler
   - 根据 eventType 路由到对应 Handler
   - 提供 getIncrementalHandler/getBatchHandler 方法
   - 支持 hasIncrementalHandler/hasBatchHandler 检查

2. DatabusClientConsumer.java - 统一消费者
   - 监听 databus-sync-{clientCode} Topic(简化版)
   - 根据消息字段判断增量/批量消息
   - 调用 HandlerRegistry 路由到具体 Handler
   - 支持全量同步生命周期回调(onFullSyncStart/onFullSyncComplete)

已存在接口(任务 91-92):
1. SyncEventHandler.java - 增量同步 Handler 接口
2. BatchSyncEventHandler.java - 全量同步 Handler 接口

架构设计:
- 策略模式:通过 HandlerRegistry 动态路由
- Topic 简化:databus-sync-{clientCode}(所有事件共用)
- 消息路由:通过 eventType 字段区分事件类型
- 条件装配:@ConditionalOnProperty 灵活启用/禁用

编译结果: BUILD SUCCESS(30个源文件)

Ref: docs/databus/implementation-checklist.md 任务 89-92
This commit is contained in:
hewencai
2025-12-02 01:18:46 +08:00
parent 8329f9c834
commit adf3ec601a
2 changed files with 293 additions and 0 deletions

View File

@@ -0,0 +1,157 @@
package com.zt.plat.framework.databus.client.core.consumer;
import com.alibaba.fastjson2.JSON;
import com.zt.plat.framework.databus.client.core.registry.HandlerRegistry;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* DataBus 客户端统一消费者
* <p>
* 架构设计:
* 1. 监听客户端专属 Topic: databus-sync-{clientCode}
* 2. 消息体包含 eventType 字段,用于路由到具体 Handler
* 3. 根据 eventType 从 HandlerRegistry 获取对应 Handler
* 4. 区分增量消息和批量消息,调用不同的 Handler
* <p>
* Topic 简化前后对比:
* - 旧格式databus-sync-system-dept-create-branch-001每个事件一个 Topic
* - 新格式databus-sync-branch-001所有事件共用一个 Topic
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@RocketMQMessageListener(
topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}",
consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}"
)
public class DatabusClientConsumer implements RocketMQListener<String> {
@Resource
private HandlerRegistry handlerRegistry;
@Override
public void onMessage(String body) {
log.debug("[DatabusClient] 收到消息, body={}", body);
try {
// 1. 解析消息获取 eventType
DatabusEventType eventType = parseEventType(body);
if (eventType == null) {
log.error("[DatabusClient] 无法解析 eventType, body={}", body);
return;
}
log.info("[DatabusClient] 收到消息, eventType={}", eventType);
// 2. 根据 eventType 判断消息类型并分发
if (isBatchMessage(body)) {
// 批量消息(全量同步)
handleBatchMessage(body, eventType);
} else {
// 增量消息
handleIncrementalMessage(body, eventType);
}
} catch (Exception e) {
log.error("[DatabusClient] 消息处理失败, body={}", body, e);
throw e; // 抛出异常触发重试
}
}
/**
* 处理批量消息(全量同步)
*/
@SuppressWarnings("unchecked")
private void handleBatchMessage(String body, DatabusEventType eventType) {
// 1. 获取 BatchHandler
BatchSyncEventHandler handler = handlerRegistry.getBatchHandler(eventType);
if (handler == null) {
log.warn("[DatabusClient] 未找到全量Handler, eventType={}", eventType);
return;
}
// 2. 解析批量消息
DatabusBatchMessage<?> message = JSON.parseObject(body, DatabusBatchMessage.class);
// 3. 全量同步开始回调(第一批)
if (message.getBatchNo() == 1) {
handler.onFullSyncStart(message);
}
// 4. 处理批次数据
handler.handleBatch(message);
// 5. 全量同步完成回调(最后一批)
if (message.getBatchNo().equals(message.getTotalBatch())) {
handler.onFullSyncComplete(message);
}
log.info("[DatabusClient] 批量消息处理完成, eventType={}, taskId={}, batchNo={}/{}",
eventType, message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
}
/**
* 处理增量消息
*/
@SuppressWarnings("unchecked")
private void handleIncrementalMessage(String body, DatabusEventType eventType) {
// 1. 获取 SyncEventHandler
SyncEventHandler handler = handlerRegistry.getIncrementalHandler(eventType);
if (handler == null) {
log.warn("[DatabusClient] 未找到增量Handler, eventType={}", eventType);
return;
}
// 2. 解析增量消息
DatabusMessage<?> message = JSON.parseObject(body, DatabusMessage.class);
// 3. 处理消息
handler.handle(message);
log.info("[DatabusClient] 增量消息处理完成, eventType={}, messageId={}",
eventType, message.getMessageId());
}
/**
* 解析 eventType
*/
private DatabusEventType parseEventType(String body) {
try {
// 先尝试从 JSON 中提取 eventType 字段
String eventTypeStr = JSON.parseObject(body).getString("eventType");
if (eventTypeStr != null) {
return DatabusEventType.valueOf(eventTypeStr);
}
} catch (Exception e) {
log.error("[DatabusClient] 解析 eventType 失败", e);
}
return null;
}
/**
* 判断是否为批量消息
*/
private boolean isBatchMessage(String body) {
try {
// 批量消息包含 taskId, batchNo, totalBatch 字段
var json = JSON.parseObject(body);
return json.containsKey("taskId")
&& json.containsKey("batchNo")
&& json.containsKey("totalBatch");
} catch (Exception e) {
return false;
}
}
}

View File

@@ -0,0 +1,136 @@
package com.zt.plat.framework.databus.client.core.registry;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Handler 策略注册表
* <p>
* 负责管理所有 Handler 实例<EFBC8C><E6A0B9> eventType 路由到对应的 Handler
* <p>
* 架构设计:
* - 增量同步:使用 SyncEventHandler 处理单条消息
* - 全量同步:使用 BatchSyncEventHandler 处理批量消息
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
public class HandlerRegistry {
/**
* 增量同步 Handler 映射表
* <p>
* Key: DatabusEventType事件类型
* Value: SyncEventHandler增量同步处理器
*/
private final Map<DatabusEventType, SyncEventHandler<?>> incrementalHandlers = new HashMap<>();
/**
* 全量同步 Handler 映射表
* <p>
* Key: DatabusEventType事件类型
* Value: BatchSyncEventHandler批量同步处理器
*/
private final Map<DatabusEventType, BatchSyncEventHandler<?>> batchHandlers = new HashMap<>();
/**
* 自动注入所有 SyncEventHandler Bean
*/
@Autowired(required = false)
private List<SyncEventHandler<?>> syncEventHandlers;
/**
* 自动注入所有 BatchSyncEventHandler Bean
*/
@Autowired(required = false)
private List<BatchSyncEventHandler<?>> batchSyncEventHandlers;
/**
* 初始化注册表
* <p>
* 在 Bean 创建后自动调用,注册所有 Handler
*/
@PostConstruct
public void init() {
log.info("[HandlerRegistry] 开始初始化 Handler 注册表...");
// 注册增量同步 Handler
if (syncEventHandlers != null) {
for (SyncEventHandler<?> handler : syncEventHandlers) {
DatabusEventType eventType = handler.getSupportedEventType();
incrementalHandlers.put(eventType, handler);
log.info("[HandlerRegistry] 注册增量Handler: {} -> {}",
eventType, handler.getClass().getSimpleName());
}
}
// 注册全量同步 Handler
if (batchSyncEventHandlers != null) {
for (BatchSyncEventHandler<?> handler : batchSyncEventHandlers) {
DatabusEventType eventType = handler.getSupportedEventType();
batchHandlers.put(eventType, handler);
log.info("[HandlerRegistry] 注册全量Handler: {} -> {}",
eventType, handler.getClass().getSimpleName());
}
}
log.info("[HandlerRegistry] 初始化完成, 增量Handler={}个, 全量Handler={}个",
incrementalHandlers.size(), batchHandlers.size());
}
/**
* 获取增量同步 Handler
*
* @param eventType 事件类型
* @param <T> 数据类型
* @return 对应的 Handler不存在则返回 null
*/
@SuppressWarnings("unchecked")
public <T> SyncEventHandler<T> getIncrementalHandler(DatabusEventType eventType) {
return (SyncEventHandler<T>) incrementalHandlers.get(eventType);
}
/**
* 获取全量同步 Handler
*
* @param eventType 事件类型
* @param <T> 数据类型
* @return 对应的 Handler不存在则返回 null
*/
@SuppressWarnings("unchecked")
public <T> BatchSyncEventHandler<T> getBatchHandler(DatabusEventType eventType) {
return (BatchSyncEventHandler<T>) batchHandlers.get(eventType);
}
/**
* 检查是否存在增量同步 Handler
*
* @param eventType 事件类型
* @return 是否存在
*/
public boolean hasIncrementalHandler(DatabusEventType eventType) {
return incrementalHandlers.containsKey(eventType);
}
/**
* 检查是否存在全量同步 Handler
*
* @param eventType 事件类型
* @return 是否存在
*/
public boolean hasBatchHandler(DatabusEventType eventType) {
return batchHandlers.containsKey(eventType);
}
}