feat(databus): 完成阶段一+二-数据契约层与数据提供者

阶段一:数据契约层(任务 1-16)
- 新增 DatabusDeptData, DatabusAdminUserData, DatabusPostData 数据对象
- 新增 CursorPageReqDTO, CursorPageResult 游标分页 DTO
- 新增 DatabusDeptProviderApi, DatabusUserProviderApi, DatabusPostProviderApi Feign 接口
- 修改 system-api pom.xml 添加 databus-api 依赖

阶段二:数据提供者实现(任务 17-38)
- 新增 DatabusDeptProviderApiImpl, DatabusUserProviderApiImpl, DatabusPostProviderApiImpl Feign 接口实现
- 实现游标分页查询(基于 cursorTime + cursorId 复合游标)
- 新增 DatabusDeptChangeMessage, DatabusUserChangeMessage, DatabusPostChangeMessage MQ 消息类
- 新增 DatabusChangeProducer 消息生产者(支持部门、用户、岗位三实体)
- 修改 DeptServiceImpl, AdminUserServiceImpl, PostServiceImpl 添加事件发布

技术要点:
- 游标分页:cursorTime + cursorId 复合游标解决雪花ID乱序问题
- 事件发布:create/update/delete 操作后异步发送 MQ 消息
- 数据聚合:用户数据包含部门和岗位简要信息

Ref: docs/databus/implementation-checklist.md 任务 1-38
This commit is contained in:
hewencai
2025-12-01 22:25:28 +08:00
parent 50259f514f
commit acdc73999a
19 changed files with 1939 additions and 0 deletions

View File

@@ -22,6 +22,13 @@
<artifactId>zt-common</artifactId>
</dependency>
<!-- DataBus API -->
<dependency>
<groupId>com.zt.plat</groupId>
<artifactId>zt-module-databus-api</artifactId>
<version>${revision}</version>
</dependency>
<!-- Web 相关 -->
<dependency>
<groupId>org.springdoc</groupId> <!-- 接口文档:使用最新版本的 Swagger 模型 -->

View File

@@ -0,0 +1,160 @@
package com.zt.plat.module.system.api.mq;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Databus 部门变更消息
* <p>
* 用于跨服务传递部门变更通知
*
* @author ZT
*/
@Data
@Accessors(chain = true)
public class DatabusDeptChangeMessage implements Serializable {
/**
* 消息 Topic
*/
public static final String TOPIC = "databus-change-system-dept";
/**
* 事件动作create-创建 update-更新 delete-删除
*/
private String action;
/**
* 部门ID
*/
private Long deptId;
/**
* 部门编码
*/
private String deptCode;
/**
* 部门名称
*/
private String deptName;
/**
* 部门简称
*/
private String shortName;
/**
* 上级部门ID
*/
private Long parentId;
/**
* 排序
*/
private Integer sort;
/**
* 负责人用户ID
*/
private Long leaderUserId;
/**
* 联系电话
*/
private String phone;
/**
* 邮箱
*/
private String email;
/**
* 状态0正常 1停用
*/
private Integer status;
/**
* 是否公司
*/
private Boolean isCompany;
/**
* 是否集团
*/
private Boolean isGroup;
/**
* 部门来源类型
*/
private Integer deptSource;
/**
* 租户ID
*/
private Long tenantId;
/**
* 事件时间
*/
private LocalDateTime eventTime;
// ==================== 静态工厂方法 ====================
public static DatabusDeptChangeMessage create(Long deptId, String deptCode, String deptName, String shortName,
Long parentId, Integer sort, Long leaderUserId, String phone,
String email, Integer status, Boolean isCompany, Boolean isGroup,
Integer deptSource, Long tenantId, LocalDateTime createTime) {
return new DatabusDeptChangeMessage()
.setAction("create")
.setDeptId(deptId)
.setDeptCode(deptCode)
.setDeptName(deptName)
.setShortName(shortName)
.setParentId(parentId)
.setSort(sort)
.setLeaderUserId(leaderUserId)
.setPhone(phone)
.setEmail(email)
.setStatus(status)
.setIsCompany(isCompany)
.setIsGroup(isGroup)
.setDeptSource(deptSource)
.setTenantId(tenantId)
.setEventTime(createTime != null ? createTime : LocalDateTime.now());
}
public static DatabusDeptChangeMessage update(Long deptId, String deptCode, String deptName, String shortName,
Long parentId, Integer sort, Long leaderUserId, String phone,
String email, Integer status, Boolean isCompany, Boolean isGroup,
Integer deptSource, Long tenantId, LocalDateTime updateTime) {
return new DatabusDeptChangeMessage()
.setAction("update")
.setDeptId(deptId)
.setDeptCode(deptCode)
.setDeptName(deptName)
.setShortName(shortName)
.setParentId(parentId)
.setSort(sort)
.setLeaderUserId(leaderUserId)
.setPhone(phone)
.setEmail(email)
.setStatus(status)
.setIsCompany(isCompany)
.setIsGroup(isGroup)
.setDeptSource(deptSource)
.setTenantId(tenantId)
.setEventTime(updateTime != null ? updateTime : LocalDateTime.now());
}
public static DatabusDeptChangeMessage delete(Long deptId, Long tenantId) {
return new DatabusDeptChangeMessage()
.setAction("delete")
.setDeptId(deptId)
.setTenantId(tenantId)
.setEventTime(LocalDateTime.now());
}
}

View File

@@ -0,0 +1,109 @@
package com.zt.plat.module.system.api.mq;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Databus 岗位变更消息
* <p>
* 用于跨服务传递岗位变更通知
*
* @author ZT
*/
@Data
@Accessors(chain = true)
public class DatabusPostChangeMessage implements Serializable {
/**
* 消息 Topic
*/
public static final String TOPIC = "databus-change-system-post";
/**
* 事件动作create-创建 update-更新 delete-删除
*/
private String action;
/**
* 岗位ID
*/
private Long postId;
/**
* 岗位编码
*/
private String postCode;
/**
* 岗位名称
*/
private String postName;
/**
* 排序
*/
private Integer sort;
/**
* 状态0正常 1停用
*/
private Integer status;
/**
* 备注
*/
private String remark;
/**
* 租户IDPostDO 不支持多租户,固定为 null
*/
private Long tenantId;
/**
* 事件时间
*/
private LocalDateTime eventTime;
// ==================== 静态工厂方法 ====================
public static DatabusPostChangeMessage create(Long postId, String postCode, String postName,
Integer sort, Integer status, String remark,
LocalDateTime createTime) {
return new DatabusPostChangeMessage()
.setAction("create")
.setPostId(postId)
.setPostCode(postCode)
.setPostName(postName)
.setSort(sort)
.setStatus(status)
.setRemark(remark)
.setTenantId(null) // PostDO 不支持多租户
.setEventTime(createTime != null ? createTime : LocalDateTime.now());
}
public static DatabusPostChangeMessage update(Long postId, String postCode, String postName,
Integer sort, Integer status, String remark,
LocalDateTime updateTime) {
return new DatabusPostChangeMessage()
.setAction("update")
.setPostId(postId)
.setPostCode(postCode)
.setPostName(postName)
.setSort(sort)
.setStatus(status)
.setRemark(remark)
.setTenantId(null) // PostDO 不支持多租户
.setEventTime(updateTime != null ? updateTime : LocalDateTime.now());
}
public static DatabusPostChangeMessage delete(Long postId) {
return new DatabusPostChangeMessage()
.setAction("delete")
.setPostId(postId)
.setTenantId(null) // PostDO 不支持多租户
.setEventTime(LocalDateTime.now());
}
}

View File

@@ -0,0 +1,154 @@
package com.zt.plat.module.system.api.mq;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.Set;
/**
* Databus 用户变更消息
* <p>
* 用于跨服务传递用户变更通知
*
* @author ZT
*/
@Data
@Accessors(chain = true)
public class DatabusUserChangeMessage implements Serializable {
/**
* 消息 Topic
*/
public static final String TOPIC = "databus-change-system-user";
/**
* 事件动作create-创建 update-更新 delete-删除
*/
private String action;
/**
* 用户ID
*/
private Long userId;
/**
* 用户名
*/
private String username;
/**
* 昵称
*/
private String nickname;
/**
* 备注
*/
private String remark;
/**
* 部门ID集合
*/
private Set<Long> deptIds;
/**
* 岗位ID集合
*/
private Set<Long> postIds;
/**
* 邮箱
*/
private String email;
/**
* 手机号码
*/
private String mobile;
/**
* 用户性别0未知 1男 2女
*/
private Integer sex;
/**
* 头像地址
*/
private String avatar;
/**
* 状态0正常 1停用
*/
private Integer status;
/**
* 用户来源类型
*/
private Integer userSource;
/**
* 租户ID
*/
private Long tenantId;
/**
* 事件时间
*/
private LocalDateTime eventTime;
// ==================== 静态工厂方法 ====================
public static DatabusUserChangeMessage create(Long userId, String username, String nickname, String remark,
Set<Long> deptIds, Set<Long> postIds, String email, String mobile,
Integer sex, String avatar, Integer status, Integer userSource,
Long tenantId, LocalDateTime createTime) {
return new DatabusUserChangeMessage()
.setAction("create")
.setUserId(userId)
.setUsername(username)
.setNickname(nickname)
.setRemark(remark)
.setDeptIds(deptIds)
.setPostIds(postIds)
.setEmail(email)
.setMobile(mobile)
.setSex(sex)
.setAvatar(avatar)
.setStatus(status)
.setUserSource(userSource)
.setTenantId(tenantId)
.setEventTime(createTime != null ? createTime : LocalDateTime.now());
}
public static DatabusUserChangeMessage update(Long userId, String username, String nickname, String remark,
Set<Long> deptIds, Set<Long> postIds, String email, String mobile,
Integer sex, String avatar, Integer status, Integer userSource,
Long tenantId, LocalDateTime updateTime) {
return new DatabusUserChangeMessage()
.setAction("update")
.setUserId(userId)
.setUsername(username)
.setNickname(nickname)
.setRemark(remark)
.setDeptIds(deptIds)
.setPostIds(postIds)
.setEmail(email)
.setMobile(mobile)
.setSex(sex)
.setAvatar(avatar)
.setStatus(status)
.setUserSource(userSource)
.setTenantId(tenantId)
.setEventTime(updateTime != null ? updateTime : LocalDateTime.now());
}
public static DatabusUserChangeMessage delete(Long userId, Long tenantId) {
return new DatabusUserChangeMessage()
.setAction("delete")
.setUserId(userId)
.setTenantId(tenantId)
.setEventTime(LocalDateTime.now());
}
}

View File

@@ -0,0 +1,218 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import com.zt.plat.module.system.dal.mysql.dept.DeptMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 部门数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusDeptProviderApiImpl implements DatabusDeptProviderApi {
@Resource
private DeptMapper deptMapper;
@Resource
private AdminUserMapper userMapper;
@Override
public CommonResult<CursorPageResult<DatabusDeptData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<DeptDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(DeptDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(DeptDO::getCreateTime, reqDTO.getCursorTime())
.gt(DeptDO::getId, reqDTO.getCursorId())
)
);
}
// 租户过滤(如果指定)
if (reqDTO.getTenantId() != null) {
queryWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId());
}
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(DeptDO::getCreateTime)
.orderByAsc(DeptDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<DeptDO> deptList = deptMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = deptList.size() > limit;
if (hasMore) {
deptList = deptList.subList(0, limit);
}
if (CollUtil.isEmpty(deptList)) {
return success(CursorPageResult.empty());
}
// 收集负责人用户ID
Set<Long> leaderUserIds = deptList.stream()
.map(DeptDO::getLeaderUserId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// 批量查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (CollUtil.isNotEmpty(leaderUserIds)) {
List<AdminUserDO> users = userMapper.selectBatchIds(leaderUserIds);
userNameMap = users.stream()
.collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1));
}
// 转换为同步数据
Map<Long, String> finalUserNameMap = userNameMap;
List<DatabusDeptData> dataList = deptList.stream()
.map(dept -> convertToDatabusDeptData(dept, finalUserNameMap))
.collect(Collectors.toList());
// 获取最后一条数据的游标
DeptDO lastDept = deptList.get(deptList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
LambdaQueryWrapper<DeptDO> countWrapper = new LambdaQueryWrapper<>();
if (reqDTO.getTenantId() != null) {
countWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId());
}
total = deptMapper.selectCount(countWrapper);
}
return success(CursorPageResult.of(
dataList,
lastDept.getCreateTime(),
lastDept.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusDeptData> getById(Long id) {
DeptDO dept = deptMapper.selectById(id);
if (dept == null) {
return success(null);
}
// 查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (dept.getLeaderUserId() != null) {
AdminUserDO user = userMapper.selectById(dept.getLeaderUserId());
if (user != null) {
userNameMap.put(user.getId(), user.getNickname());
}
}
return success(convertToDatabusDeptData(dept, userNameMap));
}
@Override
public CommonResult<List<DatabusDeptData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<DeptDO> deptList = deptMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(deptList)) {
return success(Collections.emptyList());
}
// 收集负责人用户ID
Set<Long> leaderUserIds = deptList.stream()
.map(DeptDO::getLeaderUserId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// 批量查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (CollUtil.isNotEmpty(leaderUserIds)) {
List<AdminUserDO> users = userMapper.selectBatchIds(leaderUserIds);
userNameMap = users.stream()
.collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1));
}
Map<Long, String> finalUserNameMap = userNameMap;
List<DatabusDeptData> dataList = deptList.stream()
.map(dept -> convertToDatabusDeptData(dept, finalUserNameMap))
.collect(Collectors.toList());
return success(dataList);
}
@Override
public CommonResult<Long> count(Long tenantId) {
LambdaQueryWrapper<DeptDO> queryWrapper = new LambdaQueryWrapper<>();
if (tenantId != null) {
queryWrapper.eq(DeptDO::getTenantId, tenantId);
}
return success(deptMapper.selectCount(queryWrapper));
}
/**
* 将 DeptDO 转换为 DatabusDeptData
*/
private DatabusDeptData convertToDatabusDeptData(DeptDO dept, Map<Long, String> userNameMap) {
// 根据 isCompany 反推 deptType
Integer deptType = null;
if (Boolean.TRUE.equals(dept.getIsCompany())) {
deptType = 28; // 公司
} else if (Boolean.FALSE.equals(dept.getIsCompany())) {
deptType = 26; // 部门
}
return DatabusDeptData.builder()
.id(dept.getId())
.code(dept.getCode())
.name(dept.getName())
.shortName(dept.getShortName())
.parentId(dept.getParentId())
.sort(dept.getSort())
.status(dept.getStatus())
.deptType(deptType)
.isGroup(dept.getIsGroup())
.isCompany(dept.getIsCompany())
.deptSource(dept.getDeptSource())
.leaderUserId(dept.getLeaderUserId())
.leaderUserName(dept.getLeaderUserId() != null ? userNameMap.get(dept.getLeaderUserId()) : null)
.phone(dept.getPhone())
.email(dept.getEmail())
.tenantId(dept.getTenantId())
.createTime(dept.getCreateTime())
.updateTime(dept.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,148 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.mysql.dept.PostMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 岗位数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusPostProviderApiImpl implements DatabusPostProviderApi {
@Resource
private PostMapper postMapper;
@Override
public CommonResult<CursorPageResult<DatabusPostData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<PostDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(PostDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(PostDO::getCreateTime, reqDTO.getCursorTime())
.gt(PostDO::getId, reqDTO.getCursorId())
)
);
}
// PostDO 不支持多租户,忽略租户过滤
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(PostDO::getCreateTime)
.orderByAsc(PostDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<PostDO> postList = postMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = postList.size() > limit;
if (hasMore) {
postList = postList.subList(0, limit);
}
if (CollUtil.isEmpty(postList)) {
return success(CursorPageResult.empty());
}
// 转换为同步数据
List<DatabusPostData> dataList = postList.stream()
.map(this::convertToDatabusPostData)
.collect(Collectors.toList());
// 获取最后一条数据的游标
PostDO lastPost = postList.get(postList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
total = postMapper.selectCount(new LambdaQueryWrapper<>());
}
return success(CursorPageResult.of(
dataList,
lastPost.getCreateTime(),
lastPost.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusPostData> getById(Long id) {
PostDO post = postMapper.selectById(id);
if (post == null) {
return success(null);
}
return success(convertToDatabusPostData(post));
}
@Override
public CommonResult<List<DatabusPostData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<PostDO> postList = postMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(postList)) {
return success(Collections.emptyList());
}
List<DatabusPostData> dataList = postList.stream()
.map(this::convertToDatabusPostData)
.collect(Collectors.toList());
return success(dataList);
}
@Override
public CommonResult<Long> count(Long tenantId) {
// PostDO 不支持多租户,忽略租户参数
return success(postMapper.selectCount(new LambdaQueryWrapper<>()));
}
/**
* 将 PostDO 转换为 DatabusPostData
*/
private DatabusPostData convertToDatabusPostData(PostDO post) {
return DatabusPostData.builder()
.id(post.getId())
.code(post.getCode())
.name(post.getName())
.sort(post.getSort())
.status(post.getStatus())
.remark(post.getRemark())
.tenantId(null) // PostDO 不支持多租户
.createTime(post.getCreateTime())
.updateTime(post.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,267 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData.DeptSimpleInfo;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData.PostSimpleInfo;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import com.zt.plat.module.system.dal.mysql.dept.DeptMapper;
import com.zt.plat.module.system.dal.mysql.dept.PostMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 用户数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusUserProviderApiImpl implements DatabusUserProviderApi {
@Resource
private AdminUserMapper userMapper;
@Resource
private UserDeptMapper userDeptMapper;
@Resource
private DeptMapper deptMapper;
@Resource
private PostMapper postMapper;
@Override
public CommonResult<CursorPageResult<DatabusAdminUserData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<AdminUserDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(AdminUserDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(AdminUserDO::getCreateTime, reqDTO.getCursorTime())
.gt(AdminUserDO::getId, reqDTO.getCursorId())
)
);
}
// 租户过滤(如果指定)
if (reqDTO.getTenantId() != null) {
queryWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId());
}
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(AdminUserDO::getCreateTime)
.orderByAsc(AdminUserDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<AdminUserDO> userList = userMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = userList.size() > limit;
if (hasMore) {
userList = userList.subList(0, limit);
}
if (CollUtil.isEmpty(userList)) {
return success(CursorPageResult.empty());
}
// 批量聚合用户关联数据
List<DatabusAdminUserData> dataList = aggregateUserData(userList);
// 获取最后一条数据的游标
AdminUserDO lastUser = userList.get(userList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
LambdaQueryWrapper<AdminUserDO> countWrapper = new LambdaQueryWrapper<>();
if (reqDTO.getTenantId() != null) {
countWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId());
}
total = userMapper.selectCount(countWrapper);
}
return success(CursorPageResult.of(
dataList,
lastUser.getCreateTime(),
lastUser.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusAdminUserData> getById(Long id) {
AdminUserDO user = userMapper.selectById(id);
if (user == null) {
return success(null);
}
List<DatabusAdminUserData> dataList = aggregateUserData(Collections.singletonList(user));
return success(dataList.isEmpty() ? null : dataList.get(0));
}
@Override
public CommonResult<List<DatabusAdminUserData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<AdminUserDO> userList = userMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(userList)) {
return success(Collections.emptyList());
}
return success(aggregateUserData(userList));
}
@Override
public CommonResult<Long> count(Long tenantId) {
LambdaQueryWrapper<AdminUserDO> queryWrapper = new LambdaQueryWrapper<>();
if (tenantId != null) {
queryWrapper.eq(AdminUserDO::getTenantId, tenantId);
}
return success(userMapper.selectCount(queryWrapper));
}
/**
* 批量聚合用户关联数据(部门、岗位)
*/
private List<DatabusAdminUserData> aggregateUserData(List<AdminUserDO> userList) {
if (CollUtil.isEmpty(userList)) {
return Collections.emptyList();
}
// 收集所有用户ID
List<Long> userIds = userList.stream()
.map(AdminUserDO::getId)
.collect(Collectors.toList());
// 批量查询用户-部门关系
List<UserDeptDO> userDeptList = userDeptMapper.selectValidListByUserIds(userIds);
Map<Long, List<Long>> userDeptIdsMap = userDeptList.stream()
.collect(Collectors.groupingBy(
UserDeptDO::getUserId,
Collectors.mapping(UserDeptDO::getDeptId, Collectors.toList())
));
// 收集所有部门ID
Set<Long> allDeptIds = userDeptList.stream()
.map(UserDeptDO::getDeptId)
.collect(Collectors.toSet());
// 批量查询部门信息
Map<Long, DeptDO> deptMap = new HashMap<>();
if (CollUtil.isNotEmpty(allDeptIds)) {
List<DeptDO> deptList = deptMapper.selectBatchIds(allDeptIds);
deptMap = deptList.stream()
.collect(Collectors.toMap(DeptDO::getId, d -> d, (v1, v2) -> v1));
}
// 收集所有岗位ID
Set<Long> allPostIds = userList.stream()
.map(AdminUserDO::getPostIds)
.filter(Objects::nonNull)
.flatMap(Set::stream)
.collect(Collectors.toSet());
// 批量查询岗位信息
Map<Long, PostDO> postMap = new HashMap<>();
if (CollUtil.isNotEmpty(allPostIds)) {
List<PostDO> postList = postMapper.selectBatchIds(allPostIds);
postMap = postList.stream()
.collect(Collectors.toMap(PostDO::getId, p -> p, (v1, v2) -> v1));
}
// 转换为同步数据
Map<Long, DeptDO> finalDeptMap = deptMap;
Map<Long, PostDO> finalPostMap = postMap;
return userList.stream()
.map(user -> convertToDatabusAdminUserData(user, userDeptIdsMap, finalDeptMap, finalPostMap))
.collect(Collectors.toList());
}
/**
* 将 AdminUserDO 转换为 DatabusAdminUserData
*/
private DatabusAdminUserData convertToDatabusAdminUserData(AdminUserDO user,
Map<Long, List<Long>> userDeptIdsMap,
Map<Long, DeptDO> deptMap,
Map<Long, PostDO> postMap) {
// 获取用户的部门ID列表
List<Long> deptIds = userDeptIdsMap.getOrDefault(user.getId(), Collections.emptyList());
// 构建部门简要信息列表
List<DeptSimpleInfo> depts = deptIds.stream()
.map(deptMap::get)
.filter(Objects::nonNull)
.map(dept -> DeptSimpleInfo.builder()
.deptId(dept.getId())
.deptCode(dept.getCode())
.deptName(dept.getName())
.build())
.collect(Collectors.toList());
// 获取用户的岗位ID列表
Set<Long> postIds = user.getPostIds();
List<Long> postIdList = postIds != null ? new ArrayList<>(postIds) : Collections.emptyList();
// 构建岗位简要信息列表
List<PostSimpleInfo> posts = postIdList.stream()
.map(postMap::get)
.filter(Objects::nonNull)
.map(post -> PostSimpleInfo.builder()
.postId(post.getId())
.postCode(post.getCode())
.postName(post.getName())
.build())
.collect(Collectors.toList());
return DatabusAdminUserData.builder()
.id(user.getId())
.username(user.getUsername())
.nickname(user.getNickname())
.mobile(user.getMobile())
.email(user.getEmail())
.sex(user.getSex())
.avatar(user.getAvatar())
.status(user.getStatus())
.remark(user.getRemark())
.userSource(user.getUserSource())
.deptIds(deptIds)
.depts(depts)
.postIds(postIdList)
.posts(posts)
.tenantId(user.getTenantId())
.createTime(user.getCreateTime())
.updateTime(user.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,193 @@
package com.zt.plat.module.system.mq.producer.databus;
import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage;
import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage;
import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
/**
* Databus 数据变更消息生产者
* <p>
* 用于发送部门、用户、岗位变更消息到 MQ供 databus-server 消费
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// ==================== 部门变更消息 ====================
/**
* 发送部门创建消息
*/
public void sendDeptCreatedMessage(DeptDO dept) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.create(
dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(),
dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(),
dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(),
dept.getDeptSource(), dept.getTenantId(), dept.getCreateTime());
sendDeptChangeMessage(message);
}
/**
* 发送部门更新消息
*/
public void sendDeptUpdatedMessage(DeptDO dept) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.update(
dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(),
dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(),
dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(),
dept.getDeptSource(), dept.getTenantId(), dept.getUpdateTime());
sendDeptChangeMessage(message);
}
/**
* 发送部门删除消息
*/
public void sendDeptDeletedMessage(Long deptId, Long tenantId) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.delete(deptId, tenantId);
sendDeptChangeMessage(message);
}
private void sendDeptChangeMessage(DatabusDeptChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusDeptChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 部门变更消息发送成功, action={}, deptId={}, msgId={}",
message.getAction(), message.getDeptId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 部门变更消息发送失败, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 部门变更消息发送异常, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
}
}
// ==================== 用户变更消息 ====================
/**
* 发送用户创建消息(完整数据)
*/
public void sendUserCreatedMessage(AdminUserDO user) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.create(
user.getId(), user.getUsername(), user.getNickname(),
user.getRemark(), user.getDeptIds(), user.getPostIds(),
user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(),
user.getStatus(), user.getUserSource(),
user.getTenantId(), user.getCreateTime());
sendUserChangeMessage(message);
}
/**
* 发送用户更新消息(完整数据)
*/
public void sendUserUpdatedMessage(AdminUserDO user) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.update(
user.getId(), user.getUsername(), user.getNickname(),
user.getRemark(), user.getDeptIds(), user.getPostIds(),
user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(),
user.getStatus(), user.getUserSource(),
user.getTenantId(), user.getUpdateTime());
sendUserChangeMessage(message);
}
/**
* 发送用户删除消息
*/
public void sendUserDeletedMessage(Long userId, Long tenantId) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.delete(userId, tenantId);
sendUserChangeMessage(message);
}
private void sendUserChangeMessage(DatabusUserChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusUserChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 用户变更消息发送成功, action={}, userId={}, msgId={}",
message.getAction(), message.getUserId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 用户变更消息发送失败, action={}, userId={}",
message.getAction(), message.getUserId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 用户变更消息发送异常, action={}, userId={}",
message.getAction(), message.getUserId(), e);
}
}
// ==================== 岗位变更消息 ====================
/**
* 发送岗位创建消息
*/
public void sendPostCreatedMessage(PostDO post) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.create(
post.getId(), post.getCode(), post.getName(),
post.getSort(), post.getStatus(), post.getRemark(),
post.getCreateTime());
sendPostChangeMessage(message);
}
/**
* 发送岗位更新消息
*/
public void sendPostUpdatedMessage(PostDO post) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.update(
post.getId(), post.getCode(), post.getName(),
post.getSort(), post.getStatus(), post.getRemark(),
post.getUpdateTime());
sendPostChangeMessage(message);
}
/**
* 发送岗位删除消息
*/
public void sendPostDeletedMessage(Long postId) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.delete(postId);
sendPostChangeMessage(message);
}
private void sendPostChangeMessage(DatabusPostChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusPostChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 岗位变更消息发送成功, action={}, postId={}, msgId={}",
message.getAction(), message.getPostId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 岗位变更消息发送失败, action={}, postId={}",
message.getAction(), message.getPostId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 岗位变更消息发送异常, action={}, postId={}",
message.getAction(), message.getPostId(), e);
}
}
}

View File

@@ -50,6 +50,8 @@ public class DeptServiceImpl implements DeptService {
private DeptMapper deptMapper;
@Resource
private UserDeptMapper userDeptMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
private static final String ROOT_CODE_PREFIX = "ZT";
private static final int CODE_SEGMENT_LENGTH = 3;
@@ -99,6 +101,10 @@ public class DeptServiceImpl implements DeptService {
dept.setDeptSource(DeptSourceEnum.EXTERNAL.getSource());
}
deptMapper.insert(dept);
// 发布部门创建事件
databusChangeProducer.sendDeptCreatedMessage(dept);
return dept.getId();
}
@@ -153,6 +159,12 @@ public class DeptServiceImpl implements DeptService {
DeptDO updateObj = BeanUtils.toBean(updateReqVO, DeptDO.class);
deptMapper.updateById(updateObj);
// 发布部门更新事件(重新查询获取完整数据)
DeptDO updatedDept = deptMapper.selectById(updateObj.getId());
if (updatedDept != null) {
databusChangeProducer.sendDeptUpdatedMessage(updatedDept);
}
if (parentChanged) {
refreshChildCodesRecursively(updateObj.getId(), updateReqVO.getCode());
}
@@ -168,8 +180,16 @@ public class DeptServiceImpl implements DeptService {
if (deptMapper.selectCountByParentId(id) > 0) {
throw exception(DEPT_EXITS_CHILDREN);
}
// 删除前先查询部门信息(用于发布事件)
DeptDO dept = deptMapper.selectById(id);
Long tenantId = (dept != null) ? dept.getTenantId() : null;
// 删除部门
deptMapper.deleteById(id);
// 发布部门删除事件
databusChangeProducer.sendDeptDeletedMessage(id, tenantId);
}
@VisibleForTesting

View File

@@ -34,6 +34,9 @@ public class PostServiceImpl implements PostService {
@Resource
private PostMapper postMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Override
public Long createPost(PostSaveReqVO createReqVO) {
// 校验正确性
@@ -42,6 +45,10 @@ public class PostServiceImpl implements PostService {
// 插入岗位
PostDO post = BeanUtils.toBean(createReqVO, PostDO.class);
postMapper.insert(post);
// 发布岗位创建事件
databusChangeProducer.sendPostCreatedMessage(post);
return post.getId();
}
@@ -53,14 +60,24 @@ public class PostServiceImpl implements PostService {
// 更新岗位
PostDO updateObj = BeanUtils.toBean(updateReqVO, PostDO.class);
postMapper.updateById(updateObj);
// 发布岗位更新事件(重新查询获取完整数据)
PostDO updatedPost = postMapper.selectById(updateObj.getId());
if (updatedPost != null) {
databusChangeProducer.sendPostUpdatedMessage(updatedPost);
}
}
@Override
public void deletePost(Long id) {
// 校验是否存在
validatePostExists(id);
// 删除部门
postMapper.deleteById(id);
// 发布岗位删除事件PostDO 不支持多租户tenantId 为 null
databusChangeProducer.sendPostDeletedMessage(id);
}
private void validatePostForCreateOrUpdate(Long id, String name, String code) {

View File

@@ -93,6 +93,9 @@ public class AdminUserServiceImpl implements AdminUserService {
@Resource
private ConfigApi configApi;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Resource
private UserDeptService userDeptService;
@@ -141,6 +144,9 @@ public class AdminUserServiceImpl implements AdminUserService {
postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId)));
}
// 2.4 发布用户创建事件
databusChangeProducer.sendUserCreatedMessage(user);
// 3. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
return user.getId();
@@ -232,6 +238,12 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.3 更新岗位
updateUserPost(updateReqVO, updateObj);
// 2.4 发布用户更新事件(重新查询获取完整数据)
AdminUserDO updatedUser = userMapper.selectById(updateObj.getId());
if (updatedUser != null) {
databusChangeProducer.sendUserUpdatedMessage(updatedUser);
}
// 3. 记录操作日志上下文
LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class));
LogRecordContext.putVariable("user", oldUser);
@@ -321,6 +333,9 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.2 删除用户岗位
userPostMapper.deleteByUserId(id);
// 2.3 发布用户删除事件
databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId());
// 3. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
}