refactor(databus): 代码审查修复和TODO完成

修复硬编码配置:
- 修改日志路径为环境变量:${LOG_PATH:./logs}/${spring.application.name}.log
- 修改 RocketMQ 地址为环境变量:${ROCKETMQ_NAME_SERVER:172.16.46.63:30876}
- 还原 Nacos namespace 从 hwc 到 dev

完成 TODO 功能:
- TODO #1: 实现死信重试逻辑(重新投递消息到 MQ)
- TODO #2: 实现日志重试逻辑(根据事件记录重新推送)
- TODO #3: 实现全量同步触发(创建任务并异步执行)
- TODO #4: 实现事件 ID 查询(通过 eventType 查询事件定义)

涉及文件:
- pom.xml: 还原 Nacos namespace 到 dev
- DatabusSyncDeadLetterServiceImpl: 实现死信重试
- DatabusSyncLogServiceImpl: 实现日志重试
- DatabusSyncSubscriptionServiceImpl: 实现全量同步触发
- DatabusEventPublisherImpl: 实现事件 ID 查询
- application-*.yml/yaml: 修复硬编码配置

Ref: 代码审查报告
This commit is contained in:
hewencai
2025-12-03 15:59:59 +08:00
parent e093157bb5
commit ffc7d0247d
9 changed files with 146 additions and 25 deletions

View File

@@ -235,7 +235,7 @@
<env.name>dev</env.name>
<!--Nacos 配置-->
<config.server-addr>172.16.46.63:30848</config.server-addr>
<config.namespace>hwc</config.namespace>
<config.namespace>dev</config.namespace>
<config.group>DEFAULT_GROUP</config.group>
<config.username>nacos</config.username>
<config.password>P@ssword25</config.password>

View File

@@ -2,7 +2,9 @@ package com.zt.plat.framework.databus.server.core.publisher;
import cn.hutool.core.bean.BeanUtil;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventDO;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventMapper;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -21,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional;
public class DatabusEventPublisherImpl implements DatabusEventPublisher {
private final DatabusSyncEventRecordMapper eventRecordMapper;
private final DatabusSyncEventMapper eventMapper;
@Async
@Override
@@ -43,16 +46,24 @@ public class DatabusEventPublisherImpl implements DatabusEventPublisher {
* 保存事件记录到流水表
*/
private Long saveEventRecord(DatabusEvent event) {
// 查询事件定义ID这里简化处理实际应该注入EventMapper查询
// TODO: 根据 eventType 查询 event_id
// 根据 eventType 查询事件定义 ID
DatabusSyncEventDO eventDO = eventMapper.selectByEventType(event.getEventType());
Long eventId = (eventDO != null) ? eventDO.getId() : null;
if (eventId == null) {
log.warn("[Databus] 事件定义不存在, eventType={}, 仍然保存事件记录", event.getEventType());
}
// 构建事件记录对象
DatabusSyncEventRecordDO record = new DatabusSyncEventRecordDO();
BeanUtil.copyProperties(event, record);
record.setEventId(eventId); // 设置事件定义 ID
// 保存到流水表
eventRecordMapper.insert(record);
log.info("[Databus] 事件记录已保存, id={}, eventType={}, eventAction={}",
record.getId(), event.getEventType(), event.getEventAction());
log.info("[Databus] 事件记录已保存, id={}, eventId={}, eventType={}, eventAction={}",
record.getId(), eventId, event.getEventType(), event.getEventAction());
return record.getId();
}

View File

@@ -1,7 +1,10 @@
package com.zt.plat.framework.databus.server.service.impl;
import com.alibaba.fastjson2.JSON;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.deadletter.DatabusSyncDeadLetterPageReqVO;
import com.zt.plat.framework.databus.server.core.message.SyncMessage;
import com.zt.plat.framework.databus.server.core.pusher.MessagePusher;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncDeadLetterDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncDeadLetterMapper;
import com.zt.plat.framework.databus.server.enums.DeadLetterStatusEnum;
@@ -31,6 +34,9 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe
@Resource
private DatabusSyncDeadLetterMapper deadLetterMapper;
@Resource
private MessagePusher messagePusher;
@Override
public DatabusSyncDeadLetterDO getDeadLetter(Long id) {
return deadLetterMapper.selectById(id);
@@ -50,16 +56,33 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe
throw exception(DEAD_LETTER_NOT_EXISTS);
}
// TODO: 实现重新投递逻辑将消息重新发送到 MQ 或 HTTP
log.info("[reprocessDeadLetter] 重新投递死信消息ID: {}, 同步ID: {}", id, deadLetter.getSyncId());
// 重新投递逻辑将消息重新发送到 MQ
log.info("[reprocessDeadLetter] 重新投递死信消息ID: {}, 同步ID: {}, 客户端: {}",
id, deadLetter.getSyncId(), deadLetter.getClientCode());
// 更新状态为已重新投递
DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO();
updateObj.setId(id);
updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus());
updateObj.setHandled(1);
updateObj.setHandleTime(LocalDateTime.now());
deadLetterMapper.updateById(updateObj);
try {
// 1. 解析消息内容
SyncMessage message = JSON.parseObject(deadLetter.getMessageBody(), SyncMessage.class);
// 2. 构建 Topic格式databus-sync-{clientCode}
String topic = "databus-sync-" + deadLetter.getClientCode();
// 3. 重新推送到 MQ
String messageId = messagePusher.pushByMQ(topic, message);
log.info("[reprocessDeadLetter] 死信消息重新投递成功ID: {}, MQ消息ID: {}", id, messageId);
// 4. 更新状态为已重新投递
DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO();
updateObj.setId(id);
updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus());
updateObj.setHandled(1);
updateObj.setHandleTime(LocalDateTime.now());
deadLetterMapper.updateById(updateObj);
} catch (Exception e) {
log.error("[reprocessDeadLetter] 死信消息重新投递失败ID: {}", id, e);
throw new RuntimeException("死信消息重新投递失败: " + e.getMessage(), e);
}
}
@Override

View File

@@ -1,15 +1,23 @@
package com.zt.plat.framework.databus.server.service.impl;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.common.util.date.LocalDateTimeUtils;
import com.zt.plat.framework.databus.server.controller.admin.vo.pushlog.DatabusSyncPushLogPageReqVO;
import com.zt.plat.framework.databus.server.core.message.SyncMessage;
import com.zt.plat.framework.databus.server.core.pusher.MessagePusher;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncLogDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncLogMapper;
import com.zt.plat.framework.databus.server.service.DatabusSyncLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
import java.time.LocalDateTime;
import static com.zt.plat.framework.common.exception.util.ServiceExceptionUtil.exception;
import static com.zt.plat.framework.databus.server.enums.ErrorCodeConstants.PUSH_LOG_NOT_EXISTS;
@@ -26,6 +34,12 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService {
@Resource
private DatabusSyncLogMapper pushLogMapper;
@Resource
private DatabusSyncEventRecordMapper eventRecordMapper;
@Resource
private MessagePusher messagePusher;
@Override
public DatabusSyncLogDO getPushLog(Long id) {
return pushLogMapper.selectById(id);
@@ -37,14 +51,52 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService {
}
@Override
@Transactional(rollbackFor = Exception.class)
public void retryPush(Long id) {
// 校验存在
DatabusSyncLogDO pushLog = pushLogMapper.selectById(id);
if (pushLog == null) {
throw exception(PUSH_LOG_NOT_EXISTS);
}
// TODO: 实现重试逻辑,将日志重新入队或触发推送
log.info("[retryPush] 重试推送日志ID: {}, 同步ID: {}", id, pushLog.getSyncId());
// 重试推送逻辑根据事件记录ID查询原始数据重新推送
log.info("[retryPush] 重试推送日志ID: {}, 同步ID: {}, 客户端: {}",
id, pushLog.getSyncId(), pushLog.getClientCode());
try {
// 1. 查询原始事件记录
DatabusSyncEventRecordDO eventRecord = eventRecordMapper.selectById(pushLog.getEventRecordId());
if (eventRecord == null) {
log.error("[retryPush] 事件记录不存在无法重试推送eventRecordId: {}", pushLog.getEventRecordId());
throw new RuntimeException("事件记录不存在,无法重试推送");
}
// 2. 构建同步消息
SyncMessage message = new SyncMessage();
message.setSyncId(pushLog.getSyncId());
message.setEventRecordId(eventRecord.getId());
message.setEventType(pushLog.getEventType());
message.setEventAction(eventRecord.getEventAction());
message.setDataSnapshot(eventRecord.getDataSnapshot());
message.setDataVersion(eventRecord.getDataVersion());
message.setTimestamp(System.currentTimeMillis());
// 3. 重新推送到 MQ
String messageId = messagePusher.pushByMQ(pushLog.getMqTopic(), message);
log.info("[retryPush] 日志重新推送成功日志ID: {}, MQ消息ID: {}", id, messageId);
// 4. 更新推送日志状态
DatabusSyncLogDO updateObj = new DatabusSyncLogDO();
updateObj.setId(id);
updateObj.setStatus(3); // 状态改为重试中
updateObj.setRetryCount(pushLog.getRetryCount() + 1);
updateObj.setMqMsgId(messageId);
pushLogMapper.updateById(updateObj);
} catch (Exception e) {
log.error("[retryPush] 日志重试推送失败日志ID: {}", id, e);
throw new RuntimeException("日志重试推送失败: " + e.getMessage(), e);
}
}
}

View File

@@ -4,11 +4,13 @@ import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionPageReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionSaveReqVO;
import com.zt.plat.framework.databus.server.convert.DatabusSyncSubscriptionConvert;
import com.zt.plat.framework.databus.server.core.sync.DatabusFullSyncService;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncSubscriptionDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncSubscriptionMapper;
import com.zt.plat.framework.databus.server.service.DatabusSyncSubscriptionService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
@@ -29,6 +31,9 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti
@Resource
private DatabusSyncSubscriptionMapper subscriptionMapper;
@Resource
private DatabusFullSyncService fullSyncService;
@Override
@Transactional(rollbackFor = Exception.class)
public Long createSubscription(DatabusSyncSubscriptionSaveReqVO createReqVO) {
@@ -122,8 +127,38 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti
public void triggerSync(Long id) {
// 校验存在
validateSubscriptionExists(id);
// TODO: 发送消息到 MQ 或调用异步任务触发同步
log.info("[triggerSync] 手动触发同步订阅ID: {}", id);
// 手动触发全量同步:创建全量同步任务并异步执行
log.info("[triggerSync] 手动触发全量同步订阅ID: {}", id);
try {
// 1. 创建全量同步任务
Long taskId = fullSyncService.createFullSyncTask(id, "手动触发全量同步");
log.info("[triggerSync] 创建全量同步任务成功订阅ID: {}, 任务ID: {}", id, taskId);
// 2. 异步执行全量同步任务
executeFullSyncAsync(taskId);
} catch (Exception e) {
log.error("[triggerSync] 手动触发全量同步失败订阅ID: {}", id, e);
throw new RuntimeException("触发全量同步失败: " + e.getMessage(), e);
}
}
/**
* 异步执行全量同步任务
*
* @param taskId 任务ID
*/
@Async
protected void executeFullSyncAsync(Long taskId) {
try {
log.info("[executeFullSyncAsync] 开始执行全量同步任务任务ID: {}", taskId);
fullSyncService.executeFullSyncTask(taskId);
log.info("[executeFullSyncAsync] 全量同步任务执行完成任务ID: {}", taskId);
} catch (Exception e) {
log.error("[executeFullSyncAsync] 全量同步任务执行失败任务ID: {}", taskId, e);
}
}
@Override

View File

@@ -75,12 +75,12 @@ management:
# 日志文件配置
logging:
file:
name: D:/project/zhongtong/logs/${spring.application.name}.log # 日志文件名,路径
name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径
# RocketMQ 配置项
rocketmq:
name-server: 172.16.240.64:9876 # RocketMQ Namesrv
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
@@ -121,7 +121,7 @@ zt:
- company-b # 配置订阅的客户端与客户端的client-code一致
mq:
enabled: true
name-server: 172.16.240.64:9876 # RocketMQ NameServer 地址
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ NameServer 地址,使用环境变量
topic-base: databus-sync
producer-group: databus-server-producer
send-msg-timeout: 10000

View File

@@ -88,7 +88,7 @@ mybatis-plus:
# RocketMQ 配置项
rocketmq:
name-server: 172.16.46.63:9876 # RocketMQ Namesrv
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒

View File

@@ -59,7 +59,7 @@ spring:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 172.16.46.63:30876 # RocketMQ Namesrv
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
@@ -109,7 +109,7 @@ spring:
# 日志文件配置
logging:
file:
name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,路径
name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径
--- #################### 微信公众号、小程序相关配置 ####################
wx:

View File

@@ -60,7 +60,7 @@ spring:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
name-server: 172.16.46.63:30876 # RocketMQ Namesrv
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
--- #################### 定时任务相关配置 ####################