diff --git a/pom.xml b/pom.xml
index 35973e7e..8c4d3911 100644
--- a/pom.xml
+++ b/pom.xml
@@ -235,7 +235,7 @@
dev
172.16.46.63:30848
- hwc
+ dev
DEFAULT_GROUP
nacos
P@ssword25
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java
index 2ac2c33e..6ebc1be0 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/core/publisher/DatabusEventPublisherImpl.java
@@ -2,7 +2,9 @@ package com.zt.plat.framework.databus.server.core.publisher;
import cn.hutool.core.bean.BeanUtil;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
+import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventDO;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO;
+import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventMapper;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
@@ -21,6 +23,7 @@ import org.springframework.transaction.annotation.Transactional;
public class DatabusEventPublisherImpl implements DatabusEventPublisher {
private final DatabusSyncEventRecordMapper eventRecordMapper;
+ private final DatabusSyncEventMapper eventMapper;
@Async
@Override
@@ -43,16 +46,24 @@ public class DatabusEventPublisherImpl implements DatabusEventPublisher {
* 保存事件记录到流水表
*/
private Long saveEventRecord(DatabusEvent event) {
- // 查询事件定义ID(这里简化处理,实际应该注入EventMapper查询)
- // TODO: 根据 eventType 查询 event_id
+ // 根据 eventType 查询事件定义 ID
+ DatabusSyncEventDO eventDO = eventMapper.selectByEventType(event.getEventType());
+ Long eventId = (eventDO != null) ? eventDO.getId() : null;
+ if (eventId == null) {
+ log.warn("[Databus] 事件定义不存在, eventType={}, 仍然保存事件记录", event.getEventType());
+ }
+
+ // 构建事件记录对象
DatabusSyncEventRecordDO record = new DatabusSyncEventRecordDO();
BeanUtil.copyProperties(event, record);
+ record.setEventId(eventId); // 设置事件定义 ID
+ // 保存到流水表
eventRecordMapper.insert(record);
- log.info("[Databus] 事件记录已保存, id={}, eventType={}, eventAction={}",
- record.getId(), event.getEventType(), event.getEventAction());
+ log.info("[Databus] 事件记录已保存, id={}, eventId={}, eventType={}, eventAction={}",
+ record.getId(), eventId, event.getEventType(), event.getEventAction());
return record.getId();
}
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java
index ee16880b..edb3e55f 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncDeadLetterServiceImpl.java
@@ -1,7 +1,10 @@
package com.zt.plat.framework.databus.server.service.impl;
+import com.alibaba.fastjson2.JSON;
import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.deadletter.DatabusSyncDeadLetterPageReqVO;
+import com.zt.plat.framework.databus.server.core.message.SyncMessage;
+import com.zt.plat.framework.databus.server.core.pusher.MessagePusher;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncDeadLetterDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncDeadLetterMapper;
import com.zt.plat.framework.databus.server.enums.DeadLetterStatusEnum;
@@ -31,6 +34,9 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe
@Resource
private DatabusSyncDeadLetterMapper deadLetterMapper;
+ @Resource
+ private MessagePusher messagePusher;
+
@Override
public DatabusSyncDeadLetterDO getDeadLetter(Long id) {
return deadLetterMapper.selectById(id);
@@ -50,16 +56,33 @@ public class DatabusSyncDeadLetterServiceImpl implements DatabusSyncDeadLetterSe
throw exception(DEAD_LETTER_NOT_EXISTS);
}
- // TODO: 实现重新投递逻辑,将消息重新发送到 MQ 或 HTTP
- log.info("[reprocessDeadLetter] 重新投递死信消息,ID: {}, 同步ID: {}", id, deadLetter.getSyncId());
+ // 重新投递逻辑:将消息重新发送到 MQ
+ log.info("[reprocessDeadLetter] 重新投递死信消息,ID: {}, 同步ID: {}, 客户端: {}",
+ id, deadLetter.getSyncId(), deadLetter.getClientCode());
- // 更新状态为已重新投递
- DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO();
- updateObj.setId(id);
- updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus());
- updateObj.setHandled(1);
- updateObj.setHandleTime(LocalDateTime.now());
- deadLetterMapper.updateById(updateObj);
+ try {
+ // 1. 解析消息内容
+ SyncMessage message = JSON.parseObject(deadLetter.getMessageBody(), SyncMessage.class);
+
+ // 2. 构建 Topic(格式:databus-sync-{clientCode})
+ String topic = "databus-sync-" + deadLetter.getClientCode();
+
+ // 3. 重新推送到 MQ
+ String messageId = messagePusher.pushByMQ(topic, message);
+ log.info("[reprocessDeadLetter] 死信消息重新投递成功,ID: {}, MQ消息ID: {}", id, messageId);
+
+ // 4. 更新状态为已重新投递
+ DatabusSyncDeadLetterDO updateObj = new DatabusSyncDeadLetterDO();
+ updateObj.setId(id);
+ updateObj.setStatus(DeadLetterStatusEnum.REDELIVERED.getStatus());
+ updateObj.setHandled(1);
+ updateObj.setHandleTime(LocalDateTime.now());
+ deadLetterMapper.updateById(updateObj);
+
+ } catch (Exception e) {
+ log.error("[reprocessDeadLetter] 死信消息重新投递失败,ID: {}", id, e);
+ throw new RuntimeException("死信消息重新投递失败: " + e.getMessage(), e);
+ }
}
@Override
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java
index 64910c9a..a5e048f7 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncLogServiceImpl.java
@@ -1,15 +1,23 @@
package com.zt.plat.framework.databus.server.service.impl;
import com.zt.plat.framework.common.pojo.PageResult;
+import com.zt.plat.framework.common.util.date.LocalDateTimeUtils;
import com.zt.plat.framework.databus.server.controller.admin.vo.pushlog.DatabusSyncPushLogPageReqVO;
+import com.zt.plat.framework.databus.server.core.message.SyncMessage;
+import com.zt.plat.framework.databus.server.core.pusher.MessagePusher;
+import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncEventRecordDO;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncLogDO;
+import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncEventRecordMapper;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncLogMapper;
import com.zt.plat.framework.databus.server.service.DatabusSyncLogService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
+import java.time.LocalDateTime;
+
import static com.zt.plat.framework.common.exception.util.ServiceExceptionUtil.exception;
import static com.zt.plat.framework.databus.server.enums.ErrorCodeConstants.PUSH_LOG_NOT_EXISTS;
@@ -26,6 +34,12 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService {
@Resource
private DatabusSyncLogMapper pushLogMapper;
+ @Resource
+ private DatabusSyncEventRecordMapper eventRecordMapper;
+
+ @Resource
+ private MessagePusher messagePusher;
+
@Override
public DatabusSyncLogDO getPushLog(Long id) {
return pushLogMapper.selectById(id);
@@ -37,14 +51,52 @@ public class DatabusSyncLogServiceImpl implements DatabusSyncLogService {
}
@Override
+ @Transactional(rollbackFor = Exception.class)
public void retryPush(Long id) {
// 校验存在
DatabusSyncLogDO pushLog = pushLogMapper.selectById(id);
if (pushLog == null) {
throw exception(PUSH_LOG_NOT_EXISTS);
}
- // TODO: 实现重试逻辑,将日志重新入队或触发推送
- log.info("[retryPush] 重试推送,日志ID: {}, 同步ID: {}", id, pushLog.getSyncId());
+
+ // 重试推送逻辑:根据事件记录ID查询原始数据,重新推送
+ log.info("[retryPush] 重试推送,日志ID: {}, 同步ID: {}, 客户端: {}",
+ id, pushLog.getSyncId(), pushLog.getClientCode());
+
+ try {
+ // 1. 查询原始事件记录
+ DatabusSyncEventRecordDO eventRecord = eventRecordMapper.selectById(pushLog.getEventRecordId());
+ if (eventRecord == null) {
+ log.error("[retryPush] 事件记录不存在,无法重试推送,eventRecordId: {}", pushLog.getEventRecordId());
+ throw new RuntimeException("事件记录不存在,无法重试推送");
+ }
+
+ // 2. 构建同步消息
+ SyncMessage message = new SyncMessage();
+ message.setSyncId(pushLog.getSyncId());
+ message.setEventRecordId(eventRecord.getId());
+ message.setEventType(pushLog.getEventType());
+ message.setEventAction(eventRecord.getEventAction());
+ message.setDataSnapshot(eventRecord.getDataSnapshot());
+ message.setDataVersion(eventRecord.getDataVersion());
+ message.setTimestamp(System.currentTimeMillis());
+
+ // 3. 重新推送到 MQ
+ String messageId = messagePusher.pushByMQ(pushLog.getMqTopic(), message);
+ log.info("[retryPush] 日志重新推送成功,日志ID: {}, MQ消息ID: {}", id, messageId);
+
+ // 4. 更新推送日志状态
+ DatabusSyncLogDO updateObj = new DatabusSyncLogDO();
+ updateObj.setId(id);
+ updateObj.setStatus(3); // 状态改为重试中
+ updateObj.setRetryCount(pushLog.getRetryCount() + 1);
+ updateObj.setMqMsgId(messageId);
+ pushLogMapper.updateById(updateObj);
+
+ } catch (Exception e) {
+ log.error("[retryPush] 日志重试推送失败,日志ID: {}", id, e);
+ throw new RuntimeException("日志重试推送失败: " + e.getMessage(), e);
+ }
}
}
diff --git a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java
index 6feb607c..d19d2bba 100644
--- a/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java
+++ b/zt-framework/zt-spring-boot-starter-databus-server/src/main/java/com/zt/plat/framework/databus/server/service/impl/DatabusSyncSubscriptionServiceImpl.java
@@ -4,11 +4,13 @@ import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionPageReqVO;
import com.zt.plat.framework.databus.server.controller.admin.vo.subscription.DatabusSyncSubscriptionSaveReqVO;
import com.zt.plat.framework.databus.server.convert.DatabusSyncSubscriptionConvert;
+import com.zt.plat.framework.databus.server.core.sync.DatabusFullSyncService;
import com.zt.plat.framework.databus.server.dal.dataobject.DatabusSyncSubscriptionDO;
import com.zt.plat.framework.databus.server.dal.mapper.DatabusSyncSubscriptionMapper;
import com.zt.plat.framework.databus.server.service.DatabusSyncSubscriptionService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
@@ -29,6 +31,9 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti
@Resource
private DatabusSyncSubscriptionMapper subscriptionMapper;
+ @Resource
+ private DatabusFullSyncService fullSyncService;
+
@Override
@Transactional(rollbackFor = Exception.class)
public Long createSubscription(DatabusSyncSubscriptionSaveReqVO createReqVO) {
@@ -122,8 +127,38 @@ public class DatabusSyncSubscriptionServiceImpl implements DatabusSyncSubscripti
public void triggerSync(Long id) {
// 校验存在
validateSubscriptionExists(id);
- // TODO: 发送消息到 MQ 或调用异步任务触发同步
- log.info("[triggerSync] 手动触发同步,订阅ID: {}", id);
+
+ // 手动触发全量同步:创建全量同步任务并异步执行
+ log.info("[triggerSync] 手动触发全量同步,订阅ID: {}", id);
+
+ try {
+ // 1. 创建全量同步任务
+ Long taskId = fullSyncService.createFullSyncTask(id, "手动触发全量同步");
+ log.info("[triggerSync] 创建全量同步任务成功,订阅ID: {}, 任务ID: {}", id, taskId);
+
+ // 2. 异步执行全量同步任务
+ executeFullSyncAsync(taskId);
+
+ } catch (Exception e) {
+ log.error("[triggerSync] 手动触发全量同步失败,订阅ID: {}", id, e);
+ throw new RuntimeException("触发全量同步失败: " + e.getMessage(), e);
+ }
+ }
+
+ /**
+ * 异步执行全量同步任务
+ *
+ * @param taskId 任务ID
+ */
+ @Async
+ protected void executeFullSyncAsync(Long taskId) {
+ try {
+ log.info("[executeFullSyncAsync] 开始执行全量同步任务,任务ID: {}", taskId);
+ fullSyncService.executeFullSyncTask(taskId);
+ log.info("[executeFullSyncAsync] 全量同步任务执行完成,任务ID: {}", taskId);
+ } catch (Exception e) {
+ log.error("[executeFullSyncAsync] 全量同步任务执行失败,任务ID: {}", taskId, e);
+ }
}
@Override
diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml
index e33d10c4..892d171e 100644
--- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml
+++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-dev.yml
@@ -75,12 +75,12 @@ management:
# 日志文件配置
logging:
file:
- name: D:/project/zhongtong/logs/${spring.application.name}.log # 日志文件名,全路径
+ name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径
# RocketMQ 配置项
rocketmq:
- name-server: 172.16.240.64:9876 # RocketMQ Namesrv
+ name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
@@ -121,7 +121,7 @@ zt:
- company-b # 配置订阅的客户端(与客户端的client-code一致)
mq:
enabled: true
- name-server: 172.16.240.64:9876 # RocketMQ NameServer 地址
+ name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ NameServer 地址,使用环境变量
topic-base: databus-sync
producer-group: databus-server-producer
send-msg-timeout: 10000
diff --git a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml
index 04b5c003..cd47ac91 100644
--- a/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml
+++ b/zt-module-databus/zt-module-databus-server/src/main/resources/application-local.yml
@@ -88,7 +88,7 @@ mybatis-plus:
# RocketMQ 配置项
rocketmq:
- name-server: 172.16.46.63:9876 # RocketMQ Namesrv
+ name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
diff --git a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml
index 7ae49cb2..9634d5cf 100644
--- a/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml
+++ b/zt-module-system/zt-module-system-server/src/main/resources/application-dev.yaml
@@ -59,7 +59,7 @@ spring:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
- name-server: 172.16.46.63:30876 # RocketMQ Namesrv
+ name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
spring:
# RabbitMQ 配置项,对应 RabbitProperties 配置类
@@ -109,7 +109,7 @@ spring:
# 日志文件配置
logging:
file:
- name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径
+ name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径
--- #################### 微信公众号、小程序相关配置 ####################
wx:
diff --git a/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml b/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml
index 9bdd38e6..e22a45a1 100644
--- a/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml
+++ b/zt-module-system/zt-module-system-server/src/main/resources/application-local.yaml
@@ -60,7 +60,7 @@ spring:
# rocketmq 配置项,对应 RocketMQProperties 配置类
rocketmq:
- name-server: 172.16.46.63:30876 # RocketMQ Namesrv
+ name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv,使用环境变量
--- #################### 定时任务相关配置 ####################