feat(databus-client): 完成核心组件及编译验证(任务 89-92)
新增核心组件(任务 89-90):
1. HandlerRegistry.java - Handler 策略注册表
- 自动注册所有 SyncEventHandler 和 BatchSyncEventHandler
- 根据 eventType 路由到对应 Handler
- 提供 getIncrementalHandler/getBatchHandler 方法
- 支持 hasIncrementalHandler/hasBatchHandler 检查
2. DatabusClientConsumer.java - 统一消费者
- 监听 databus-sync-{clientCode} Topic(简化版)
- 根据消息字段判断增量/批量消息
- 调用 HandlerRegistry 路由到具体 Handler
- 支持全量同步生命周期回调(onFullSyncStart/onFullSyncComplete)
已存在接口(任务 91-92):
1. SyncEventHandler.java - 增量同步 Handler 接口
2. BatchSyncEventHandler.java - 全量同步 Handler 接口
架构设计:
- 策略模式:通过 HandlerRegistry 动态路由
- Topic 简化:databus-sync-{clientCode}(所有事件共用)
- 消息路由:通过 eventType 字段区分事件类型
- 条件装配:@ConditionalOnProperty 灵活启用/禁用
编译结果:✅ BUILD SUCCESS(30个源文件)
Ref: docs/databus/implementation-checklist.md 任务 89-92
This commit is contained in:
@@ -0,0 +1,157 @@
|
||||
package com.zt.plat.framework.databus.client.core.consumer;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.zt.plat.framework.databus.client.core.registry.HandlerRegistry;
|
||||
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
|
||||
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
|
||||
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
|
||||
import com.zt.plat.module.databus.api.message.DatabusMessage;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
|
||||
import org.apache.rocketmq.spring.core.RocketMQListener;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* DataBus 客户端统一消费者
|
||||
* <p>
|
||||
* 架构设计:
|
||||
* 1. 监听客户端专属 Topic: databus-sync-{clientCode}
|
||||
* 2. 消息体包含 eventType 字段,用于路由到具体 Handler
|
||||
* 3. 根据 eventType 从 HandlerRegistry 获取对应 Handler
|
||||
* 4. 区分增量消息和批量消息,调用不同的 Handler
|
||||
* <p>
|
||||
* Topic 简化前后对比:
|
||||
* - 旧格式:databus-sync-system-dept-create-branch-001(每个事件一个 Topic)
|
||||
* - 新格式:databus-sync-branch-001(所有事件共用一个 Topic)
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
|
||||
@RocketMQMessageListener(
|
||||
topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}",
|
||||
consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}"
|
||||
)
|
||||
public class DatabusClientConsumer implements RocketMQListener<String> {
|
||||
|
||||
@Resource
|
||||
private HandlerRegistry handlerRegistry;
|
||||
|
||||
@Override
|
||||
public void onMessage(String body) {
|
||||
log.debug("[DatabusClient] 收到消息, body={}", body);
|
||||
|
||||
try {
|
||||
// 1. 解析消息获取 eventType
|
||||
DatabusEventType eventType = parseEventType(body);
|
||||
if (eventType == null) {
|
||||
log.error("[DatabusClient] 无法解析 eventType, body={}", body);
|
||||
return;
|
||||
}
|
||||
|
||||
log.info("[DatabusClient] 收到消息, eventType={}", eventType);
|
||||
|
||||
// 2. 根据 eventType 判断消息类型并分发
|
||||
if (isBatchMessage(body)) {
|
||||
// 批量消息(全量同步)
|
||||
handleBatchMessage(body, eventType);
|
||||
} else {
|
||||
// 增量消息
|
||||
handleIncrementalMessage(body, eventType);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[DatabusClient] 消息处理失败, body={}", body, e);
|
||||
throw e; // 抛出异常触发重试
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理批量消息(全量同步)
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleBatchMessage(String body, DatabusEventType eventType) {
|
||||
// 1. 获取 BatchHandler
|
||||
BatchSyncEventHandler handler = handlerRegistry.getBatchHandler(eventType);
|
||||
if (handler == null) {
|
||||
log.warn("[DatabusClient] 未找到全量Handler, eventType={}", eventType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 解析批量消息
|
||||
DatabusBatchMessage<?> message = JSON.parseObject(body, DatabusBatchMessage.class);
|
||||
|
||||
// 3. 全量同步开始回调(第一批)
|
||||
if (message.getBatchNo() == 1) {
|
||||
handler.onFullSyncStart(message);
|
||||
}
|
||||
|
||||
// 4. 处理批次数据
|
||||
handler.handleBatch(message);
|
||||
|
||||
// 5. 全量同步完成回调(最后一批)
|
||||
if (message.getBatchNo().equals(message.getTotalBatch())) {
|
||||
handler.onFullSyncComplete(message);
|
||||
}
|
||||
|
||||
log.info("[DatabusClient] 批量消息处理完成, eventType={}, taskId={}, batchNo={}/{}",
|
||||
eventType, message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理增量消息
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
private void handleIncrementalMessage(String body, DatabusEventType eventType) {
|
||||
// 1. 获取 SyncEventHandler
|
||||
SyncEventHandler handler = handlerRegistry.getIncrementalHandler(eventType);
|
||||
if (handler == null) {
|
||||
log.warn("[DatabusClient] 未找到增量Handler, eventType={}", eventType);
|
||||
return;
|
||||
}
|
||||
|
||||
// 2. 解析增量消息
|
||||
DatabusMessage<?> message = JSON.parseObject(body, DatabusMessage.class);
|
||||
|
||||
// 3. 处理消息
|
||||
handler.handle(message);
|
||||
|
||||
log.info("[DatabusClient] 增量消息处理完成, eventType={}, messageId={}",
|
||||
eventType, message.getMessageId());
|
||||
}
|
||||
|
||||
/**
|
||||
* 解析 eventType
|
||||
*/
|
||||
private DatabusEventType parseEventType(String body) {
|
||||
try {
|
||||
// 先尝试从 JSON 中提取 eventType 字段
|
||||
String eventTypeStr = JSON.parseObject(body).getString("eventType");
|
||||
if (eventTypeStr != null) {
|
||||
return DatabusEventType.valueOf(eventTypeStr);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[DatabusClient] 解析 eventType 失败", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断是否为批量消息
|
||||
*/
|
||||
private boolean isBatchMessage(String body) {
|
||||
try {
|
||||
// 批量消息包含 taskId, batchNo, totalBatch 字段
|
||||
var json = JSON.parseObject(body);
|
||||
return json.containsKey("taskId")
|
||||
&& json.containsKey("batchNo")
|
||||
&& json.containsKey("totalBatch");
|
||||
} catch (Exception e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,136 @@
|
||||
package com.zt.plat.framework.databus.client.core.registry;
|
||||
|
||||
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
|
||||
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Handler 策略注册表
|
||||
* <p>
|
||||
* 负责管理所有 Handler 实例,根<EFBC8C><E6A0B9> eventType 路由到对应的 Handler
|
||||
* <p>
|
||||
* 架构设计:
|
||||
* - 增量同步:使用 SyncEventHandler 处理单条消息
|
||||
* - 全量同步:使用 BatchSyncEventHandler 处理批量消息
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
|
||||
public class HandlerRegistry {
|
||||
|
||||
/**
|
||||
* 增量同步 Handler 映射表
|
||||
* <p>
|
||||
* Key: DatabusEventType(事件类型)
|
||||
* Value: SyncEventHandler(增量同步处理器)
|
||||
*/
|
||||
private final Map<DatabusEventType, SyncEventHandler<?>> incrementalHandlers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 全量同步 Handler 映射表
|
||||
* <p>
|
||||
* Key: DatabusEventType(事件类型)
|
||||
* Value: BatchSyncEventHandler(批量同步处理器)
|
||||
*/
|
||||
private final Map<DatabusEventType, BatchSyncEventHandler<?>> batchHandlers = new HashMap<>();
|
||||
|
||||
/**
|
||||
* 自动注入所有 SyncEventHandler Bean
|
||||
*/
|
||||
@Autowired(required = false)
|
||||
private List<SyncEventHandler<?>> syncEventHandlers;
|
||||
|
||||
/**
|
||||
* 自动注入所有 BatchSyncEventHandler Bean
|
||||
*/
|
||||
@Autowired(required = false)
|
||||
private List<BatchSyncEventHandler<?>> batchSyncEventHandlers;
|
||||
|
||||
/**
|
||||
* 初始化注册表
|
||||
* <p>
|
||||
* 在 Bean 创建后自动调用,注册所有 Handler
|
||||
*/
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
log.info("[HandlerRegistry] 开始初始化 Handler 注册表...");
|
||||
|
||||
// 注册增量同步 Handler
|
||||
if (syncEventHandlers != null) {
|
||||
for (SyncEventHandler<?> handler : syncEventHandlers) {
|
||||
DatabusEventType eventType = handler.getSupportedEventType();
|
||||
incrementalHandlers.put(eventType, handler);
|
||||
log.info("[HandlerRegistry] 注册增量Handler: {} -> {}",
|
||||
eventType, handler.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
// 注册全量同步 Handler
|
||||
if (batchSyncEventHandlers != null) {
|
||||
for (BatchSyncEventHandler<?> handler : batchSyncEventHandlers) {
|
||||
DatabusEventType eventType = handler.getSupportedEventType();
|
||||
batchHandlers.put(eventType, handler);
|
||||
log.info("[HandlerRegistry] 注册全量Handler: {} -> {}",
|
||||
eventType, handler.getClass().getSimpleName());
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[HandlerRegistry] 初始化完成, 增量Handler={}个, 全量Handler={}个",
|
||||
incrementalHandlers.size(), batchHandlers.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取增量同步 Handler
|
||||
*
|
||||
* @param eventType 事件类型
|
||||
* @param <T> 数据类型
|
||||
* @return 对应的 Handler,不存在则返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> SyncEventHandler<T> getIncrementalHandler(DatabusEventType eventType) {
|
||||
return (SyncEventHandler<T>) incrementalHandlers.get(eventType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取全量同步 Handler
|
||||
*
|
||||
* @param eventType 事件类型
|
||||
* @param <T> 数据类型
|
||||
* @return 对应的 Handler,不存在则返回 null
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public <T> BatchSyncEventHandler<T> getBatchHandler(DatabusEventType eventType) {
|
||||
return (BatchSyncEventHandler<T>) batchHandlers.get(eventType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否存在增量同步 Handler
|
||||
*
|
||||
* @param eventType 事件类型
|
||||
* @return 是否存在
|
||||
*/
|
||||
public boolean hasIncrementalHandler(DatabusEventType eventType) {
|
||||
return incrementalHandlers.containsKey(eventType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 检查是否存在全量同步 Handler
|
||||
*
|
||||
* @param eventType 事件类型
|
||||
* @return 是否存在
|
||||
*/
|
||||
public boolean hasBatchHandler(DatabusEventType eventType) {
|
||||
return batchHandlers.containsKey(eventType);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user