update:数据同步分发机构岗位绑定关系

This commit is contained in:
hewencai
2025-12-16 12:03:15 +08:00
parent df3e80d907
commit ba1bc1fb6f
31 changed files with 1278 additions and 669 deletions

View File

@@ -0,0 +1,110 @@
package com.zt.plat.module.system.mq.producer.databus;
import cn.hutool.core.util.IdUtil;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 用户-部门关系变更消息 Producer
* <p>
* 负责发送用户与部门的关联关系变更事件
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusUserDeptChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${zt.databus.change.topic-prefix:databus-change}")
private String topicPrefix;
private static final String TOPIC_SUFFIX = "-system-user-dept";
/**
* 发送用户-部门关系创建消息
*
* @param userDept 用户-部门关系对象
*/
public void sendUserDeptCreatedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_CREATE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系创建消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系创建消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 发送用户-部门关系更新消息
*
* @param userDept 用户-部门关系对象
*/
public void sendUserDeptUpdatedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_UPDATE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系更新消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系更新消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 发送用户-部门关系删除消息
*
* @param userDept 用户-部门关系对象(删除前查询的)
*/
public void sendUserDeptDeletedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_DELETE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系删除消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系删除消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 构建用户-部门关系数据
*/
private DatabusUserDeptData buildUserDeptData(UserDeptDO userDept) {
return DatabusUserDeptData.builder()
.id(userDept.getId())
.userId(userDept.getUserId())
.deptId(userDept.getDeptId())
.tenantId(userDept.getTenantId())
.remark(userDept.getRemark())
.build();
}
/**
* 发送消息到 MQ
*/
private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) {
DatabusMessage<DatabusUserDeptData> message = new DatabusMessage<>();
message.setEventType(eventType);
message.setData(data);
message.setTimestamp(java.time.LocalDateTime.now());
message.setMessageId(IdUtil.fastSimpleUUID());
String topic = topicPrefix + TOPIC_SUFFIX;
rocketMQTemplate.syncSend(topic, message);
}
}

View File

@@ -0,0 +1,108 @@
package com.zt.plat.module.system.mq.producer.databus;
import cn.hutool.core.util.IdUtil;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import com.zt.plat.module.system.dal.dataobject.dept.UserPostDO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 用户-岗位关系变更消息 Producer
* <p>
* 负责发送用户与岗位的关联关系变更事件
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusUserPostChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${zt.databus.change.topic-prefix:databus-change}")
private String topicPrefix;
private static final String TOPIC_SUFFIX = "-system-user-post";
/**
* 发送用户-岗位关系创建消息
*
* @param userPost 用户-岗位关系对象
*/
public void sendUserPostCreatedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_CREATE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系创建消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系创建消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 发送用户-岗位关系更新消息
*
* @param userPost 用户-岗位关系对象
*/
public void sendUserPostUpdatedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_UPDATE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系更新消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系更新消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 发送用户-岗位关系删除消息
*
* @param userPost 用户-岗位关系对象(删除前查询的)
*/
public void sendUserPostDeletedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_DELETE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系删除消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系删除消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 构建用户-岗位关系数据
*/
private DatabusUserPostData buildUserPostData(UserPostDO userPost) {
return DatabusUserPostData.builder()
.id(userPost.getId())
.userId(userPost.getUserId())
.postId(userPost.getPostId())
.build();
}
/**
* 发送消息到 MQ
*/
private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) {
DatabusMessage<DatabusUserPostData> message = new DatabusMessage<>();
message.setEventType(eventType);
message.setData(data);
message.setTimestamp(java.time.LocalDateTime.now());
message.setMessageId(IdUtil.fastSimpleUUID());
String topic = topicPrefix + TOPIC_SUFFIX;
rocketMQTemplate.syncSend(topic, message);
}
}

View File

@@ -29,6 +29,7 @@ import com.zt.plat.module.system.dal.mysql.dept.UserPostMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import com.zt.plat.module.system.enums.user.PasswordStrategyEnum;
import com.zt.plat.module.system.enums.user.UserSourceEnum;
import com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer;
import com.zt.plat.module.system.service.dept.DeptService;
import com.zt.plat.module.system.service.dept.PostService;
import com.zt.plat.module.system.service.permission.PermissionService;
@@ -93,11 +94,10 @@ public class AdminUserServiceImpl implements AdminUserService {
@Resource
private ConfigApi configApi;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Resource
private UserDeptService userDeptService;
@Resource
private DatabusChangeProducer databusChangeProducer;
@Override
@GlobalTransactional(rollbackFor = Exception.class)
@@ -128,7 +128,6 @@ public class AdminUserServiceImpl implements AdminUserService {
if (user.getUserSource() == null) {
user.setUserSource(UserSourceEnum.EXTERNAL.getSource());
}
user.setWorkcode(normalizeWorkcode(createReqVO.getWorkcode()));
PasswordStrategyEnum passwordStrategy = determinePasswordStrategy(user.getUserSource());
user.setAvatar(normalizeAvatarValue(createReqVO.getAvatar()));
user.setPassword(encodePassword(createReqVO.getPassword(), passwordStrategy));
@@ -144,10 +143,10 @@ public class AdminUserServiceImpl implements AdminUserService {
postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId)));
}
// 2.4用户创建事件
// 3.用户创建消息到MQ供Databus消费转发
databusChangeProducer.sendUserCreatedMessage(user);
// 3. 记录操作日志上下文
// 4. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
return user.getId();
}
@@ -237,13 +236,13 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.3 更新岗位
updateUserPost(updateReqVO, updateObj);
// 2.4用户更新事件(重新查询获取完整数据
// 3.用户更新消息到MQ供Databus消费转发
AdminUserDO updatedUser = userMapper.selectById(updateObj.getId());
if (updatedUser != null) {
databusChangeProducer.sendUserUpdatedMessage(updatedUser);
}
// 3. 记录操作日志上下文
// 4. 记录操作日志上下文
LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class));
LogRecordContext.putVariable("user", oldUser);
}
@@ -332,10 +331,10 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.2 删除用户岗位
userPostMapper.deleteByUserId(id);
// 2.3 发用户删除事件
// 3.用户删除消息到MQ供Databus消费转发
databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId());
// 3. 记录操作日志上下文
// 4. 记录操作日志上下文
LogRecordContext.putVariable("user", user);
}

View File

@@ -6,6 +6,7 @@ import com.zt.plat.framework.security.core.LoginUser;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated;
@@ -23,18 +24,29 @@ import static com.zt.plat.module.system.enums.ErrorCodeConstants.USER_DEPT_NOT_E
*
* @author 管理员
*/
@Slf4j
@Service
@Validated
public class UserDeptServiceImpl implements UserDeptService {
@Resource
private UserDeptMapper userDeptMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusUserDeptChangeProducer databusUserDeptChangeProducer;
@Override
public Long createUserDept(UserDeptDO createReqVO) {
// 插入
UserDeptDO userDept = BeanUtils.toBean(createReqVO, UserDeptDO.class);
userDeptMapper.insert(userDept);
// 发送用户-部门关系创建消息
try {
databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept);
} catch (Exception e) {
log.error("[createUserDept] 发送用户-部门关系创建消息失败", e);
}
// 返回
return userDept.getId();
}
@@ -46,14 +58,34 @@ public class UserDeptServiceImpl implements UserDeptService {
// 更新
UserDeptDO updateObj = BeanUtils.toBean(updateReqVO, UserDeptDO.class);
userDeptMapper.updateById(updateObj);
// 发送用户-部门关系更新消息
try {
databusUserDeptChangeProducer.sendUserDeptUpdatedMessage(updateObj);
} catch (Exception e) {
log.error("[updateUserDept] 发送用户-部门关系更新消息失败", e);
}
}
@Override
public void deleteUserDept(Long id) {
// 校验存在
validateUserDeptExists(id);
// 查询完整对象(删除前查询,删除后无法查询)
UserDeptDO userDept = userDeptMapper.selectById(id);
// 删除
userDeptMapper.deleteById(id);
// 发送用户-部门关系删除消息
if (userDept != null) {
try {
databusUserDeptChangeProducer.sendUserDeptDeletedMessage(userDept);
} catch (Exception e) {
log.error("[deleteUserDept] 发送用户-部门关系删除消息失败", e);
}
}
}
@Override
@@ -111,6 +143,16 @@ public class UserDeptServiceImpl implements UserDeptService {
Long tenantId = Optional.ofNullable(getLoginUser()).orElse(new LoginUser()).getTenantId();
list.forEach(item -> item.setTenantId(tenantId));
userDeptMapper.insertBatch(list);
// 发送用户-部门关系创建消息(为每个关系发送)
for (UserDeptDO userDept : list) {
try {
databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept);
} catch (Exception e) {
log.error("[batchCreateUserDept] 发送用户-部门关系创建消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
}
}