From 15580ace8fec7aec5c81c2c509ff5965c1d8cf9a Mon Sep 17 00:00:00 2001 From: hewencai <2357300448@qq.com> Date: Wed, 3 Dec 2025 15:59:59 +0800 Subject: [PATCH] =?UTF-8?q?refactor(databus):=20=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=AE=A1=E6=9F=A5=E4=BF=AE=E5=A4=8D=E5=92=8CTODO=E5=AE=8C?= =?UTF-8?q?=E6=88=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 修复硬编码配置: - 修改日志路径为环境变量:${LOG_PATH:./logs}/${spring.application.name}.log - 修改 RocketMQ 地址为环境变量:${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} - 还原 Nacos namespace 从 hwc 到 dev 完成 TODO 功能: - TODO #1: 实现死信重试逻辑(重新投递消息到 MQ) - TODO #2: 实现日志重试逻辑(根据事件记录重新推送) - TODO #3: 实现全量同步触发(创建任务并异步执行) - TODO #4: 实现事件 ID 查询(通过 eventType 查询事件定义) 涉及文件: - pom.xml: 还原 Nacos namespace 到 dev - DatabusSyncDeadLetterServiceImpl: 实现死信重试 - DatabusSyncLogServiceImpl: 实现日志重试 - DatabusSyncSubscriptionServiceImpl: 实现全量同步触发 - DatabusEventPublisherImpl: 实现事件 ID 查询 - application-*.yml/yaml: 修复硬编码配置 Ref: 代码审查报告 --- pom.xml | 2 +- .../publisher/DatabusEventPublisherImpl.java | 19 +++++-- .../DatabusSyncDeadLetterServiceImpl.java | 41 +++++++++++--- .../impl/DatabusSyncLogServiceImpl.java | 56 ++++++++++++++++++- .../DatabusSyncSubscriptionServiceImpl.java | 39 ++++++++++++- .../src/main/resources/application-dev.yml | 6 +- .../src/main/resources/application-local.yml | 2 +- .../src/main/resources/application-dev.yaml | 4 +- .../src/main/resources/application-local.yaml | 2 +- 9 files changed, 146 insertions(+), 25 deletions(-) diff --git a/pom.xml b/pom.xml index ee0032e3..e10371b8 100644 --- a/pom.xml +++ b/pom.xml @@ -235,7 +235,7 @@ dev 172.16.46.63:30848 - hwc + dev DEFAULT_GROUP nacos P@ssword25 diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java index 2ac2c33e..6ebc1be0 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java @@ -2,7 +2,9 @@ package com.zt.plat.framework.databus.server.core.publisher; import cn.hutool.core.bean.BeanUtil; import com.zt.plat.framework.databus.server.core.event.DatabusEvent; +import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventDO; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO; +import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventMapper; import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -21,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional; public class DatabusEventPublisherImpl implements DatabusEventPublisher { private final DatabusSyncEventRecordMapper eventRecordMapper; + private final DatabusSyncEventMapper eventMapper; @Async @Override @@ -43,16 +46,24 @@ public class DatabusEventPublisherImpl implements DatabusEventPublisher { * 保存事件记录到流水表 */ private Long saveEventRecord(DatabusEvent event) { - // 查询事件定义ID(这里简化处理,实际应该注入EventMapper查询) - // TODO: 根据 eventType 查询 event_id + // 根据 eventType 查询事件定义 ID + DatabusSyncEventDO eventDO = eventMapper.selectByEventType(event.getEventType()); + Long eventId = (eventDO != null) ? eventDO.getId() : null; + if (eventId == null) { + log.warn("[Databus] 事件定义不存在, eventType={}, 仍然保存事件记录", event.getEventType()); + } + + // 构建事件记录对象 DatabusSyncEventRecordDO record = new DatabusSyncEventRecordDO(); BeanUtil.copyProperties(event, record); + record.setEventId(eventId); // 设置事件定义 ID + // 保存到流水表 eventRecordMapper.insert(record); - log.info("[Databus] 事件记录已保存, id={}, eventType={}, eventAction={}", - record.getId(), event.getEventType(), event.getEventAction()); + log.info("[Databus] 事件记录已保存, id={}, eventId={}, eventType={}, eventAction={}", + record.getId(), eventId, event.getEventType(), event.getEventAction()); return record.getId(); } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java index ee16880b..edb3e55f 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java @@ -1,7 +1,10 @@ package com.zt.plat.framework.databus.server.service.impl; +import com.alibaba.fastjson2.JSON; import com.zt.plat.framework.common.pojo.PageResult; import com.zt.plat.framework.databus.server.controller.admin.vo.deadletter.DatabusSyncDeadLetterPageReqVO; +import com.zt.plat.framework.databus.server.core.message.SyncMessage; +import com.zt.plat.framework.databus.server.core.pusher.MessagePusher; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncDeadLetterDO; import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncDeadLetterMapper; import com.zt.plat.framework.databus.server.enums.DeadLetterStatusEnum; @@ -31,6 +34,9 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe @Resource private DatabusSyncDeadLetterMapper deadLetterMapper; + @Resource + private MessagePusher messagePusher; + @Override public DatabusSyncDeadLetterDO getDeadLetter(Long id) { return deadLetterMapper.selectById(id); @@ -50,16 +56,33 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe throw exception(DEAD_LETTER_NOT_EXISTS); } - // TODO: 实现重新投递逻辑,将消息重新发送到 MQ 或 HTTP - log.info("[reprocessDeadLetter] 重新投递死信消息,ID: {}, 同步ID: {}", id, deadLetter.getSyncId()); + // 重新投递逻辑:将消息重新发送到 MQ + log.info("[reprocessDeadLetter] 重新投递死信消息,ID: {}, 同步ID: {}, 客户端: {}", + id, deadLetter.getSyncId(), deadLetter.getClientCode()); - // 更新状态为已重新投递 - DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO(); - updateObj.setId(id); - updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus()); - updateObj.setHandled(1); - updateObj.setHandleTime(LocalDateTime.now()); - deadLetterMapper.updateById(updateObj); + try { + // 1. 解析消息内容 + SyncMessage message = JSON.parseObject(deadLetter.getMessageBody(), SyncMessage.class); + + // 2. 构建 Topic(格式:databus-sync-{clientCode}) + String topic = "databus-sync-" + deadLetter.getClientCode(); + + // 3. 重新推送到 MQ + String messageId = messagePusher.pushByMQ(topic, message); + log.info("[reprocessDeadLetter] 死信消息重新投递成功,ID: {}, MQ消息ID: {}", id, messageId); + + // 4. 更新状态为已重新投递 + DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO(); + updateObj.setId(id); + updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus()); + updateObj.setHandled(1); + updateObj.setHandleTime(LocalDateTime.now()); + deadLetterMapper.updateById(updateObj); + + } catch (Exception e) { + log.error("[reprocessDeadLetter] 死信消息重新投递失败,ID: {}", id, e); + throw new RuntimeException("死信消息重新投递失败: " + e.getMessage(), e); + } } @Override diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java index 64910c9a..a5e048f7 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java @@ -1,15 +1,23 @@ package com.zt.plat.framework.databus.server.service.impl; import com.zt.plat.framework.common.pojo.PageResult; +import com.zt.plat.framework.common.util.date.LocalDateTimeUtils; import com.zt.plat.framework.databus.server.controller.admin.vo.pushlog.DatabusSyncPushLogPageReqVO; +import com.zt.plat.framework.databus.server.core.message.SyncMessage; +import com.zt.plat.framework.databus.server.core.pusher.MessagePusher; +import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncLogDO; +import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper; import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncLogMapper; import com.zt.plat.framework.databus.server.service.DatabusSyncLogService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; +import java.time.LocalDateTime; + import static com.zt.plat.framework.common.exception.util.ServiceExceptionUtil.exception; import static com.zt.plat.framework.databus.server.enums.ErrorCodeConstants.PUSH_LOG_NOT_EXISTS; @@ -26,6 +34,12 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService { @Resource private DatabusSyncLogMapper pushLogMapper; + @Resource + private DatabusSyncEventRecordMapper eventRecordMapper; + + @Resource + private MessagePusher messagePusher; + @Override public DatabusSyncLogDO getPushLog(Long id) { return pushLogMapper.selectById(id); @@ -37,14 +51,52 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService { } @Override + @Transactional(rollbackFor = Exception.class) public void retryPush(Long id) { // 校验存在 DatabusSyncLogDO pushLog = pushLogMapper.selectById(id); if (pushLog == null) { throw exception(PUSH_LOG_NOT_EXISTS); } - // TODO: 实现重试逻辑,将日志重新入队或触发推送 - log.info("[retryPush] 重试推送,日志ID: {}, 同步ID: {}", id, pushLog.getSyncId()); + + // 重试推送逻辑:根据事件记录ID查询原始数据,重新推送 + log.info("[retryPush] 重试推送,日志ID: {}, 同步ID: {}, 客户端: {}", + id, pushLog.getSyncId(), pushLog.getClientCode()); + + try { + // 1. 查询原始事件记录 + DatabusSyncEventRecordDO eventRecord = eventRecordMapper.selectById(pushLog.getEventRecordId()); + if (eventRecord == null) { + log.error("[retryPush] 事件记录不存在,无法重试推送,eventRecordId: {}", pushLog.getEventRecordId()); + throw new RuntimeException("事件记录不存在,无法重试推送"); + } + + // 2. 构建同步消息 + SyncMessage message = new SyncMessage(); + message.setSyncId(pushLog.getSyncId()); + message.setEventRecordId(eventRecord.getId()); + message.setEventType(pushLog.getEventType()); + message.setEventAction(eventRecord.getEventAction()); + message.setDataSnapshot(eventRecord.getDataSnapshot()); + message.setDataVersion(eventRecord.getDataVersion()); + message.setTimestamp(System.currentTimeMillis()); + + // 3. 重新推送到 MQ + String messageId = messagePusher.pushByMQ(pushLog.getMqTopic(), message); + log.info("[retryPush] 日志重新推送成功,日志ID: {}, MQ消息ID: {}", id, messageId); + + // 4. 更新推送日志状态 + DatabusSyncLogDO updateObj = new DatabusSyncLogDO(); + updateObj.setId(id); + updateObj.setStatus(3); // 状态改为重试中 + updateObj.setRetryCount(pushLog.getRetryCount() + 1); + updateObj.setMqMsgId(messageId); + pushLogMapper.updateById(updateObj); + + } catch (Exception e) { + log.error("[retryPush] 日志重试推送失败,日志ID: {}", id, e); + throw new RuntimeException("日志重试推送失败: " + e.getMessage(), e); + } } } diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java index 6feb607c..d19d2bba 100644 --- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java +++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java @@ -4,11 +4,13 @@ import com.zt.plat.framework.common.pojo.PageResult; import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionPageReqVO; import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionSaveReqVO; import com.zt.plat.framework.databus.server.convert.DatabusSyncSubscriptionConvert; +import com.zt.plat.framework.databus.server.core.sync.DatabusFullSyncService; import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncSubscriptionDO; import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncSubscriptionMapper; import com.zt.plat.framework.databus.server.service.DatabusSyncSubscriptionService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.validation.annotation.Validated; @@ -29,6 +31,9 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti @Resource private DatabusSyncSubscriptionMapper subscriptionMapper; + @Resource + private DatabusFullSyncService fullSyncService; + @Override @Transactional(rollbackFor = Exception.class) public Long createSubscription(DatabusSyncSubscriptionSaveReqVO createReqVO) { @@ -122,8 +127,38 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti public void triggerSync(Long id) { // 校验存在 validateSubscriptionExists(id); - // TODO: 发送消息到 MQ 或调用异步任务触发同步 - log.info("[triggerSync] 手动触发同步,订阅ID: {}", id); + + // 手动触发全量同步:创建全量同步任务并异步执行 + log.info("[triggerSync] 手动触发全量同步,订阅ID: {}", id); + + try { + // 1. 创建全量同步任务 + Long taskId = fullSyncService.createFullSyncTask(id, "手动触发全量同步"); + log.info("[triggerSync] 创建全量同步任务成功,订阅ID: {}, 任务ID: {}", id, taskId); + + // 2. 异步执行全量同步任务 + executeFullSyncAsync(taskId); + + } catch (Exception e) { + log.error("[triggerSync] 手动触发全量同步失败,订阅ID: {}", id, e); + throw new RuntimeException("触发全量同步失败: " + e.getMessage(), e); + } + } + + /** + * 异步执行全量同步任务 + * + * @param taskId 任务ID + */ + @Async + protected void executeFullSyncAsync(Long taskId) { + try { + log.info("[executeFullSyncAsync] 开始执行全量同步任务,任务ID: {}", taskId); + fullSyncService.executeFullSyncTask(taskId); + log.info("[executeFullSyncAsync] 全量同步任务执行完成,任务ID: {}", taskId); + } catch (Exception e) { + log.error("[executeFullSyncAsync] 全量同步任务执行失败,任务ID: {}", taskId, e); + } } @Override diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml index e33d10c4..892d171e 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml +++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml @@ -75,12 +75,12 @@ management: # 日志文件配置 logging: file: - name: D:/project/zhongtong/logs/${spring.application.name}.log # 日志文件名,全路径 + name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径 # RocketMQ 配置项 rocketmq: - name-server: 172.16.240.64:9876 # RocketMQ Namesrv + name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量 producer: group: databus-server-producer-group # 生产者组名 send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒 @@ -121,7 +121,7 @@ zt: - company-b # 配置订阅的客户端(与客户端的client-code一致) mq: enabled: true - name-server: 172.16.240.64:9876 # RocketMQ NameServer 地址 + name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ NameServer 地址,使用环境变量 topic-base: databus-sync producer-group: databus-server-producer send-msg-timeout: 10000 diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml index 04b5c003..cd47ac91 100644 --- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml +++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml @@ -88,7 +88,7 @@ mybatis-plus: # RocketMQ 配置项 rocketmq: - name-server: 172.16.46.63:9876 # RocketMQ Namesrv + name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量 producer: group: databus-server-producer-group # 生产者组名 send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒 diff --git a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml index 7ae49cb2..9634d5cf 100644 --- a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml +++ b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml @@ -59,7 +59,7 @@ spring: # rocketmq 配置项,对应 RocketMQProperties 配置类 rocketmq: - name-server: 172.16.46.63:30876 # RocketMQ Namesrv + name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量 spring: # RabbitMQ 配置项,对应 RabbitProperties 配置类 @@ -109,7 +109,7 @@ spring: # 日志文件配置 logging: file: - name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径 + name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径 --- #################### 微信公众号、小程序相关配置 #################### wx: diff --git a/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml b/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml index 9bdd38e6..e22a45a1 100644 --- a/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml +++ b/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml @@ -60,7 +60,7 @@ spring: # rocketmq 配置项,对应 RocketMQProperties 配置类 rocketmq: - name-server: 172.16.46.63:30876 # RocketMQ Namesrv + name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量 --- #################### 定时任务相关配置 ####################