diff --git a/zt-framework/pom.xml b/zt-framework/pom.xml
index f60ab308..241a211e 100644
--- a/zt-framework/pom.xml
+++ b/zt-framework/pom.xml
@@ -17,7 +17,8 @@
zt-spring-boot-starter-web
zt-spring-boot-starter-security
zt-spring-boot-starter-websocket
-
+ zt-spring-boot-starter-databus-server
+ zt-spring-boot-starter-databus-client
zt-spring-boot-starter-monitor
zt-spring-boot-starter-protection
diff --git a/zt-framework/zt-spring-boot-starter-databus-client/pom.xml b/zt-framework/zt-spring-boot-starter-databus-client/pom.xml
new file mode 100644
index 00000000..08e8dcc8
--- /dev/null
+++ b/zt-framework/zt-spring-boot-starter-databus-client/pom.xml
@@ -0,0 +1,71 @@
+
+
+
+ com.zt.plat
+ zt-framework
+ ${revision}
+
+ 4.0.0
+ zt-spring-boot-starter-databus-client
+ jar
+
+ ${project.artifactId}
+ DataBus 客户端组件,负责接收数据变更并同步
+ https://github.com/YunaiV/ruoyi-vue-pro
+
+
+
+ com.zt.plat
+ zt-common
+
+
+
+
+ com.zt.plat
+ zt-module-databus-api
+ ${revision}
+
+
+
+
+ com.zt.plat
+ zt-module-system-api
+ ${revision}
+ true
+
+
+
+ cn.hutool
+ hutool-all
+
+
+
+
+ com.zt.plat
+ zt-spring-boot-starter-mq
+
+
+
+
+ com.zt.plat
+ zt-spring-boot-starter-redis
+
+
+
+
+ org.springframework.boot
+ spring-boot-starter-web
+ true
+
+
+
+
+ com.zt.plat
+ zt-spring-boot-starter-test
+ test
+
+
+
+
diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientAutoConfiguration.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientAutoConfiguration.java
new file mode 100644
index 00000000..afa72c43
--- /dev/null
+++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientAutoConfiguration.java
@@ -0,0 +1,23 @@
+package com.zt.plat.framework.databus.client.config;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.ComponentScan;
+
+/**
+ * Databus 同步客户端自动配置
+ *
+ * @author ZT
+ */
+@Slf4j
+@AutoConfiguration
+@EnableConfigurationProperties(DatabusSyncClientProperties.class)
+@ComponentScan(basePackages = "com.zt.plat.framework.databus.client")
+public class DatabusSyncClientAutoConfiguration {
+
+ public DatabusSyncClientAutoConfiguration() {
+ log.info("[Databus] 数据同步客户端模块已加载");
+ }
+
+}
diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientProperties.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientProperties.java
new file mode 100644
index 00000000..f79478ef
--- /dev/null
+++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/config/DatabusSyncClientProperties.java
@@ -0,0 +1,113 @@
+package com.zt.plat.framework.databus.client.config;
+
+import lombok.Data;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Databus 数据同步客户端配置属性
+ *
+ * @author ZT
+ */
+@Data
+@ConfigurationProperties(prefix = "zt.databus.sync.client")
+public class DatabusSyncClientProperties {
+
+ /**
+ * 是否启用
+ */
+ private Boolean enabled = true;
+ /**
+ * RocketMQ NameServer 地址
+ */
+ private String nameServer;
+
+ /**
+ * 客户端编码(必填,用于订阅专属Topic)
+ * Topic格式: databus-sync-{eventType}-{clientCode}
+ */
+ private String clientCode;
+
+ /**
+ * MQ配置
+ */
+ private Mq mq = new Mq();
+
+ /**
+ * HTTP配置
+ */
+ private Http http = new Http();
+
+ /**
+ * 幂等配置
+ */
+ private Idempotent idempotent = new Idempotent();
+
+ @Data
+ public static class Mq {
+ /**
+ * 是否启用MQ消费
+ */
+ private Boolean enabled = true;
+ /**
+ * RocketMQ NameServer 地址
+ */
+ private String nameServer;
+
+ /**
+ * Topic基础名称
+ */
+ private String topicBase = "databus-sync";
+
+ /**
+ * 消费者组前缀,完整格式: {consumerGroupPrefix}-{eventType}
+ * 默认: databus-client-{clientCode}
+ */
+ private String consumerGroupPrefix;
+
+ /**
+ * 消费线程数
+ */
+ private Integer consumeThreadMin = 1;
+
+ /**
+ * 消费线程数
+ */
+ private Integer consumeThreadMax = 5;
+
+ /**
+ * 最大重试次数
+ */
+ private Integer maxReconsumeTimes = 3;
+ }
+
+ @Data
+ public static class Http {
+ /**
+ * 是否启用HTTP推送接收
+ */
+ private Boolean enabled = false;
+
+ /**
+ * 接收端点路径
+ */
+ private String endpoint = "/databus/sync/receive";
+ }
+
+ @Data
+ public static class Idempotent {
+ /**
+ * 是否启用幂等检查
+ */
+ private Boolean enabled = true;
+ /**
+ * RocketMQ NameServer 地址
+ */
+ private String nameServer;
+
+ /**
+ * 幂等记录过期时间(秒)
+ */
+ private Integer expireSeconds = 86400; // 24小时
+ }
+
+}
diff --git a/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java
new file mode 100644
index 00000000..4a7c5cdb
--- /dev/null
+++ b/zt-framework/zt-spring-boot-starter-databus-client/src/main/java/com/zt/plat/framework/databus/client/core/consumer/DatabusClientConsumer.java
@@ -0,0 +1,228 @@
+package com.zt.plat.framework.databus.client.core.consumer;
+
+import com.alibaba.fastjson2.JSON;
+import com.zt.plat.framework.databus.client.core.registry.HandlerRegistry;
+import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
+import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
+import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
+import com.zt.plat.module.databus.api.message.DatabusMessage;
+import com.zt.plat.module.databus.enums.DatabusEventType;
+import jakarta.annotation.Resource;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.stereotype.Component;
+
+/**
+ * DataBus 客户端统一消费者
+ *
+ * 架构设计:
+ * 1. 监听客户端专属 Topic: databus-sync-{clientCode}
+ * 2. 消息体包含 eventType 字段,用于路由到具体 Handler
+ * 3. 根据 eventType 从 HandlerRegistry 获取对应 Handler
+ * 4. 区分增量消息和批量消息,调用不同的 Handler
+ *
+ * Topic 简化前后对比:
+ * - 旧格式:databus-sync-system-dept-create-branch-001(每个事件一个 Topic)
+ * - 新格式:databus-sync-branch-001(所有事件共用一个 Topic)
+ *
+ * @author ZT
+ */
+@Slf4j
+@Component
+@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
+@RocketMQMessageListener(
+ topic = "${zt.databus.sync.client.mq.topic:databus-sync}-${zt.databus.sync.client.client-code}",
+ consumerGroup = "${zt.databus.sync.client.mq.consumer-group:databus-client-consumer}-${zt.databus.sync.client.client-code}"
+)
+public class DatabusClientConsumer implements RocketMQListener {
+
+ @Resource
+ private HandlerRegistry handlerRegistry;
+
+ @Override
+ public void onMessage(String body) {
+ log.debug("[DatabusClient] 收到消息, body={}", body);
+
+ try {
+ // 1. 解析消息获取 eventType
+ DatabusEventType eventType = parseEventType(body);
+ if (eventType == null) {
+ log.error("[DatabusClient] 无法解析 eventType, body={}", body);
+ return;
+ }
+
+ log.info("[DatabusClient] 收到消息, eventType={}", eventType);
+
+ // 2. 根据 eventType 判断消息类型并分发
+ if (isBatchMessage(eventType)) {
+ // 批量消息(全量同步)
+ handleBatchMessage(body, eventType);
+ } else {
+ // 增量消息
+ handleIncrementalMessage(body, eventType);
+ }
+
+ } catch (Exception e) {
+ log.error("[DatabusClient] 消息处理失败, body={}", body, e);
+ throw e; // 抛出异常触发重试
+ }
+ }
+
+ /**
+ * 处理批量消息(全量同步)
+ */
+ @SuppressWarnings("unchecked")
+ private void handleBatchMessage(String body, DatabusEventType eventType) {
+ // 1. 获取 BatchHandler
+ BatchSyncEventHandler handler = handlerRegistry.getBatchHandler(eventType);
+ if (handler == null) {
+ log.warn("[DatabusClient] 未找到全量Handler, eventType={}", eventType);
+ return;
+ }
+
+ // 2. 获取数据类型
+ Class> dataType = handler.getDataType();
+
+ // 3. 解析批量消息(兼容服务端 BatchSyncMessage 格式)
+ var json = JSON.parseObject(body);
+
+ // 兼容处理:服务端使用 fullTaskId,转换为 taskId
+ String taskId = json.getString("taskId");
+ if (taskId == null) {
+ taskId = String.valueOf(json.getLong("fullTaskId"));
+ }
+
+ DatabusBatchMessage