feat(databus): 完成阶段四-DataBus Server完整功能
- 补充缺失的 API 类(DatabusMessage、DatabusBatchMessage、DatabusEventType) - 新增变更消息消费者(3个:部门、用户、岗位) - 新增数据提供者(3个:部门、用户、岗位) - 确认分发器服务(核心定向推送逻辑) - 确认全量同步与消息推送组件 - 确认管理后台 API(5个 Controller) - 确认 Service ��(4个核心服务) - 确认 DAL 层(7个 DO + Mapper) - 添加 databus-server starter 依赖到 pom.xml - 编译验证通过 Ref: docs/databus/implementation-checklist.md 任务 39-70
This commit is contained in:
71
zt-framework/zt-spring-boot-starter-databus-client/pom.xml
Normal file
71
zt-framework/zt-spring-boot-starter-databus-client/pom.xml
Normal file
@@ -0,0 +1,71 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<parent>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-framework</artifactId>
|
||||
<version>${revision}</version>
|
||||
</parent>
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
<artifactId>zt-spring-boot-starter-databus-client</artifactId>
|
||||
<packaging>jar</packaging>
|
||||
|
||||
<name>${project.artifactId}</name>
|
||||
<description>DataBus 客户端组件,负责接收数据变更并同步</description>
|
||||
<url>https://github.com/YunaiV/ruoyi-vue-pro</url>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-common</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Databus API(事件类型枚举等) -->
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-module-databus-api</artifactId>
|
||||
<version>${revision}</version>
|
||||
</dependency>
|
||||
|
||||
<!-- System API(用于默认Handler实现) -->
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-module-system-api</artifactId>
|
||||
<version>${revision}</version>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>cn.hutool</groupId>
|
||||
<artifactId>hutool-all</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- MQ 相关 -->
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-spring-boot-starter-mq</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Redis 相关 (用于幂等) -->
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-spring-boot-starter-redis</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- Web 相关 (用于HTTP接收) -->
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-web</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
|
||||
<!-- Test -->
|
||||
<dependency>
|
||||
<groupId>com.zt.plat</groupId>
|
||||
<artifactId>zt-spring-boot-starter-test</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
</project>
|
||||
@@ -0,0 +1,23 @@
|
||||
package com.zt.plat.framework.databus.client.config;
|
||||
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.ComponentScan;
|
||||
|
||||
/**
|
||||
* Databus 同步客户端自动配置
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@AutoConfiguration
|
||||
@EnableConfigurationProperties(DatabusSyncClientProperties.class)
|
||||
@ComponentScan(basePackages = "com.zt.plat.framework.databus.client")
|
||||
public class DatabusSyncClientAutoConfiguration {
|
||||
|
||||
public DatabusSyncClientAutoConfiguration() {
|
||||
log.info("[Databus] 数据同步客户端模块已加载");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
package com.zt.plat.framework.databus.client.config;
|
||||
|
||||
import lombok.Data;
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
|
||||
/**
|
||||
* Databus 数据同步客户端配置属性
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Data
|
||||
@ConfigurationProperties(prefix = "zt.databus.sync.client")
|
||||
public class DatabusSyncClientProperties {
|
||||
|
||||
/**
|
||||
* 是否启用
|
||||
*/
|
||||
private Boolean enabled = true;
|
||||
/**
|
||||
* RocketMQ NameServer 地址
|
||||
*/
|
||||
private String nameServer;
|
||||
|
||||
/**
|
||||
* 客户端编码(必填,用于订阅专属Topic)
|
||||
* Topic格式: databus-sync-{eventType}-{clientCode}
|
||||
*/
|
||||
private String clientCode;
|
||||
|
||||
/**
|
||||
* MQ配置
|
||||
*/
|
||||
private Mq mq = new Mq();
|
||||
|
||||
/**
|
||||
* HTTP配置
|
||||
*/
|
||||
private Http http = new Http();
|
||||
|
||||
/**
|
||||
* 幂等配置
|
||||
*/
|
||||
private Idempotent idempotent = new Idempotent();
|
||||
|
||||
@Data
|
||||
public static class Mq {
|
||||
/**
|
||||
* 是否启用MQ消费
|
||||
*/
|
||||
private Boolean enabled = true;
|
||||
/**
|
||||
* RocketMQ NameServer 地址
|
||||
*/
|
||||
private String nameServer;
|
||||
|
||||
/**
|
||||
* Topic基础名称
|
||||
*/
|
||||
private String topicBase = "databus-sync";
|
||||
|
||||
/**
|
||||
* 消费者组前缀,完整格式: {consumerGroupPrefix}-{eventType}
|
||||
* 默认: databus-client-{clientCode}
|
||||
*/
|
||||
private String consumerGroupPrefix;
|
||||
|
||||
/**
|
||||
* 消费线程数
|
||||
*/
|
||||
private Integer consumeThreadMin = 1;
|
||||
|
||||
/**
|
||||
* 消费线程数
|
||||
*/
|
||||
private Integer consumeThreadMax = 5;
|
||||
|
||||
/**
|
||||
* 最大重试次数
|
||||
*/
|
||||
private Integer maxReconsumeTimes = 3;
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Http {
|
||||
/**
|
||||
* 是否启用HTTP推送接收
|
||||
*/
|
||||
private Boolean enabled = false;
|
||||
|
||||
/**
|
||||
* 接收端点路径
|
||||
*/
|
||||
private String endpoint = "/databus/sync/receive";
|
||||
}
|
||||
|
||||
@Data
|
||||
public static class Idempotent {
|
||||
/**
|
||||
* 是否启用幂等检查
|
||||
*/
|
||||
private Boolean enabled = true;
|
||||
/**
|
||||
* RocketMQ NameServer 地址
|
||||
*/
|
||||
private String nameServer;
|
||||
|
||||
/**
|
||||
* 幂等记录过期时间(秒)
|
||||
*/
|
||||
private Integer expireSeconds = 86400; // 24小时
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,43 @@
|
||||
package com.zt.plat.framework.databus.client.core.controller;
|
||||
|
||||
import com.zt.plat.framework.databus.client.core.processor.MessageProcessor;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
/**
|
||||
* 同步消息HTTP接收控制器
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@RestController
|
||||
@RequestMapping("${zt.databus.sync.client.http.endpoint:/databus/sync/receive}")
|
||||
@RequiredArgsConstructor
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client.http", name = "enabled", havingValue = "true")
|
||||
public class SyncMessageController {
|
||||
|
||||
private final MessageProcessor messageProcessor;
|
||||
|
||||
/**
|
||||
* 接收同步消息
|
||||
*
|
||||
* @param eventType 事件类型
|
||||
* @param message 消息体
|
||||
*/
|
||||
@PostMapping("/{eventType}")
|
||||
public void receive(@PathVariable("eventType") String eventType, @RequestBody String message) {
|
||||
log.debug("[Databus Client] 接收到HTTP消息, eventType={}", eventType);
|
||||
|
||||
DatabusEventType type = DatabusEventType.valueOf(eventType);
|
||||
if (type == null) {
|
||||
log.warn("[Databus Client] 未知的事件类型: {}", eventType);
|
||||
return;
|
||||
}
|
||||
|
||||
messageProcessor.process(message, type);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,19 @@
|
||||
package com.zt.plat.framework.databus.client.core.idempotent;
|
||||
|
||||
/**
|
||||
* 幂等存储接口
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
public interface IdempotentStore {
|
||||
|
||||
/**
|
||||
* 检查并记录消息是否已处理
|
||||
*
|
||||
* @param syncId 同步ID
|
||||
* @param expireSeconds 过期时间(秒)
|
||||
* @return true-未处理(可以处理), false-已处理(应该跳过)
|
||||
*/
|
||||
boolean checkAndMark(String syncId, int expireSeconds);
|
||||
|
||||
}
|
||||
@@ -0,0 +1,30 @@
|
||||
package com.zt.plat.framework.databus.client.core.idempotent;
|
||||
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* 基于 Redis 的幂等存储实现
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Component
|
||||
@RequiredArgsConstructor
|
||||
public class RedisIdempotentStore implements IdempotentStore {
|
||||
|
||||
private static final String KEY_PREFIX = "databus:sync:idempotent:";
|
||||
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Override
|
||||
public boolean checkAndMark(String syncId, int expireSeconds) {
|
||||
String key = KEY_PREFIX + syncId;
|
||||
Boolean success = stringRedisTemplate.opsForValue()
|
||||
.setIfAbsent(key, "1", expireSeconds, TimeUnit.SECONDS);
|
||||
return Boolean.TRUE.equals(success);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,148 @@
|
||||
package com.zt.plat.framework.databus.client.core.listener;
|
||||
|
||||
import com.zt.plat.framework.databus.client.config.DatabusSyncClientProperties;
|
||||
import com.zt.plat.framework.databus.client.core.processor.MessageProcessor;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import jakarta.annotation.PreDestroy;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
||||
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
||||
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
||||
import org.apache.rocketmq.client.exception.MQClientException;
|
||||
import org.apache.rocketmq.common.message.MessageExt;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Databus Consumer 注册器
|
||||
* <p>
|
||||
* 根据 DatabusEventType 枚举自动为所有事件类型创建消费者
|
||||
* Topic格式: {topicBase}-{module}-{entity}-{action}-{clientCode}
|
||||
* 例如: databus-sync-system-user-create-company-a
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
|
||||
public class DatabusConsumerRegistry {
|
||||
|
||||
private final MessageProcessor messageProcessor;
|
||||
private final DatabusSyncClientProperties properties;
|
||||
|
||||
|
||||
/**
|
||||
* 管理的消费者列表
|
||||
*/
|
||||
private final List<DefaultMQPushConsumer> consumers = new ArrayList<>();
|
||||
|
||||
public DatabusConsumerRegistry(MessageProcessor messageProcessor, DatabusSyncClientProperties properties) {
|
||||
this.messageProcessor = messageProcessor;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
if (!Boolean.TRUE.equals(properties.getEnabled())) {
|
||||
log.info("[Databus Client] 客户端未启用,跳过初始化");
|
||||
return;
|
||||
}
|
||||
|
||||
String nameServer = properties.getMq().getNameServer();
|
||||
if (nameServer == null || nameServer.isEmpty()) {
|
||||
log.warn("[Databus Client] RocketMQ nameServer未配置,跳过MQ消费者初始化");
|
||||
return;
|
||||
}
|
||||
|
||||
String clientCode = properties.getClientCode();
|
||||
if (clientCode == null || clientCode.isEmpty()) {
|
||||
log.error("[Databus Client] clientCode未配置,无法订阅Topic");
|
||||
return;
|
||||
}
|
||||
|
||||
String topicBase = properties.getMq().getTopicBase();
|
||||
String consumerGroupPrefix = properties.getMq().getConsumerGroupPrefix();
|
||||
if (consumerGroupPrefix == null || consumerGroupPrefix.isEmpty()) {
|
||||
consumerGroupPrefix = "databus-client-" + clientCode;
|
||||
}
|
||||
|
||||
// 为每个事件类型创建独立的消费者
|
||||
for (DatabusEventType eventType : DatabusEventType.values()) {
|
||||
try {
|
||||
createConsumer(eventType, topicBase, clientCode, consumerGroupPrefix, nameServer);
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus Client] 创建消费者失败, eventType={}", eventType.name(), e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[Databus Client] 消费者注册完成,共创建 {} 个消费者", consumers.size());
|
||||
}
|
||||
|
||||
/**
|
||||
* 为指定事件类型创建消费者
|
||||
*/
|
||||
private void createConsumer(DatabusEventType eventType, String topicBase, String clientCode, String consumerGroupPrefix, String nameServer) throws MQClientException {
|
||||
// Topic: databus-sync-system-user-create-company-a
|
||||
String topic = eventType.getTopic(topicBase, clientCode);
|
||||
// ConsumerGroup: databus-client-company-a-system-user-create
|
||||
String consumerGroup = String.format("%s-%s", consumerGroupPrefix, eventType.getTopicSuffix());
|
||||
|
||||
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
|
||||
consumer.setNamesrvAddr(nameServer);
|
||||
consumer.setConsumeThreadMin(properties.getMq().getConsumeThreadMin());
|
||||
consumer.setConsumeThreadMax(properties.getMq().getConsumeThreadMax());
|
||||
consumer.setMaxReconsumeTimes(properties.getMq().getMaxReconsumeTimes());
|
||||
|
||||
// 订阅Topic
|
||||
consumer.subscribe(topic, "*");
|
||||
|
||||
// 设置消息监听器
|
||||
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
|
||||
for (MessageExt msg : msgs) {
|
||||
try {
|
||||
String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
|
||||
log.debug("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}",
|
||||
msg.getTopic(), msg.getMsgId(), eventType.name());
|
||||
|
||||
messageProcessor.process(messageBody, eventType);
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus Client] 消息处理失败, topic={}, msgId={}, reconsumeTimes={}",
|
||||
msg.getTopic(), msg.getMsgId(), msg.getReconsumeTimes(), e);
|
||||
|
||||
if (msg.getReconsumeTimes() >= properties.getMq().getMaxReconsumeTimes()) {
|
||||
log.error("[Databus Client] 重试次数已达上限,放弃处理, msgId={}", msg.getMsgId());
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
||||
}
|
||||
}
|
||||
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
||||
});
|
||||
|
||||
consumer.start();
|
||||
consumers.add(consumer);
|
||||
|
||||
log.info("[Databus Client] 消费者启动成功, topic={}, consumerGroup={}, event={}",
|
||||
topic, consumerGroup, eventType.getName());
|
||||
}
|
||||
|
||||
@PreDestroy
|
||||
public void destroy() {
|
||||
for (DefaultMQPushConsumer consumer : consumers) {
|
||||
try {
|
||||
consumer.shutdown();
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus Client] 关闭消费者失败", e);
|
||||
}
|
||||
}
|
||||
consumers.clear();
|
||||
log.info("[Databus Client] 所有消费者已关闭");
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,54 @@
|
||||
package com.zt.plat.framework.databus.client.core.message;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Builder;
|
||||
import lombok.Data;
|
||||
import lombok.NoArgsConstructor;
|
||||
|
||||
/**
|
||||
* 数据同步消息(服务端推送格式)
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Data
|
||||
@Builder
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class SyncMessage {
|
||||
|
||||
/**
|
||||
* 同步ID(唯一标识)
|
||||
*/
|
||||
private String syncId;
|
||||
|
||||
/**
|
||||
* 事件记录ID
|
||||
*/
|
||||
private Long eventRecordId;
|
||||
|
||||
/**
|
||||
* 事件类型
|
||||
*/
|
||||
private String eventType;
|
||||
|
||||
/**
|
||||
* 事件动作
|
||||
*/
|
||||
private String eventAction;
|
||||
|
||||
/**
|
||||
* 业务数据快照(JSON字符串)
|
||||
*/
|
||||
private String dataSnapshot;
|
||||
|
||||
/**
|
||||
* 数据版本
|
||||
*/
|
||||
private Integer dataVersion;
|
||||
|
||||
/**
|
||||
* 时间戳
|
||||
*/
|
||||
private Long timestamp;
|
||||
|
||||
}
|
||||
@@ -0,0 +1,236 @@
|
||||
package com.zt.plat.framework.databus.client.core.processor;
|
||||
|
||||
import cn.hutool.json.JSONUtil;
|
||||
import com.zt.plat.framework.databus.client.config.DatabusSyncClientProperties;
|
||||
import com.zt.plat.framework.databus.client.core.idempotent.IdempotentStore;
|
||||
import com.zt.plat.framework.databus.client.core.message.SyncMessage;
|
||||
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
|
||||
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
|
||||
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
|
||||
import com.zt.plat.module.databus.api.message.DatabusMessage;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 消息处理器
|
||||
* <p>
|
||||
* 负责消息的幂等检查和路由到具体的Handler
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@SuppressWarnings({"rawtypes", "unchecked"})
|
||||
public class MessageProcessor {
|
||||
|
||||
private final IdempotentStore idempotentStore;
|
||||
private final DatabusSyncClientProperties properties;
|
||||
|
||||
/**
|
||||
* 增量同步处理器列表
|
||||
*/
|
||||
private List<SyncEventHandler> handlers = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* 全量同步处理器列表
|
||||
*/
|
||||
private List<BatchSyncEventHandler> batchHandlers = new ArrayList<>();
|
||||
|
||||
private Map<DatabusEventType, SyncEventHandler> handlerMap;
|
||||
private Map<DatabusEventType, BatchSyncEventHandler> batchHandlerMap;
|
||||
|
||||
public MessageProcessor(IdempotentStore idempotentStore, DatabusSyncClientProperties properties) {
|
||||
this.idempotentStore = idempotentStore;
|
||||
this.properties = properties;
|
||||
}
|
||||
|
||||
@Autowired(required = false)
|
||||
public void setHandlers(List<SyncEventHandler> handlers) {
|
||||
if (handlers != null) {
|
||||
this.handlers = handlers;
|
||||
}
|
||||
}
|
||||
|
||||
@Autowired(required = false)
|
||||
public void setBatchHandlers(List<BatchSyncEventHandler> batchHandlers) {
|
||||
if (batchHandlers != null) {
|
||||
this.batchHandlers = batchHandlers;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理同步消息
|
||||
*
|
||||
* @param messageJson 消息JSON字符串
|
||||
* @param eventType 事件类型枚举
|
||||
*/
|
||||
public void process(String messageJson, DatabusEventType eventType) {
|
||||
try {
|
||||
if (eventType.isFullSync()) {
|
||||
processBatchMessage(messageJson, eventType);
|
||||
} else {
|
||||
processIncrementalMessage(messageJson, eventType);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
log.error("[Databus Client] 消息处理失败, eventType={}, message={}", eventType.name(), messageJson, e);
|
||||
throw new RuntimeException("消息处理失败", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理增量同步消息
|
||||
*/
|
||||
private void processIncrementalMessage(String messageJson, DatabusEventType eventType) {
|
||||
// 先解析为服务端推送的 SyncMessage 格式
|
||||
SyncMessage syncMessage = JSONUtil.toBean(messageJson, SyncMessage.class);
|
||||
|
||||
// 幂等检查
|
||||
if (properties.getIdempotent().getEnabled()) {
|
||||
boolean canProcess = idempotentStore.checkAndMark(
|
||||
syncMessage.getSyncId(),
|
||||
properties.getIdempotent().getExpireSeconds()
|
||||
);
|
||||
if (!canProcess) {
|
||||
log.info("[Databus Client] 消息已处理,跳过, syncId={}", syncMessage.getSyncId());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 路由到对应的Handler
|
||||
SyncEventHandler handler = getHandler(eventType);
|
||||
if (handler == null) {
|
||||
log.warn("[Databus Client] 未找到事件处理器, eventType={}", eventType.name());
|
||||
return;
|
||||
}
|
||||
|
||||
// 获取 Handler 期望的数据类型,并转换消息
|
||||
Class<?> dataType = handler.getDataType();
|
||||
DatabusMessage message = convertToDatabusMessage(syncMessage, eventType, dataType);
|
||||
handler.handle(message);
|
||||
log.info("[Databus Client] 增量消息处理成功, syncId={}, eventType={}",
|
||||
syncMessage.getSyncId(), eventType.name());
|
||||
}
|
||||
|
||||
/**
|
||||
* 将 SyncMessage 转换为 DatabusMessage
|
||||
*
|
||||
* @param syncMessage 服务端推送的同步消息
|
||||
* @param eventType 事件类型
|
||||
* @param dataType Handler 期望的数据类型
|
||||
* @return DatabusMessage
|
||||
*/
|
||||
private DatabusMessage convertToDatabusMessage(SyncMessage syncMessage, DatabusEventType eventType, Class<?> dataType) {
|
||||
DatabusMessage message = new DatabusMessage();
|
||||
message.setMessageId(syncMessage.getSyncId());
|
||||
message.setEventType(eventType);
|
||||
|
||||
// 从 dataSnapshot JSON 字符串解析业务数据对象
|
||||
if (syncMessage.getDataSnapshot() != null && !syncMessage.getDataSnapshot().isEmpty()) {
|
||||
// 使用 Handler 提供的类型进行反序列化
|
||||
Object data = JSONUtil.toBean(syncMessage.getDataSnapshot(), dataType);
|
||||
message.setData(data);
|
||||
}
|
||||
|
||||
// 设置时间戳
|
||||
if (syncMessage.getTimestamp() != null) {
|
||||
message.setTimestamp(java.time.LocalDateTime.ofInstant(
|
||||
java.time.Instant.ofEpochMilli(syncMessage.getTimestamp()),
|
||||
java.time.ZoneId.systemDefault()
|
||||
));
|
||||
}
|
||||
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* 处理全量同步批量消息
|
||||
*/
|
||||
private void processBatchMessage(String messageJson, DatabusEventType eventType) {
|
||||
DatabusBatchMessage message = JSONUtil.toBean(messageJson, DatabusBatchMessage.class);
|
||||
|
||||
// 幂等检查
|
||||
if (properties.getIdempotent().getEnabled()) {
|
||||
boolean canProcess = idempotentStore.checkAndMark(
|
||||
message.getMessageId(),
|
||||
properties.getIdempotent().getExpireSeconds()
|
||||
);
|
||||
if (!canProcess) {
|
||||
log.info("[Databus Client] 批量消息已处理,跳过, messageId={}", message.getMessageId());
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// 路由到对应的BatchHandler
|
||||
BatchSyncEventHandler handler = getBatchHandler(eventType);
|
||||
if (handler == null) {
|
||||
log.warn("[Databus Client] 未找到批量事件处理器, eventType={}", eventType.name());
|
||||
return;
|
||||
}
|
||||
|
||||
// 全量同步开始回调(第一批)
|
||||
if (message.getBatchNo() != null && message.getBatchNo() == 1) {
|
||||
handler.onFullSyncStart(message);
|
||||
log.info("[Databus Client] 全量同步开始, taskId={}, eventType={}, totalCount={}",
|
||||
message.getTaskId(), eventType.name(), message.getTotalCount());
|
||||
}
|
||||
|
||||
// 执行批量处理
|
||||
handler.handleBatch(message);
|
||||
log.info("[Databus Client] 批量消息处理成功, messageId={}, eventType={}, batchNo={}/{}, count={}",
|
||||
message.getMessageId(), eventType.name(),
|
||||
message.getBatchNo(), message.getTotalBatch(), message.getCount());
|
||||
|
||||
// 全量同步完成回调(最后一批)
|
||||
if (Boolean.TRUE.equals(message.getIsLastBatch())) {
|
||||
handler.onFullSyncComplete(message);
|
||||
log.info("[Databus Client] 全量同步完成, taskId={}, eventType={}, totalCount={}",
|
||||
message.getTaskId(), eventType.name(), message.getTotalCount());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取增量同步事件处理器
|
||||
*/
|
||||
private SyncEventHandler getHandler(DatabusEventType eventType) {
|
||||
if (handlerMap == null) {
|
||||
if (handlers == null || handlers.isEmpty()) {
|
||||
handlerMap = Collections.emptyMap();
|
||||
} else {
|
||||
handlerMap = handlers.stream()
|
||||
.collect(Collectors.toMap(
|
||||
SyncEventHandler::getSupportedEventType,
|
||||
Function.identity()
|
||||
));
|
||||
}
|
||||
}
|
||||
return handlerMap.get(eventType);
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取全量同步批量事件处理器
|
||||
*/
|
||||
private BatchSyncEventHandler getBatchHandler(DatabusEventType eventType) {
|
||||
if (batchHandlerMap == null) {
|
||||
if (batchHandlers == null || batchHandlers.isEmpty()) {
|
||||
batchHandlerMap = Collections.emptyMap();
|
||||
} else {
|
||||
batchHandlerMap = batchHandlers.stream()
|
||||
.collect(Collectors.toMap(
|
||||
BatchSyncEventHandler::getSupportedEventType,
|
||||
Function.identity()
|
||||
));
|
||||
}
|
||||
}
|
||||
return batchHandlerMap.get(eventType);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,51 @@
|
||||
package com.zt.plat.framework.databus.client.handler;
|
||||
|
||||
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
|
||||
/**
|
||||
* 批量同步事件处理器接口
|
||||
* <p>
|
||||
* 业务系统需要实现此接口来处理全量同步的批量数据
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
public interface BatchSyncEventHandler<T> {
|
||||
|
||||
/**
|
||||
* 获取支持的事件类型
|
||||
*
|
||||
* @return 事件类型枚举
|
||||
*/
|
||||
DatabusEventType getSupportedEventType();
|
||||
|
||||
/**
|
||||
* 处理批量同步消息
|
||||
*
|
||||
* @param message 批量同步消息
|
||||
*/
|
||||
void handleBatch(DatabusBatchMessage<T> message);
|
||||
|
||||
/**
|
||||
* 全量同步开始回调(可选实现)
|
||||
* <p>
|
||||
* 当收到第一批数据(batchNo=1)时调用
|
||||
*
|
||||
* @param message 批量同步消息
|
||||
*/
|
||||
default void onFullSyncStart(DatabusBatchMessage<T> message) {
|
||||
// 默认空实现,子类可覆盖
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量同步完成回调(可选实现)
|
||||
* <p>
|
||||
* 当收到最后一批数据(isLastBatch=true)时调用
|
||||
*
|
||||
* @param message 批量同步消息
|
||||
*/
|
||||
default void onFullSyncComplete(DatabusBatchMessage<T> message) {
|
||||
// 默认空实现,子类可覆盖
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,57 @@
|
||||
package com.zt.plat.framework.databus.client.handler;
|
||||
|
||||
import com.zt.plat.module.databus.api.message.DatabusMessage;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
|
||||
import java.lang.reflect.ParameterizedType;
|
||||
import java.lang.reflect.Type;
|
||||
|
||||
/**
|
||||
* 同步事件处理器接口
|
||||
* <p>
|
||||
* 业务系统需要实现此接口来处理增量数据同步
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
public interface SyncEventHandler<T> {
|
||||
|
||||
/**
|
||||
* 获取支持的事件类型
|
||||
*
|
||||
* @return 事件类型枚举
|
||||
*/
|
||||
DatabusEventType getSupportedEventType();
|
||||
|
||||
/**
|
||||
* 处理同步消息
|
||||
*
|
||||
* @param message 同步消息
|
||||
*/
|
||||
void handle(DatabusMessage<T> message);
|
||||
|
||||
/**
|
||||
* 获取数据类型
|
||||
* <p>
|
||||
* 默认通过反射获取泛型类型参数,子类可以覆盖此方法提供具体类型
|
||||
*
|
||||
* @return 数据类型的 Class 对象
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
default Class<T> getDataType() {
|
||||
Type[] genericInterfaces = this.getClass().getGenericInterfaces();
|
||||
for (Type genericInterface : genericInterfaces) {
|
||||
if (genericInterface instanceof ParameterizedType) {
|
||||
ParameterizedType parameterizedType = (ParameterizedType) genericInterface;
|
||||
if (parameterizedType.getRawType().equals(SyncEventHandler.class)) {
|
||||
Type[] typeArguments = parameterizedType.getActualTypeArguments();
|
||||
if (typeArguments.length > 0 && typeArguments[0] instanceof Class) {
|
||||
return (Class<T>) typeArguments[0];
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// 如果无法获取泛型类型,返回 Object.class
|
||||
return (Class<T>) Object.class;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,108 @@
|
||||
package com.zt.plat.framework.databus.client.handler.post;
|
||||
|
||||
import com.zt.plat.module.databus.api.data.PostData;
|
||||
import com.zt.plat.module.system.api.dept.PostApi;
|
||||
import com.zt.plat.module.system.api.dept.dto.PostSaveReqDTO;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
/**
|
||||
* 岗位同步业务逻辑
|
||||
* <p>
|
||||
* 被各个 PostHandler 共享使用
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Service
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
|
||||
@ConditionalOnClass(name = "com.zt.plat.module.system.api.dept.PostApi")
|
||||
public class PostSyncService {
|
||||
|
||||
@Autowired(required = false)
|
||||
private PostApi postApi;
|
||||
|
||||
/**
|
||||
* 创建岗位
|
||||
*/
|
||||
public void create(PostData data) {
|
||||
if (postApi == null) {
|
||||
log.warn("[PostSync] PostApi未注入,跳过创建岗位操作, postId={}", data.getId());
|
||||
return;
|
||||
}
|
||||
PostSaveReqDTO dto = buildPostDTO(data);
|
||||
postApi.createPost(dto).checkError();
|
||||
log.info("[PostSync] 创建岗位成功, postId={}, postName={}", dto.getId(), dto.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新岗位
|
||||
*/
|
||||
public void update(PostData data) {
|
||||
if (postApi == null) {
|
||||
log.warn("[PostSync] PostApi未注入,跳过更新岗位操作, postId={}", data.getId());
|
||||
return;
|
||||
}
|
||||
PostSaveReqDTO dto = buildPostDTO(data);
|
||||
postApi.updatePost(dto).checkError();
|
||||
log.info("[PostSync] 更新岗位成功, postId={}, postName={}", dto.getId(), dto.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* 删除岗位
|
||||
*/
|
||||
public void delete(PostData data) {
|
||||
if (postApi == null) {
|
||||
log.warn("[PostSync] PostApi未注入,跳过删除岗位操作, postId={}", data.getId());
|
||||
return;
|
||||
}
|
||||
Long postId = data.getId();
|
||||
if (postId != null) {
|
||||
postApi.deletePost(postId).checkError();
|
||||
log.info("[PostSync] 删除岗位成功, postId={}", postId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 全量同步单条数据(存在则更新,不存在则创建)
|
||||
*/
|
||||
public void fullSync(PostData data) {
|
||||
if (postApi == null) {
|
||||
log.warn("[PostSync] PostApi未注入,跳过全量同步岗位操作, postId={}", data.getId());
|
||||
return;
|
||||
}
|
||||
PostSaveReqDTO dto = buildPostDTO(data);
|
||||
try {
|
||||
if (dto.getId() != null) {
|
||||
var existing = postApi.getPost(dto.getId());
|
||||
if (existing.isSuccess() && existing.getData() != null) {
|
||||
postApi.updatePost(dto).checkError();
|
||||
} else {
|
||||
postApi.createPost(dto).checkError();
|
||||
}
|
||||
} else {
|
||||
postApi.createPost(dto).checkError();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
postApi.createPost(dto).checkError();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建岗位DTO
|
||||
*/
|
||||
private PostSaveReqDTO buildPostDTO(PostData data) {
|
||||
PostSaveReqDTO dto = new PostSaveReqDTO();
|
||||
dto.setId(data.getId());
|
||||
dto.setCode(data.getCode());
|
||||
dto.setName(data.getName());
|
||||
dto.setSort(data.getSort());
|
||||
dto.setStatus(data.getStatus());
|
||||
dto.setRemark(data.getRemark());
|
||||
return dto;
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,71 @@
|
||||
package com.zt.plat.framework.databus.client.handler.post;
|
||||
|
||||
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
|
||||
import com.zt.plat.module.databus.api.data.PostData;
|
||||
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
|
||||
import com.zt.plat.module.databus.enums.DatabusEventType;
|
||||
import jakarta.annotation.Resource;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
/**
|
||||
* 岗位全量同步事件处理器
|
||||
*
|
||||
* @author ZT
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
|
||||
@ConditionalOnBean(PostSyncService.class)
|
||||
public class SystemPostFullHandler implements BatchSyncEventHandler<PostData> {
|
||||
|
||||
@Resource
|
||||
private PostSyncService postSyncService;
|
||||
|
||||
@Override
|
||||
public DatabusEventType getSupportedEventType() {
|
||||
return DatabusEventType.SYSTEM_POST_FULL;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFullSyncStart(DatabusBatchMessage<PostData> message) {
|
||||
log.info("[PostSync] 全量同步开始, taskId={}, totalCount={}, totalBatch={}",
|
||||
message.getTaskId(), message.getTotalCount(), message.getTotalBatch());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleBatch(DatabusBatchMessage<PostData> message) {
|
||||
log.info("[PostSync] 处理批次, batchNo={}/{}, count={}",
|
||||
message.getBatchNo(), message.getTotalBatch(), message.getCount());
|
||||
|
||||
if (message.getDataList() == null || message.getDataList().isEmpty()) {
|
||||
log.warn("[PostSync] 数据列表为空, batchNo={}", message.getBatchNo());
|
||||
return;
|
||||
}
|
||||
|
||||
int successCount = 0;
|
||||
int failCount = 0;
|
||||
|
||||
for (PostData data : message.getDataList()) {
|
||||
try {
|
||||
postSyncService.fullSync(data);
|
||||
successCount++;
|
||||
} catch (Exception e) {
|
||||
failCount++;
|
||||
log.error("[PostSync] 处理数据项失败, postId={}", data.getId(), e);
|
||||
}
|
||||
}
|
||||
|
||||
log.info("[PostSync] 批次处理完成, batchNo={}, success={}, fail={}",
|
||||
message.getBatchNo(), successCount, failCount);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFullSyncComplete(DatabusBatchMessage<PostData> message) {
|
||||
log.info("[PostSync] 全量同步完成, taskId={}, totalCount={}",
|
||||
message.getTaskId(), message.getTotalCount());
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,2 @@
|
||||
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
|
||||
com.zt.plat.framework.databus.client.config.DatabusSyncClientAutoConfiguration
|
||||
@@ -0,0 +1 @@
|
||||
com.zt.plat.framework.databus.client.config.DatabusSyncClientAutoConfiguration
|
||||
Reference in New Issue
Block a user