feat(databus): 完成阶段一+二-数据契约层与数据提供者

阶段一:数据契约层(任务 1-16)
- 新增 DatabusDeptData, DatabusAdminUserData, DatabusPostData 数据对象
- 新增 CursorPageReqDTO, CursorPageResult 游标分页 DTO
- 新增 DatabusDeptProviderApi, DatabusUserProviderApi, DatabusPostProviderApi Feign 接口
- 修改 system-api pom.xml 添加 databus-api 依赖

阶段二:数据提供者实现(任务 17-38)
- 新增 DatabusDeptProviderApiImpl, DatabusUserProviderApiImpl, DatabusPostProviderApiImpl Feign 接口实现
- 实现游标分页查询(基于 cursorTime + cursorId 复合游标)
- 新增 DatabusDeptChangeMessage, DatabusUserChangeMessage, DatabusPostChangeMessage MQ 消息类
- 新增 DatabusChangeProducer 消息生产者(支持部门、用户、岗位三实体)
- 修改 DeptServiceImpl, AdminUserServiceImpl, PostServiceImpl 添加事件发布

技术要点:
- 游标分页:cursorTime + cursorId 复合游标解决雪花ID乱序问题
- 事件发布:create/update/delete 操作后异步发送 MQ 消息
- 数据聚合:用户数据包含部门和岗位简要信息

Ref: docs/databus/implementation-checklist.md 任务 1-38
This commit is contained in:
hewencai
2025-12-01 22:25:28 +08:00
parent 7fae3203bc
commit f5ba493f95
19 changed files with 1939 additions and 0 deletions

View File

@@ -0,0 +1,218 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import com.zt.plat.module.system.dal.mysql.dept.DeptMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 部门数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusDeptProviderApiImpl implements DatabusDeptProviderApi {
@Resource
private DeptMapper deptMapper;
@Resource
private AdminUserMapper userMapper;
@Override
public CommonResult<CursorPageResult<DatabusDeptData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<DeptDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(DeptDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(DeptDO::getCreateTime, reqDTO.getCursorTime())
.gt(DeptDO::getId, reqDTO.getCursorId())
)
);
}
// 租户过滤(如果指定)
if (reqDTO.getTenantId() != null) {
queryWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId());
}
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(DeptDO::getCreateTime)
.orderByAsc(DeptDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<DeptDO> deptList = deptMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = deptList.size() > limit;
if (hasMore) {
deptList = deptList.subList(0, limit);
}
if (CollUtil.isEmpty(deptList)) {
return success(CursorPageResult.empty());
}
// 收集负责人用户ID
Set<Long> leaderUserIds = deptList.stream()
.map(DeptDO::getLeaderUserId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// 批量查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (CollUtil.isNotEmpty(leaderUserIds)) {
List<AdminUserDO> users = userMapper.selectBatchIds(leaderUserIds);
userNameMap = users.stream()
.collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1));
}
// 转换为同步数据
Map<Long, String> finalUserNameMap = userNameMap;
List<DatabusDeptData> dataList = deptList.stream()
.map(dept -> convertToDatabusDeptData(dept, finalUserNameMap))
.collect(Collectors.toList());
// 获取最后一条数据的游标
DeptDO lastDept = deptList.get(deptList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
LambdaQueryWrapper<DeptDO> countWrapper = new LambdaQueryWrapper<>();
if (reqDTO.getTenantId() != null) {
countWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId());
}
total = deptMapper.selectCount(countWrapper);
}
return success(CursorPageResult.of(
dataList,
lastDept.getCreateTime(),
lastDept.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusDeptData> getById(Long id) {
DeptDO dept = deptMapper.selectById(id);
if (dept == null) {
return success(null);
}
// 查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (dept.getLeaderUserId() != null) {
AdminUserDO user = userMapper.selectById(dept.getLeaderUserId());
if (user != null) {
userNameMap.put(user.getId(), user.getNickname());
}
}
return success(convertToDatabusDeptData(dept, userNameMap));
}
@Override
public CommonResult<List<DatabusDeptData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<DeptDO> deptList = deptMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(deptList)) {
return success(Collections.emptyList());
}
// 收集负责人用户ID
Set<Long> leaderUserIds = deptList.stream()
.map(DeptDO::getLeaderUserId)
.filter(Objects::nonNull)
.collect(Collectors.toSet());
// 批量查询负责人用户名
Map<Long, String> userNameMap = new HashMap<>();
if (CollUtil.isNotEmpty(leaderUserIds)) {
List<AdminUserDO> users = userMapper.selectBatchIds(leaderUserIds);
userNameMap = users.stream()
.collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1));
}
Map<Long, String> finalUserNameMap = userNameMap;
List<DatabusDeptData> dataList = deptList.stream()
.map(dept -> convertToDatabusDeptData(dept, finalUserNameMap))
.collect(Collectors.toList());
return success(dataList);
}
@Override
public CommonResult<Long> count(Long tenantId) {
LambdaQueryWrapper<DeptDO> queryWrapper = new LambdaQueryWrapper<>();
if (tenantId != null) {
queryWrapper.eq(DeptDO::getTenantId, tenantId);
}
return success(deptMapper.selectCount(queryWrapper));
}
/**
* 将 DeptDO 转换为 DatabusDeptData
*/
private DatabusDeptData convertToDatabusDeptData(DeptDO dept, Map<Long, String> userNameMap) {
// 根据 isCompany 反推 deptType
Integer deptType = null;
if (Boolean.TRUE.equals(dept.getIsCompany())) {
deptType = 28; // 公司
} else if (Boolean.FALSE.equals(dept.getIsCompany())) {
deptType = 26; // 部门
}
return DatabusDeptData.builder()
.id(dept.getId())
.code(dept.getCode())
.name(dept.getName())
.shortName(dept.getShortName())
.parentId(dept.getParentId())
.sort(dept.getSort())
.status(dept.getStatus())
.deptType(deptType)
.isGroup(dept.getIsGroup())
.isCompany(dept.getIsCompany())
.deptSource(dept.getDeptSource())
.leaderUserId(dept.getLeaderUserId())
.leaderUserName(dept.getLeaderUserId() != null ? userNameMap.get(dept.getLeaderUserId()) : null)
.phone(dept.getPhone())
.email(dept.getEmail())
.tenantId(dept.getTenantId())
.createTime(dept.getCreateTime())
.updateTime(dept.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,148 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.mysql.dept.PostMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 岗位数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusPostProviderApiImpl implements DatabusPostProviderApi {
@Resource
private PostMapper postMapper;
@Override
public CommonResult<CursorPageResult<DatabusPostData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<PostDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(PostDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(PostDO::getCreateTime, reqDTO.getCursorTime())
.gt(PostDO::getId, reqDTO.getCursorId())
)
);
}
// PostDO 不支持多租户,忽略租户过滤
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(PostDO::getCreateTime)
.orderByAsc(PostDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<PostDO> postList = postMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = postList.size() > limit;
if (hasMore) {
postList = postList.subList(0, limit);
}
if (CollUtil.isEmpty(postList)) {
return success(CursorPageResult.empty());
}
// 转换为同步数据
List<DatabusPostData> dataList = postList.stream()
.map(this::convertToDatabusPostData)
.collect(Collectors.toList());
// 获取最后一条数据的游标
PostDO lastPost = postList.get(postList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
total = postMapper.selectCount(new LambdaQueryWrapper<>());
}
return success(CursorPageResult.of(
dataList,
lastPost.getCreateTime(),
lastPost.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusPostData> getById(Long id) {
PostDO post = postMapper.selectById(id);
if (post == null) {
return success(null);
}
return success(convertToDatabusPostData(post));
}
@Override
public CommonResult<List<DatabusPostData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<PostDO> postList = postMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(postList)) {
return success(Collections.emptyList());
}
List<DatabusPostData> dataList = postList.stream()
.map(this::convertToDatabusPostData)
.collect(Collectors.toList());
return success(dataList);
}
@Override
public CommonResult<Long> count(Long tenantId) {
// PostDO 不支持多租户,忽略租户参数
return success(postMapper.selectCount(new LambdaQueryWrapper<>()));
}
/**
* 将 PostDO 转换为 DatabusPostData
*/
private DatabusPostData convertToDatabusPostData(PostDO post) {
return DatabusPostData.builder()
.id(post.getId())
.code(post.getCode())
.name(post.getName())
.sort(post.getSort())
.status(post.getStatus())
.remark(post.getRemark())
.tenantId(null) // PostDO 不支持多租户
.createTime(post.getCreateTime())
.updateTime(post.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,267 @@
package com.zt.plat.module.system.api.databus;
import cn.hutool.core.collection.CollUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData.DeptSimpleInfo;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData.PostSimpleInfo;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import com.zt.plat.module.system.dal.mysql.dept.DeptMapper;
import com.zt.plat.module.system.dal.mysql.dept.PostMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.RestController;
import java.util.*;
import java.util.stream.Collectors;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
/**
* Databus 用户数据提供者 API 实现
*
* @author ZT
*/
@Slf4j
@RestController
@Validated
public class DatabusUserProviderApiImpl implements DatabusUserProviderApi {
@Resource
private AdminUserMapper userMapper;
@Resource
private UserDeptMapper userDeptMapper;
@Resource
private DeptMapper deptMapper;
@Resource
private PostMapper postMapper;
@Override
public CommonResult<CursorPageResult<DatabusAdminUserData>> getPageByCursor(CursorPageReqDTO reqDTO) {
// 构建游标查询条件
LambdaQueryWrapper<AdminUserDO> queryWrapper = new LambdaQueryWrapper<>();
// 游标条件create_time > cursorTime OR (create_time = cursorTime AND id > cursorId)
if (!reqDTO.isFirstPage()) {
queryWrapper.and(w -> w
.gt(AdminUserDO::getCreateTime, reqDTO.getCursorTime())
.or(o -> o
.eq(AdminUserDO::getCreateTime, reqDTO.getCursorTime())
.gt(AdminUserDO::getId, reqDTO.getCursorId())
)
);
}
// 租户过滤(如果指定)
if (reqDTO.getTenantId() != null) {
queryWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId());
}
// 按 create_time, id 升序排列,确保顺序稳定
queryWrapper.orderByAsc(AdminUserDO::getCreateTime)
.orderByAsc(AdminUserDO::getId);
// 多查一条判断是否有更多数据
int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100;
queryWrapper.last("LIMIT " + (limit + 1));
List<AdminUserDO> userList = userMapper.selectList(queryWrapper);
// 判断是否有更多
boolean hasMore = userList.size() > limit;
if (hasMore) {
userList = userList.subList(0, limit);
}
if (CollUtil.isEmpty(userList)) {
return success(CursorPageResult.empty());
}
// 批量聚合用户关联数据
List<DatabusAdminUserData> dataList = aggregateUserData(userList);
// 获取最后一条数据的游标
AdminUserDO lastUser = userList.get(userList.size() - 1);
// 首次查询时返回总数
Long total = null;
if (reqDTO.isFirstPage()) {
LambdaQueryWrapper<AdminUserDO> countWrapper = new LambdaQueryWrapper<>();
if (reqDTO.getTenantId() != null) {
countWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId());
}
total = userMapper.selectCount(countWrapper);
}
return success(CursorPageResult.of(
dataList,
lastUser.getCreateTime(),
lastUser.getId(),
hasMore,
total
));
}
@Override
public CommonResult<DatabusAdminUserData> getById(Long id) {
AdminUserDO user = userMapper.selectById(id);
if (user == null) {
return success(null);
}
List<DatabusAdminUserData> dataList = aggregateUserData(Collections.singletonList(user));
return success(dataList.isEmpty() ? null : dataList.get(0));
}
@Override
public CommonResult<List<DatabusAdminUserData>> getListByIds(List<Long> ids) {
if (CollUtil.isEmpty(ids)) {
return success(Collections.emptyList());
}
List<AdminUserDO> userList = userMapper.selectBatchIds(ids);
if (CollUtil.isEmpty(userList)) {
return success(Collections.emptyList());
}
return success(aggregateUserData(userList));
}
@Override
public CommonResult<Long> count(Long tenantId) {
LambdaQueryWrapper<AdminUserDO> queryWrapper = new LambdaQueryWrapper<>();
if (tenantId != null) {
queryWrapper.eq(AdminUserDO::getTenantId, tenantId);
}
return success(userMapper.selectCount(queryWrapper));
}
/**
* 批量聚合用户关联数据(部门、岗位)
*/
private List<DatabusAdminUserData> aggregateUserData(List<AdminUserDO> userList) {
if (CollUtil.isEmpty(userList)) {
return Collections.emptyList();
}
// 收集所有用户ID
List<Long> userIds = userList.stream()
.map(AdminUserDO::getId)
.collect(Collectors.toList());
// 批量查询用户-部门关系
List<UserDeptDO> userDeptList = userDeptMapper.selectValidListByUserIds(userIds);
Map<Long, List<Long>> userDeptIdsMap = userDeptList.stream()
.collect(Collectors.groupingBy(
UserDeptDO::getUserId,
Collectors.mapping(UserDeptDO::getDeptId, Collectors.toList())
));
// 收集所有部门ID
Set<Long> allDeptIds = userDeptList.stream()
.map(UserDeptDO::getDeptId)
.collect(Collectors.toSet());
// 批量查询部门信息
Map<Long, DeptDO> deptMap = new HashMap<>();
if (CollUtil.isNotEmpty(allDeptIds)) {
List<DeptDO> deptList = deptMapper.selectBatchIds(allDeptIds);
deptMap = deptList.stream()
.collect(Collectors.toMap(DeptDO::getId, d -> d, (v1, v2) -> v1));
}
// 收集所有岗位ID
Set<Long> allPostIds = userList.stream()
.map(AdminUserDO::getPostIds)
.filter(Objects::nonNull)
.flatMap(Set::stream)
.collect(Collectors.toSet());
// 批量查询岗位信息
Map<Long, PostDO> postMap = new HashMap<>();
if (CollUtil.isNotEmpty(allPostIds)) {
List<PostDO> postList = postMapper.selectBatchIds(allPostIds);
postMap = postList.stream()
.collect(Collectors.toMap(PostDO::getId, p -> p, (v1, v2) -> v1));
}
// 转换为同步数据
Map<Long, DeptDO> finalDeptMap = deptMap;
Map<Long, PostDO> finalPostMap = postMap;
return userList.stream()
.map(user -> convertToDatabusAdminUserData(user, userDeptIdsMap, finalDeptMap, finalPostMap))
.collect(Collectors.toList());
}
/**
* 将 AdminUserDO 转换为 DatabusAdminUserData
*/
private DatabusAdminUserData convertToDatabusAdminUserData(AdminUserDO user,
Map<Long, List<Long>> userDeptIdsMap,
Map<Long, DeptDO> deptMap,
Map<Long, PostDO> postMap) {
// 获取用户的部门ID列表
List<Long> deptIds = userDeptIdsMap.getOrDefault(user.getId(), Collections.emptyList());
// 构建部门简要信息列表
List<DeptSimpleInfo> depts = deptIds.stream()
.map(deptMap::get)
.filter(Objects::nonNull)
.map(dept -> DeptSimpleInfo.builder()
.deptId(dept.getId())
.deptCode(dept.getCode())
.deptName(dept.getName())
.build())
.collect(Collectors.toList());
// 获取用户的岗位ID列表
Set<Long> postIds = user.getPostIds();
List<Long> postIdList = postIds != null ? new ArrayList<>(postIds) : Collections.emptyList();
// 构建岗位简要信息列表
List<PostSimpleInfo> posts = postIdList.stream()
.map(postMap::get)
.filter(Objects::nonNull)
.map(post -> PostSimpleInfo.builder()
.postId(post.getId())
.postCode(post.getCode())
.postName(post.getName())
.build())
.collect(Collectors.toList());
return DatabusAdminUserData.builder()
.id(user.getId())
.username(user.getUsername())
.nickname(user.getNickname())
.mobile(user.getMobile())
.email(user.getEmail())
.sex(user.getSex())
.avatar(user.getAvatar())
.status(user.getStatus())
.remark(user.getRemark())
.userSource(user.getUserSource())
.deptIds(deptIds)
.depts(depts)
.postIds(postIdList)
.posts(posts)
.tenantId(user.getTenantId())
.createTime(user.getCreateTime())
.updateTime(user.getUpdateTime())
.build();
}
}

View File

@@ -0,0 +1,193 @@
package com.zt.plat.module.system.mq.producer.databus;
import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage;
import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage;
import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage;
import com.zt.plat.module.system.dal.dataobject.dept.DeptDO;
import com.zt.plat.module.system.dal.dataobject.dept.PostDO;
import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Component;
import jakarta.annotation.Resource;
/**
* Databus 数据变更消息生产者
* <p>
* 用于发送部门、用户、岗位变更消息到 MQ供 databus-server 消费
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
// ==================== 部门变更消息 ====================
/**
* 发送部门创建消息
*/
public void sendDeptCreatedMessage(DeptDO dept) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.create(
dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(),
dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(),
dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(),
dept.getDeptSource(), dept.getTenantId(), dept.getCreateTime());
sendDeptChangeMessage(message);
}
/**
* 发送部门更新消息
*/
public void sendDeptUpdatedMessage(DeptDO dept) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.update(
dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(),
dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(),
dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(),
dept.getDeptSource(), dept.getTenantId(), dept.getUpdateTime());
sendDeptChangeMessage(message);
}
/**
* 发送部门删除消息
*/
public void sendDeptDeletedMessage(Long deptId, Long tenantId) {
DatabusDeptChangeMessage message = DatabusDeptChangeMessage.delete(deptId, tenantId);
sendDeptChangeMessage(message);
}
private void sendDeptChangeMessage(DatabusDeptChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusDeptChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 部门变更消息发送成功, action={}, deptId={}, msgId={}",
message.getAction(), message.getDeptId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 部门变更消息发送失败, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 部门变更消息发送异常, action={}, deptId={}",
message.getAction(), message.getDeptId(), e);
}
}
// ==================== 用户变更消息 ====================
/**
* 发送用户创建消息(完整数据)
*/
public void sendUserCreatedMessage(AdminUserDO user) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.create(
user.getId(), user.getUsername(), user.getNickname(),
user.getRemark(), user.getDeptIds(), user.getPostIds(),
user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(),
user.getStatus(), user.getUserSource(),
user.getTenantId(), user.getCreateTime());
sendUserChangeMessage(message);
}
/**
* 发送用户更新消息(完整数据)
*/
public void sendUserUpdatedMessage(AdminUserDO user) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.update(
user.getId(), user.getUsername(), user.getNickname(),
user.getRemark(), user.getDeptIds(), user.getPostIds(),
user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(),
user.getStatus(), user.getUserSource(),
user.getTenantId(), user.getUpdateTime());
sendUserChangeMessage(message);
}
/**
* 发送用户删除消息
*/
public void sendUserDeletedMessage(Long userId, Long tenantId) {
DatabusUserChangeMessage message = DatabusUserChangeMessage.delete(userId, tenantId);
sendUserChangeMessage(message);
}
private void sendUserChangeMessage(DatabusUserChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusUserChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 用户变更消息发送成功, action={}, userId={}, msgId={}",
message.getAction(), message.getUserId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 用户变更消息发送失败, action={}, userId={}",
message.getAction(), message.getUserId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 用户变更消息发送异常, action={}, userId={}",
message.getAction(), message.getUserId(), e);
}
}
// ==================== 岗位变更消息 ====================
/**
* 发送岗位创建消息
*/
public void sendPostCreatedMessage(PostDO post) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.create(
post.getId(), post.getCode(), post.getName(),
post.getSort(), post.getStatus(), post.getRemark(),
post.getCreateTime());
sendPostChangeMessage(message);
}
/**
* 发送岗位更新消息
*/
public void sendPostUpdatedMessage(PostDO post) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.update(
post.getId(), post.getCode(), post.getName(),
post.getSort(), post.getStatus(), post.getRemark(),
post.getUpdateTime());
sendPostChangeMessage(message);
}
/**
* 发送岗位删除消息
*/
public void sendPostDeletedMessage(Long postId) {
DatabusPostChangeMessage message = DatabusPostChangeMessage.delete(postId);
sendPostChangeMessage(message);
}
private void sendPostChangeMessage(DatabusPostChangeMessage message) {
try {
rocketMQTemplate.asyncSend(DatabusPostChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() {
@Override
public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) {
log.info("[Databus] 岗位变更消息发送成功, action={}, postId={}, msgId={}",
message.getAction(), message.getPostId(), sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
log.error("[Databus] 岗位变更消息发送失败, action={}, postId={}",
message.getAction(), message.getPostId(), e);
}
});
} catch (Exception e) {
log.error("[Databus] 岗位变更消息发送异常, action={}, postId={}",
message.getAction(), message.getPostId(), e);
}
}
}

View File

@@ -50,6 +50,8 @@ public class DeptServiceImpl implements DeptService {
private DeptMapper deptMapper;
@Resource
private UserDeptMapper userDeptMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
private static final String ROOT_CODE_PREFIX = "ZT";
private static final int CODE_SEGMENT_LENGTH = 3;
@@ -99,6 +101,10 @@ public class DeptServiceImpl implements DeptService {
dept.setDeptSource(DeptSourceEnum.EXTERNAL.getSource());
}
deptMapper.insert(dept);
// 发布部门创建事件
databusChangeProducer.sendDeptCreatedMessage(dept);
return dept.getId();
}
@@ -153,6 +159,12 @@ public class DeptServiceImpl implements DeptService {
DeptDO updateObj = BeanUtils.toBean(updateReqVO, DeptDO.class);
deptMapper.updateById(updateObj);
// 发布部门更新事件(重新查询获取完整数据)
DeptDO updatedDept = deptMapper.selectById(updateObj.getId());
if (updatedDept != null) {
databusChangeProducer.sendDeptUpdatedMessage(updatedDept);
}
if (parentChanged) {
refreshChildCodesRecursively(updateObj.getId(), updateReqVO.getCode());
}
@@ -168,8 +180,16 @@ public class DeptServiceImpl implements DeptService {
if (deptMapper.selectCountByParentId(id) > 0) {
throw exception(DEPT_EXITS_CHILDREN);
}
// 删除前先查询部门信息(用于发布事件)
DeptDO dept = deptMapper.selectById(id);
Long tenantId = (dept != null) ? dept.getTenantId() : null;
// 删除部门
deptMapper.deleteById(id);
// 发布部门删除事件
databusChangeProducer.sendDeptDeletedMessage(id, tenantId);
}
@VisibleForTesting

View File

@@ -34,6 +34,9 @@ public class PostServiceImpl implements PostService {
@Resource
private PostMapper postMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Override
public Long createPost(PostSaveReqVO createReqVO) {
// 校验正确性
@@ -42,6 +45,10 @@ public class PostServiceImpl implements PostService {
// 插入岗位
PostDO post = BeanUtils.toBean(createReqVO, PostDO.class);
postMapper.insert(post);
// 发布岗位创建事件
databusChangeProducer.sendPostCreatedMessage(post);
return post.getId();
}
@@ -53,14 +60,24 @@ public class PostServiceImpl implements PostService {
// 更新岗位
PostDO updateObj = BeanUtils.toBean(updateReqVO, PostDO.class);
postMapper.updateById(updateObj);
// 发布岗位更新事件(重新查询获取完整数据)
PostDO updatedPost = postMapper.selectById(updateObj.getId());
if (updatedPost != null) {
databusChangeProducer.sendPostUpdatedMessage(updatedPost);
}
}
@Override
public void deletePost(Long id) {
// 校验是否存在
validatePostExists(id);
// 删除部门
postMapper.deleteById(id);
// 发布岗位删除事件PostDO 不支持多租户tenantId 为 null
databusChangeProducer.sendPostDeletedMessage(id);
}
private void validatePostForCreateOrUpdate(Long id, String name, String code) {

View File

@@ -93,6 +93,9 @@ public class AdminUserServiceImpl implements AdminUserService {
@Resource
private ConfigApi configApi;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Resource
private UserDeptService userDeptService;
@@ -140,6 +143,9 @@ public class AdminUserServiceImpl implements AdminUserService {
postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId)));
}
// 2.4 发布用户创建事件
databusChangeProducer.sendUserCreatedMessage(user);
// 3. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
return user.getId();
@@ -228,6 +234,12 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.3 更新岗位
updateUserPost(updateReqVO, updateObj);
// 2.4 发布用户更新事件(重新查询获取完整数据)
AdminUserDO updatedUser = userMapper.selectById(updateObj.getId());
if (updatedUser != null) {
databusChangeProducer.sendUserUpdatedMessage(updatedUser);
}
// 3. 记录操作日志上下文
LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class));
LogRecordContext.putVariable("user", oldUser);
@@ -317,6 +329,9 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.2 删除用户岗位
userPostMapper.deleteByUserId(id);
// 2.3 发布用户删除事件
databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId());
// 3. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
}