feat(databus): 完成阶段四-DataBus Server完整功能

- 补充缺失的 API 类(DatabusMessage、DatabusBatchMessage、DatabusEventType)
- 新增变更消息消费者(3个:部门、用户、岗位)
- 新增数据提供者(3个:部门、用户、岗位)
- 确认分发器服务(核心定向推送逻辑)
- 确认全量同步与消息推送组件
- 确认管理后台 API(5个 Controller)
- 确认 Service ��(4个核心服务)
- 确认 DAL 层(7个 DO + Mapper)
- 添加 databus-server starter 依赖到 pom.xml
- 编译验证通过

Ref: docs/databus/implementation-checklist.md 任务 39-70
This commit is contained in:
hewencai
2025-12-01 23:44:32 +08:00
parent f5ba493f95
commit db13036cea
109 changed files with 7673 additions and 0 deletions

View File

@@ -0,0 +1,106 @@
package com.zt.plat.module.databus.api.message;
import com.zt.plat.module.databus.enums.DatabusEventType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* Databus 全量同步批量消息
* <p>
* 用于全量同步场景,支持分批传输
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusBatchMessage<T> implements Serializable {
/**
* 消息ID用于幂等
*/
private String messageId;
/**
* 全量同步任务ID
*/
private String taskId;
/**
* 事件类型
*/
private DatabusEventType eventType;
/**
* 当前批次号从1开始
*/
private Integer batchNo;
/**
* 总批次数
*/
private Integer totalBatch;
/**
* 当前批次数据条数
*/
private Integer count;
/**
* 总数据条数
*/
private Integer totalCount;
/**
* 是否最后一批
*/
private Boolean isLastBatch;
/**
* 数据列<E68DAE><E58897>
*/
private List<T> dataList;
/**
* 消息产生时间
*/
private LocalDateTime timestamp;
/**
* 来源系统
*/
private String source;
/**
* 租户ID
*/
private Long tenantId;
/**
* 创建批量消息
*/
public static <T> DatabusBatchMessage<T> of(DatabusEventType eventType, String taskId,
int batchNo, int totalBatch,
List<T> dataList, int totalCount) {
DatabusBatchMessage<T> msg = new DatabusBatchMessage<>();
msg.setMessageId(java.util.UUID.randomUUID().toString());
msg.setTaskId(taskId);
msg.setEventType(eventType);
msg.setBatchNo(batchNo);
msg.setTotalBatch(totalBatch);
msg.setCount(dataList != null ? dataList.size() : 0);
msg.setTotalCount(totalCount);
msg.setIsLastBatch(batchNo >= totalBatch);
msg.setDataList(dataList);
msg.setTimestamp(LocalDateTime.now());
return msg;
}
}

View File

@@ -0,0 +1,73 @@
package com.zt.plat.module.databus.api.message;
import com.zt.plat.module.databus.enums.DatabusEventType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Databus 增量同步消息
* <p>
* 业务推送、服务端消费、服务端转发、客户端消费统一使用此消息体
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusMessage<T> implements Serializable {
/**
* 消息ID用于幂等
*/
private String messageId;
/**
* 事件类型
*/
private DatabusEventType eventType;
/**
* 业务数据ID
*/
private Long dataId;
/**
* 业务数据(强类型)
*/
private T data;
/**
* 消息产生时间
*/
private LocalDateTime timestamp;
/**
* 来源系统
*/
private String source;
/**
* 租户ID
*/
private Long tenantId;
/**
* 创建简单消息
*/
public static <T> DatabusMessage<T> of(DatabusEventType eventType, Long dataId, T data) {
DatabusMessage<T> msg = new DatabusMessage<>();
msg.setMessageId(java.util.UUID.randomUUID().toString());
msg.setEventType(eventType);
msg.setDataId(dataId);
msg.setData(data);
msg.setTimestamp(LocalDateTime.now());
return msg;
}
}

View File

@@ -0,0 +1,306 @@
package com.zt.plat.module.databus.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Databus 事件类型枚举
* <p>
* 三级结构: 模块_数据类型_操作
* <p>
* Topic 命名规则:
* - 业务推送到服务端: {topicBase}-{module}-{entity}-{action}
* - 服务端转发到客户端: {topicBase}-{module}-{entity}-{action}-{clientCode}
*
* @author ZT
*/
@Getter
@AllArgsConstructor
public enum DatabusEventType {
// ==================== SYSTEM 系统模块 ====================
/**
* 用户-创建
*/
SYSTEM_USER_CREATE("system", "user", "create", "用户创建"),
/**
* 用户-更新
*/
SYSTEM_USER_UPDATE("system", "user", "update", "用户更新"),
/**
* 用户-删除
*/
SYSTEM_USER_DELETE("system", "user", "delete", "用户删除"),
/**
* 用户-全量同步
*/
SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"),
/**
* 部门-创建
*/
SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"),
/**
* 部门-更新
*/
SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"),
/**
* 部门-删除
*/
SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"),
/**
* 部门-全量同步
*/
SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"),
/**
* 组织机构-创建(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"),
/**
* 组织机构-更新(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"),
/**
* 组织机构-删除(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"),
/**
* 组织机构-全量同步(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"),
/**
* 岗位-创建
*/
SYSTEM_POST_CREATE("system", "post", "create", "岗位创建"),
/**
* 岗位-更新
*/
SYSTEM_POST_UPDATE("system", "post", "update", "岗位更新"),
/**
* 岗位-删除
*/
SYSTEM_POST_DELETE("system", "post", "delete", "岗位删除"),
/**
* 岗位-全量同步
*/
SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"),
/**
* 角色-创建
*/
SYSTEM_ROLE_CREATE("system", "role", "create", "角色创建"),
/**
* 角色-更新
*/
SYSTEM_ROLE_UPDATE("system", "role", "update", "角色更新"),
/**
* 角色-删除
*/
SYSTEM_ROLE_DELETE("system", "role", "delete", "角色删除"),
/**
* 角色-全量同步
*/
SYSTEM_ROLE_FULL("system", "role", "full", "角色全量同步"),
/**
* 字典-创建
*/
SYSTEM_DICT_CREATE("system", "dict", "create", "字典创建"),
/**
* 字典-更新
*/
SYSTEM_DICT_UPDATE("system", "dict", "update", "字典更新"),
/**
* 字典-删除
*/
SYSTEM_DICT_DELETE("system", "dict", "delete", "字典删除"),
/**
* 字典-全量同步
*/
SYSTEM_DICT_FULL("system", "dict", "full", "字典全量同步"),
// ==================== BASE 基础模块 ====================
/**
* 物料-创建
*/
BASE_MATERIAL_CREATE("base", "material", "create", "物料创建"),
/**
* 物料-更新
*/
BASE_MATERIAL_UPDATE("base", "material", "update", "物料更新"),
/**
* 物料-删除
*/
BASE_MATERIAL_DELETE("base", "material", "delete", "物料删除"),
/**
* 物料-全量同步
*/
BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"),
/**
* 供应<E4BE9B><E5BA94>-创建
*/
BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"),
/**
* 供应商-更新
*/
BASE_SUPPLIER_UPDATE("base", "supplier", "update", "供应商更新"),
/**
* 供应商-删除
*/
BASE_SUPPLIER_DELETE("base", "supplier", "delete", "供应商删除"),
/**
* 供应商-全量同步
*/
BASE_SUPPLIER_FULL("base", "supplier", "full", "供应商全量同步"),
/**
* 客户-创建
*/
BASE_CUSTOMER_CREATE("base", "customer", "create", "客户创建"),
/**
* 客户-更新
*/
BASE_CUSTOMER_UPDATE("base", "customer", "update", "客户更新"),
/**
* 客户-删除
*/
BASE_CUSTOMER_DELETE("base", "customer", "delete", "客户删除"),
/**
* 客户-全量同步
*/
BASE_CUSTOMER_FULL("base", "customer", "full", "客户全量同步"),
;
/**
* 模块编码
*/
private final String module;
/**
* 实体编码
*/
private final String entity;
/**
* 操作编码
*/
private final String action;
/**
* 事件名称
*/
private final String name;
/**
* 获取Topic后缀不含topicBase和clientCode
* 格式: {module}-{entity}-{action}
*/
public String getTopicSuffix() {
return String.format("%s-%s-%s", module, entity, action);
}
/**
* 获取完整Topic名称服务端转发用
* 格式: {topicBase}-{module}-{entity}-{action}-{clientCode}
*/
public String getTopic(String topicBase, String clientCode) {
return String.format("%s-%s-%s-%s-%s", topicBase, module, entity, action, clientCode);
}
/**
* 获取完整Topic名称业务推送用不带clientCode
* 格式: {topicBase}-{module}-{entity}-{action}
*/
public String getTopic(String topicBase) {
return String.format("%s-%s-%s-%s", topicBase, module, entity, action);
}
/**
* 根据Topic后缀获取枚举
*
* @param topicSuffix Topic后缀格式: module-entity-action
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByTopicSuffix(String topicSuffix) {
if (topicSuffix == null) {
return null;
}
for (DatabusEventType type : values()) {
if (type.getTopicSuffix().equalsIgnoreCase(topicSuffix)) {
return type;
}
}
return null;
}
/**
* 根据模块、实体、操作获取枚举
*
* @param module 模块编码
* @param entity 实体编码
* @param action 操作编码
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByModuleEntityAction(String module, String entity, String action) {
for (DatabusEventType type : values()) {
if (type.getModule().equalsIgnoreCase(module)
&& type.getEntity().equalsIgnoreCase(entity)
&& type.getAction().equalsIgnoreCase(action)) {
return type;
}
}
return null;
}
/**
* 判断是否为全量同步事件
*/
public boolean isFullSync() {
return "full".equalsIgnoreCase(this.action);
}
/**
* 判断是否为增量同步事件
*/
public boolean isIncrementalSync() {
return !isFullSync();
}
}

View File

@@ -42,6 +42,13 @@
<version>${revision}</version>
</dependency>
<!-- DataBus Server Starter -->
<dependency>
<groupId>com.zt.plat</groupId>
<artifactId>zt-spring-boot-starter-databus-server</artifactId>
<version>${revision}</version>
</dependency>
<!-- 业务组件 -->
<dependency>
<groupId>com.zt.plat</groupId>

View File

@@ -0,0 +1,90 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 部门变更消息消费者
* <p>
* 消费来自 system-server 的部门变更消息,通过增量同步服务进行:
* 1. 三态判断(事件/客户端/订阅是否启用)
* 2. 记录到 event_record 流水表
* 3. 推送到客户端专属 Topicdatabus-sync-{clientCode}
*
* @author ZT
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusDeptChangeMessage.TOPIC,
consumerGroup = DatabusDeptChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusDeptChangeConsumer implements RocketMQListener<DatabusDeptChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusDeptChangeMessage message) {
log.info("[Databus] 收到部门变更消息, action={}, deptId={}", message.getAction(), message.getDeptId());
try {
// 构建完整的业务数据快照
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getDeptId());
dataMap.put("code", message.getDeptCode());
dataMap.put("name", message.getDeptName());
dataMap.put("shortName", message.getShortName());
dataMap.put("parentId", message.getParentId());
dataMap.put("sort", message.getSort());
dataMap.put("leaderUserId", message.getLeaderUserId());
dataMap.put("phone", message.getPhone());
dataMap.put("email", message.getStatus());
dataMap.put("isCompany", message.getIsCompany());
dataMap.put("isGroup", message.getIsGroup());
dataMap.put("deptSource", message.getDeptSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
// 构建完整的事件类型: system-dept-{action}
String eventType = String.format("system-dept-%s", message.getAction().toLowerCase());
// 构建 Databus 事件
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusDeptChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
// 调用增量同步服务处理(三态判断 + 记录流水 + 推送客户端Topic
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 部门变更事件处理完成, eventType={}, deptId={}",
eventType, message.getDeptId());
} catch (Exception e) {
log.error("[Databus] 处理部门变更消息失败, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
throw new RuntimeException("处理部门变更消息失败", e);
}
}
}

View File

@@ -0,0 +1,73 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 岗位变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusPostChangeMessage.TOPIC,
consumerGroup = DatabusPostChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusPostChangeConsumer implements RocketMQListener<DatabusPostChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusPostChangeMessage message) {
log.info("[Databus] 收到岗位变更消息, action={}, postId={}", message.getAction(), message.getPostId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getPostId());
dataMap.put("code", message.getPostCode());
dataMap.put("name", message.getPostName());
dataMap.put("sort", message.getSort());
dataMap.put("status", message.getStatus());
dataMap.put("remark", message.getRemark());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
String eventType = String.format("system-post-%s", message.getAction().toLowerCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusPostChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 岗位变更事件处理完成, eventType={}, postId={}",
eventType, message.getPostId());
} catch (Exception e) {
log.error("[Databus] 处理岗位变更消息失败, action={}, postId={}",
message.getAction(), message.getPostId(), e);
throw new RuntimeException("处理岗位变更消息失败", e);
}
}
}

View File

@@ -0,0 +1,79 @@
package com.zt.plat.module.databus.mq.consumer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.databus.server.core.event.DatabusEvent;
import com.zt.plat.framework.databus.server.core.sync.DatabusIncrementalSyncService;
import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* Databus 用户变更消息消费者
*/
@Slf4j
@Component
@RocketMQMessageListener(
topic = DatabusUserChangeMessage.TOPIC,
consumerGroup = DatabusUserChangeMessage.TOPIC + "_CONSUMER"
)
public class DatabusUserChangeConsumer implements RocketMQListener<DatabusUserChangeMessage> {
private static final String SOURCE_SERVICE = "system-server";
@Resource
private DatabusIncrementalSyncService databusIncrementalSyncService;
@Resource
private ObjectMapper objectMapper;
@Override
public void onMessage(DatabusUserChangeMessage message) {
log.info("[Databus] 收到用户变更消息, action={}, userId={}", message.getAction(), message.getUserId());
try {
Map<String, Object> dataMap = new HashMap<>();
dataMap.put("id", message.getUserId());
dataMap.put("username", message.getUsername());
dataMap.put("nickname", message.getNickname());
dataMap.put("remark", message.getRemark());
dataMap.put("deptIds", message.getDeptIds());
dataMap.put("postIds", message.getPostIds());
dataMap.put("email", message.getEmail());
dataMap.put("mobile", message.getMobile());
dataMap.put("sex", message.getSex());
dataMap.put("avatar", message.getAvatar());
dataMap.put("status", message.getStatus());
dataMap.put("userSource", message.getUserSource());
dataMap.put("tenantId", message.getTenantId());
dataMap.put("eventTime", message.getEventTime());
String eventType = String.format("system-user-%s", message.getAction().toLowerCase());
DatabusEvent databusEvent = DatabusEvent.builder()
.eventType(eventType)
.eventAction(message.getAction())
.dataSnapshot(objectMapper.writeValueAsString(dataMap))
.dataVersion(1)
.sourceService(SOURCE_SERVICE)
.sourceTopic(DatabusUserChangeMessage.TOPIC)
.tenantId(message.getTenantId())
.eventTime(message.getEventTime())
.build();
databusIncrementalSyncService.processEvent(databusEvent);
log.info("[Databus] 用户变更事件处理完成, eventType={}, userId={}",
eventType, message.getUserId());
} catch (Exception e) {
log.error("[Databus] 处理用户变更消息失败, action={}, userId={}",
message.getAction(), message.getUserId(), e);
throw new RuntimeException("处理用户变更消息失败", e);
}
}
}

View File

@@ -0,0 +1,85 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 部门数据提供者
* <p>
* 通过 Feign 调用 system-server 获取部门数据
*
* @author ZT
*/
@Slf4j
@Component
public class DeptDataFeignProvider implements DataProvider<DatabusDeptData> {
public static final String PROVIDER_TYPE = "DEPT";
@Resource
private DatabusDeptProviderApi deptProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusDeptData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusDeptData>> result = deptProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门数据失败: " + result.getMsg());
}
CursorPageResult<DatabusDeptData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = deptProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取部门总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusDeptData data) {
return data.getId();
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 岗位数据提供者
*/
@Slf4j
@Component
public class PostDataFeignProvider implements DataProvider<DatabusPostData> {
public static final String PROVIDER_TYPE = "POST";
@Resource
private DatabusPostProviderApi postProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusPostData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusPostData>> result = postProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位数据失败: " + result.getMsg());
}
CursorPageResult<DatabusPostData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = postProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取岗位总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusPostData data) {
return data.getId();
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.module.databus.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.framework.databus.server.core.provider.DataProvider;
import com.zt.plat.framework.databus.server.core.provider.DataProviderRegistry;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 用户数据提供者
*/
@Slf4j
@Component
public class UserDataFeignProvider implements DataProvider<DatabusAdminUserData> {
public static final String PROVIDER_TYPE = "USER";
@Resource
private DatabusUserProviderApi userProviderApi;
@Resource
private DataProviderRegistry dataProviderRegistry;
@PostConstruct
public void init() {
dataProviderRegistry.register(this);
}
@Override
public String getProviderType() {
return PROVIDER_TYPE;
}
@Override
public CursorPageData<DatabusAdminUserData> getPageByCursor(LocalDateTime cursorTime, Long cursorId,
int batchSize, Long tenantId) {
CursorPageReqDTO reqDTO = CursorPageReqDTO.builder()
.cursorTime(cursorTime)
.cursorId(cursorId)
.batchSize(batchSize)
.tenantId(tenantId)
.build();
CommonResult<CursorPageResult<DatabusAdminUserData>> result = userProviderApi.getPageByCursor(reqDTO);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户数据失败: " + result.getMsg());
}
CursorPageResult<DatabusAdminUserData> pageResult = result.getData();
return CursorPageData.of(
pageResult.getList(),
pageResult.getNextCursorTime(),
pageResult.getNextCursorId(),
pageResult.getCount(),
Boolean.TRUE.equals(pageResult.getHasMore()),
(pageResult.getTotal() != null ? pageResult.getTotal() : 0L)
);
}
@Override
public long count(Long tenantId) {
CommonResult<Long> result = userProviderApi.count(tenantId);
if (!result.isSuccess()) {
throw new RuntimeException("获取用户总数失败: " + result.getMsg());
}
return result.getData();
}
@Override
public Long extractUid(DatabusAdminUserData data) {
return data.getId();
}
}