fix(databus): 修复客户端消息处理和防止消息循环

1. 修复消息格式不匹配问题
   - 增量消息:兼容 SyncMessage 格式,从 dataSnapshot 字段反序列化数据
   - 批量消息:添加 getDataType() 方法获取泛型类型,正确转换 JSONObject

2. 防止消息循环
   - 添加 zt.databus.change.producer.enabled 配置项
   - 客户端禁用变更消息发送,避免 客户端写入 → 发送变更 → 循环

3. 修复 Feign 客户端注入
   - 在 RpcConfiguration 中添加 DeptApi、PostApi
   - 确保客户端能通过 Feign 调用本地 system-server API

相关文件:
- DatabusClientConsumer.java: 修复消息解析逻辑
- BatchSyncEventHandler.java: 添加 getDataType() 方法
- DatabusChangeProducer.java: 添加 enabled 开关
- RpcConfiguration.java: 启用 DeptApi/PostApi Feign 客户端

Ref: 修复 ClassCastException 和消息循环问题
This commit is contained in:
hewencai
2025-12-03 11:10:57 +08:00
parent dfca38feb7
commit 62494ced45
37 changed files with 659 additions and 41 deletions

View File

@@ -1,6 +1,9 @@
package com.zt.plat.module.databus.framework.rpc.config;
import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import com.zt.plat.module.system.api.user.AdminUserApi;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration;
@@ -9,6 +12,13 @@ import org.springframework.context.annotation.Configuration;
* Databus 模块的 RPC 配置,开启所需的 Feign 客户端。
*/
@Configuration(value = "databusRpcConfiguration", proxyBeanMethods = false)
@EnableFeignClients(clients = {AdminUserApi.class, OAuth2TokenCommonApi.class})
@EnableFeignClients(clients = {
AdminUserApi.class,
OAuth2TokenCommonApi.class,
// DataBus 数据提供者 Feign 客户端
DatabusDeptProviderApi.class,
DatabusUserProviderApi.class,
DatabusPostProviderApi.class
})
public class RpcConfiguration {
}

View File

@@ -1,90 +0,0 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 部门变更消息消费者
* <p>
* 消费来自 system-server 的部门变更消息,通过增量同步服务进行:
* 1. 三态判断(事件/客户端/订阅是否启用)
* 2. 记录到 event_record 流水表
* 3. 推送到客户端专属 Topicdatabus-sync-{clientCode}
*
* @author ZT
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusDeptChangeMessage.TOPIC,
consumerGroup = DatabusDeptChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusDeptChangeConsumer implements RocketMQListener<DatabusDeptChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusDeptChangeMessage message) {
log.info("[Databus] 收到部门变更消息, action={}, deptId={}", message.getAction(), message.getDeptId());
try {
// 构建完整的业务数据快照
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getDeptId());
dataMap.put("code", message.getDeptCode());
dataMap.put("name", message.getDeptName());
dataMap.put("shortName", message.getShortName());
dataMap.put("parentId", message.getParentId());
dataMap.put("sort", message.getSort());
dataMap.put("leaderUserId", message.getLeaderUserId());
dataMap.put("phone", message.getPhone());
dataMap.put("email", message.getStatus());
dataMap.put("isCompany", message.getIsCompany());
dataMap.put("isGroup", message.getIsGroup());
dataMap.put("deptSource", message.getDeptSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
// 构建完整的事件类型: system-dept-{action}
String eventType = String.format("system-dept-%s", message.getAction().toLowerCase());
// 构建 Databus 事件
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusDeptChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
// 调用增量同步服务处理(三态判断 + 记录流水 + 推送客户端Topic
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 部门变更事件处理完成, eventType={}, deptId={}",
eventType, message.getDeptId());
} catch (Exception e) {
log.error("[Databus] 处理部门变更消息失败, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
throw new RuntimeException("处理部门变更消息失败", e);
}
}
}

View File

@@ -1,73 +0,0 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 岗位变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusPostChangeMessage.TOPIC,
consumerGroup = DatabusPostChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusPostChangeConsumer implements RocketMQListener<DatabusPostChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusPostChangeMessage message) {
log.info("[Databus] 收到岗位变更消息, action={}, postId={}", message.getAction(), message.getPostId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getPostId());
dataMap.put("code", message.getPostCode());
dataMap.put("name", message.getPostName());
dataMap.put("sort", message.getSort());
dataMap.put("status", message.getStatus());
dataMap.put("remark", message.getRemark());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
String eventType = String.format("system-post-%s", message.getAction().toLowerCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusPostChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 岗位变更事件处理完成, eventType={}, postId={}",
eventType, message.getPostId());
} catch (Exception e) {
log.error("[Databus] 处理岗位变更消息失败, action={}, postId={}",
message.getAction(), message.getPostId(), e);
throw new RuntimeException("处理岗位变更消息失败", e);
}
}
}

View File

@@ -1,79 +0,0 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 用户变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusUserChangeMessage.TOPIC,
consumerGroup = DatabusUserChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusUserChangeConsumer implements RocketMQListener<DatabusUserChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusUserChangeMessage message) {
log.info("[Databus] 收到用户变更消息, action={}, userId={}", message.getAction(), message.getUserId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getUserId());
dataMap.put("username", message.getUsername());
dataMap.put("nickname", message.getNickname());
dataMap.put("remark", message.getRemark());
dataMap.put("deptIds", message.getDeptIds());
dataMap.put("postIds", message.getPostIds());
dataMap.put("email", message.getEmail());
dataMap.put("mobile", message.getMobile());
dataMap.put("sex", message.getSex());
dataMap.put("avatar", message.getAvatar());
dataMap.put("status", message.getStatus());
dataMap.put("userSource", message.getUserSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
String eventType = String.format("system-user-%s", message.getAction().toLowerCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusUserChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 用户变更事件处理完成, eventType={}, userId={}",
eventType, message.getUserId());
} catch (Exception e) {
log.error("[Databus] 处理用户变更消息失败, action={}, userId={}",
message.getAction(), message.getUserId(), e);
throw new RuntimeException("处理用户变更消息失败", e);
}
}
}

View File

@@ -1,85 +0,0 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 部门数据提供者
* <p>
* 通过 Feign 调用 system-server 获取部门数据
*
* @author ZT
*/
@Slf4j
@Component
public class DeptDataFeignProvider implements DataProvider<DatabusDeptData> {
public static final String PROVIDER_TYPE = "DEPT";
@Resource
private DatabusDeptProviderApi deptProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusDeptData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusDeptData>> result = deptProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门数据失败: " + result.getMsg());
}
CursorPageResult<DatabusDeptData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = deptProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusDeptData data) {
return data.getId();
}
}

View File

@@ -1,81 +0,0 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 岗位数据提供者
*/
@Slf4j
@Component
public class PostDataFeignProvider implements DataProvider<DatabusPostData> {
public static final String PROVIDER_TYPE = "POST";
@Resource
private DatabusPostProviderApi postProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusPostData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusPostData>> result = postProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位数据失败: " + result.getMsg());
}
CursorPageResult<DatabusPostData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = postProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusPostData data) {
return data.getId();
}
}

View File

@@ -1,81 +0,0 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 用户数据提供者
*/
@Slf4j
@Component
public class UserDataFeignProvider implements DataProvider<DatabusAdminUserData> {
public static final String PROVIDER_TYPE = "USER";
@Resource
private DatabusUserProviderApi userProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusAdminUserData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusAdminUserData>> result = userProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户数据失败: " + result.getMsg());
}
CursorPageResult<DatabusAdminUserData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = userProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusAdminUserData data) {
return data.getId();
}
}

View File

@@ -75,9 +75,16 @@ management:
# 日志文件配置
logging:
file:
name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,全路径
name: D:/project/zhongtong/logs/${spring.application.name}.log # 日志文件名,全路径
# RocketMQ 配置项
rocketmq:
name-server: 172.16.240.64:9876 # RocketMQ Namesrv
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
justauth:
enabled: true
type:
@@ -105,3 +112,23 @@ justauth:
prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE::
timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟
zt:
databus:
sync:
server:
enabled: true
clients:
- company-b # 配置订阅的客户端与客户端的client-code一致
mq:
enabled: true
name-server: 172.16.240.64:9876 # RocketMQ NameServer 地址
topic-base: databus-sync
producer-group: databus-server-producer
send-msg-timeout: 10000
retry:
max-attempts: 5 # 最大重试次数
initial-delay: 1 # 初始重试延迟(秒)
multiplier: 2 # 重试延迟倍数
batch:
default-size: 500 # 默认批量大小
interval: 5 # 批量推送间隔(秒)

View File

@@ -86,6 +86,13 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# RocketMQ 配置项
rocketmq:
name-server: 172.16.46.63:9876 # RocketMQ Namesrv
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
# ZT配置项设置当前项目所有自定义的配置
zt:
env: # 多环境的配置项

View File

@@ -50,7 +50,7 @@ spring:
time-to-live: 1h # 设置过期时间为 1 小时
server:
port: 48100
port: 48105
logging:
file:
@@ -130,6 +130,11 @@ zt:
- /databus/api/portal/**
ignore-tables:
- databus_api_client_credential
# DataBus 数据同步服务端配置
databus:
sync:
server:
enabled: true # 启用 DataBus 同步服务端
databus:
gateway: