Merge branch 'dev' into 'test'

update:数据同步分发机构岗位绑定关系

See merge request jygk/dsc!9
This commit is contained in:
wencai he
2025-12-16 04:03:41 +00:00
31 changed files with 1278 additions and 669 deletions

View File

@@ -1,148 +0,0 @@
package com.zt.plat.framework.databus.client.core.listener;
import com.zt.plat.framework.databus.client.config.DatabusSyncClientProperties;
import com.zt.plat.framework.databus.client.core.processor.MessageProcessor;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
/**
* Databus Consumer 注册器
* <p>
* 根据 DatabusEventType 枚举自动为所有事件类型创建消费者
* Topic格式: {topicBase}-{module}-{entity}-{action}-{clientCode}
* 例如: databus-sync-system-user-create-company-a
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client.mq", name = "enabled", havingValue = "true", matchIfMissing = true)
public class DatabusConsumerRegistry {
private final MessageProcessor messageProcessor;
private final DatabusSyncClientProperties properties;
/**
* 管理的消费者列表
*/
private final List<DefaultMQPushConsumer> consumers = new ArrayList<>();
public DatabusConsumerRegistry(MessageProcessor messageProcessor, DatabusSyncClientProperties properties) {
this.messageProcessor = messageProcessor;
this.properties = properties;
}
@PostConstruct
public void init() {
if (!Boolean.TRUE.equals(properties.getEnabled())) {
log.info("[Databus Client] 客户端未启用,跳过初始化");
return;
}
String nameServer = properties.getMq().getNameServer();
if (nameServer == null || nameServer.isEmpty()) {
log.warn("[Databus Client] RocketMQ nameServer未配置跳过MQ消费者初始化");
return;
}
String clientCode = properties.getClientCode();
if (clientCode == null || clientCode.isEmpty()) {
log.error("[Databus Client] clientCode未配置无法订阅Topic");
return;
}
String topicBase = properties.getMq().getTopicBase();
String consumerGroupPrefix = properties.getMq().getConsumerGroupPrefix();
if (consumerGroupPrefix == null || consumerGroupPrefix.isEmpty()) {
consumerGroupPrefix = "databus-client-" + clientCode;
}
// 为每个事件类型创建独立的消费者
for (DatabusEventType eventType : DatabusEventType.values()) {
try {
createConsumer(eventType, topicBase, clientCode, consumerGroupPrefix, nameServer);
} catch (Exception e) {
log.error("[Databus Client] 创建消费者失败, eventType={}", eventType.name(), e);
}
}
log.info("[Databus Client] 消费者注册完成,共创建 {} 个消费者", consumers.size());
}
/**
* 为指定事件类型创建消费者
*/
private void createConsumer(DatabusEventType eventType, String topicBase, String clientCode, String consumerGroupPrefix, String nameServer) throws MQClientException {
// Topic: databus-sync-system-user-create-company-a
String topic = eventType.getTopic(topicBase, clientCode);
// ConsumerGroup: databus-client-company-a-system-user-create
String consumerGroup = String.format("%s-%s", consumerGroupPrefix, eventType.getTopicSuffix());
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(nameServer);
consumer.setConsumeThreadMin(properties.getMq().getConsumeThreadMin());
consumer.setConsumeThreadMax(properties.getMq().getConsumeThreadMax());
consumer.setMaxReconsumeTimes(properties.getMq().getMaxReconsumeTimes());
// 订阅Topic
consumer.subscribe(topic, "*");
// 设置消息监听器
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
String messageBody = new String(msg.getBody(), StandardCharsets.UTF_8);
log.info("[Databus Client] 收到消息, topic={}, msgId={}, eventType={}",
msg.getTopic(), msg.getMsgId(), eventType.name());
messageProcessor.process(messageBody, eventType);
} catch (Exception e) {
log.error("[Databus Client] 消息处理失败, topic={}, msgId={}, reconsumeTimes={}",
msg.getTopic(), msg.getMsgId(), msg.getReconsumeTimes(), e);
if (msg.getReconsumeTimes() >= properties.getMq().getMaxReconsumeTimes()) {
log.error("[Databus Client] 重试次数已达上限,放弃处理, msgId={}", msg.getMsgId());
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
consumers.add(consumer);
log.info("[Databus Client] 消费者启动成功, topic={}, consumerGroup={}, event={}",
topic, consumerGroup, eventType.getName());
}
@PreDestroy
public void destroy() {
for (DefaultMQPushConsumer consumer : consumers) {
try {
consumer.shutdown();
} catch (Exception e) {
log.error("[Databus Client] 关闭消费者失败", e);
}
}
consumers.clear();
log.info("[Databus Client] 所有消费者已关闭");
}
}

View File

@@ -66,13 +66,15 @@ public class HandlerRegistry {
public void init() { public void init() {
log.info("[HandlerRegistry] 开始初始化 Handler 注册表..."); log.info("[HandlerRegistry] 开始初始化 Handler 注册表...");
// 注册增量同步 Handler // 注册增量同步 Handler支持一个Handler注册多个事件类型
if (syncEventHandlers != null) { if (syncEventHandlers != null) {
for (SyncEventHandler<?> handler : syncEventHandlers) { for (SyncEventHandler<?> handler : syncEventHandlers) {
DatabusEventType eventType = handler.getSupportedEventType(); List<DatabusEventType> eventTypes = handler.getSupportedEventTypes();
incrementalHandlers.put(eventType, handler); for (DatabusEventType eventType : eventTypes) {
log.info("[HandlerRegistry] 注册增量Handler: {} -> {}", incrementalHandlers.put(eventType, handler);
eventType, handler.getClass().getSimpleName()); log.info("[HandlerRegistry] 注册增量Handler: {} -> {}",
eventType, handler.getClass().getSimpleName());
}
} }
} }

View File

@@ -5,22 +5,49 @@ import com.zt.plat.module.databus.enums.DatabusEventType;
import java.lang.reflect.ParameterizedType; import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type; import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
/** /**
* 同步事件处理器接口 * 同步事件处理器接口
* <p> * <p>
* 业务系统需要实现此接口来处理增量数据同步 * 业务系统需要实现此接口来处理增量数据同步
* <p>
* 支持单事件和多事件处理:
* - 单事件:实现 getSupportedEventType()
* - 多事件:实现 getSupportedEventTypes()推荐一个Handler处理多个操作
* *
* @author ZT * @author ZT
*/ */
public interface SyncEventHandler<T> { public interface SyncEventHandler<T> {
/** /**
* 获取支持的事件类型 * 获取支持的事件类型(单个)
* <p>
* 默认实现:返回 getSupportedEventTypes() 的第一个元素
* <p>
* 建议新Handler优先实现 getSupportedEventTypes()
* *
* @return 事件类型枚举 * @return 事件类型枚举
*/ */
DatabusEventType getSupportedEventType(); default DatabusEventType getSupportedEventType() {
List<DatabusEventType> types = getSupportedEventTypes();
return types.isEmpty() ? null : types.get(0);
}
/**
* 获取支持的事件类型列表(多个)
* <p>
* 默认实现:返回包含 getSupportedEventType() 的单元素列表
* <p>
* 子类可以覆盖此方法返回多个事件类型从而一个Handler处理多个操作
*
* @return 事件类型列表
*/
default List<DatabusEventType> getSupportedEventTypes() {
DatabusEventType type = getSupportedEventType();
return type == null ? Collections.emptyList() : Collections.singletonList(type);
}
/** /**
* 处理同步消息 * 处理同步消息

View File

@@ -1,50 +0,0 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptCreateHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门创建事件, id={}, name={}, parentId={}",
data.getId(), data.getName(), data.getParentId());
try {
deptSyncService.create(data);
log.info("[DeptSync] 部门创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptDeleteHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门删除事件, id={}", data.getId());
try {
deptSyncService.delete(data.getId());
log.info("[DeptSync] 部门删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 部门增量同步事件处理器(合并版)
* <p>
* 处理事件类型:
* - SYSTEM_DEPT_CREATE部门创建
* - SYSTEM_DEPT_UPDATE部门更新
* - SYSTEM_DEPT_DELETE部门删除
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public List<DatabusEventType> getSupportedEventTypes() {
return Arrays.asList(
DatabusEventType.SYSTEM_DEPT_CREATE,
DatabusEventType.SYSTEM_DEPT_UPDATE,
DatabusEventType.SYSTEM_DEPT_DELETE
);
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
DatabusEventType eventType = message.getEventType();
log.info("[DeptSync] 收到部门{}事件, id={}, name={}, parentId={}",
eventType.getAction(), data.getId(), data.getName(), data.getParentId());
try {
switch (eventType) {
case SYSTEM_DEPT_CREATE:
deptSyncService.create(data);
log.info("[DeptSync] 部门创建成功, id={}", data.getId());
break;
case SYSTEM_DEPT_UPDATE:
deptSyncService.update(data);
log.info("[DeptSync] 部门更新成功, id={}", data.getId());
break;
case SYSTEM_DEPT_DELETE:
deptSyncService.delete(data.getId());
log.info("[DeptSync] 部门删除成功, id={}", data.getId());
break;
default:
log.warn("[DeptSync] 未知事件类型: {}", eventType);
}
} catch (Exception e) {
log.error("[DeptSync] 部门{}失败, id={}", eventType.getAction(), data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.dept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 部门更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 DeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(DeptSyncService.class)
public class SystemDeptUpdateHandler implements SyncEventHandler<DatabusDeptData> {
@Resource
private DeptSyncService deptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_DEPT_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusDeptData> message) {
DatabusDeptData data = message.getData();
log.info("[DeptSync] 收到部门更新事件, id={}, name={}", data.getId(), data.getName());
try {
deptSyncService.update(data);
log.info("[DeptSync] 部门更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[DeptSync] 部门更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,50 +0,0 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostCreateHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位创建事件, id={}, name={}, code={}",
data.getId(), data.getName(), data.getCode());
try {
postSyncService.create(data);
log.info("[PostSync] 岗位创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostDeleteHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位删除事件, id={}", data.getId());
try {
postSyncService.delete(data.getId());
log.info("[PostSync] 岗位删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 岗位增量同步事件处理器(合并版)
* <p>
* 处理事件类型:
* - SYSTEM_POST_CREATE岗位创建
* - SYSTEM_POST_UPDATE岗位更新
* - SYSTEM_POST_DELETE岗位删除
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public List<DatabusEventType> getSupportedEventTypes() {
return Arrays.asList(
DatabusEventType.SYSTEM_POST_CREATE,
DatabusEventType.SYSTEM_POST_UPDATE,
DatabusEventType.SYSTEM_POST_DELETE
);
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
DatabusEventType eventType = message.getEventType();
log.info("[PostSync] 收到岗位{}事件, id={}, name={}, code={}",
eventType.getAction(), data.getId(), data.getName(), data.getCode());
try {
switch (eventType) {
case SYSTEM_POST_CREATE:
postSyncService.create(data);
log.info("[PostSync] 岗位创建成功, id={}", data.getId());
break;
case SYSTEM_POST_UPDATE:
postSyncService.update(data);
log.info("[PostSync] 岗位更新成功, id={}", data.getId());
break;
case SYSTEM_POST_DELETE:
postSyncService.delete(data.getId());
log.info("[PostSync] 岗位删除成功, id={}", data.getId());
break;
default:
log.warn("[PostSync] 未知事件类型: {}", eventType);
}
} catch (Exception e) {
log.error("[PostSync] 岗位{}失败, id={}", eventType.getAction(), data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,50 +0,0 @@
package com.zt.plat.framework.databus.client.handler.post;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 岗位更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 PostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(PostSyncService.class)
public class SystemPostUpdateHandler implements SyncEventHandler<DatabusPostData> {
@Resource
private PostSyncService postSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_POST_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusPostData> message) {
DatabusPostData data = message.getData();
log.info("[PostSync] 收到岗位更新事件, id={}, name={}, code={}",
data.getId(), data.getName(), data.getCode());
try {
postSyncService.update(data);
log.info("[PostSync] 岗位更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[PostSync] 岗位更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户创建事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserCreateHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_CREATE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户创建事件, id={}, username={}", data.getId(), data.getUsername());
try {
adminUserSyncService.create(data);
log.info("[UserSync] 用户创建成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户创建失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户删除事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserDeleteHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_DELETE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户删除事件, id={}", data.getId());
try {
adminUserSyncService.delete(data.getId());
log.info("[UserSync] 用户删除成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户删除失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 用户增量同步事件处理器(合并版)
* <p>
* 处理事件类型:
* - SYSTEM_USER_CREATE用户创建
* - SYSTEM_USER_UPDATE用户更新
* - SYSTEM_USER_DELETE用户删除
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public List<DatabusEventType> getSupportedEventTypes() {
return Arrays.asList(
DatabusEventType.SYSTEM_USER_CREATE,
DatabusEventType.SYSTEM_USER_UPDATE,
DatabusEventType.SYSTEM_USER_DELETE
);
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
DatabusEventType eventType = message.getEventType();
log.info("[UserSync] 收到用户{}事件, id={}, username={}, nickname={}",
eventType.getAction(), data.getId(), data.getUsername(), data.getNickname());
try {
switch (eventType) {
case SYSTEM_USER_CREATE:
adminUserSyncService.create(data);
log.info("[UserSync] 用户创建成功, id={}", data.getId());
break;
case SYSTEM_USER_UPDATE:
adminUserSyncService.update(data);
log.info("[UserSync] 用户更新成功, id={}", data.getId());
break;
case SYSTEM_USER_DELETE:
adminUserSyncService.delete(data.getId());
log.info("[UserSync] 用户删除成功, id={}", data.getId());
break;
default:
log.warn("[UserSync] 未知事件类型: {}", eventType);
}
} catch (Exception e) {
log.error("[UserSync] 用户{}失败, id={}", eventType.getAction(), data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -1,49 +0,0 @@
package com.zt.plat.framework.databus.client.handler.user;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户更新事件处理器
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 AdminUserSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(AdminUserSyncService.class)
public class SystemUserUpdateHandler implements SyncEventHandler<DatabusAdminUserData> {
@Resource
private AdminUserSyncService adminUserSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_UPDATE;
}
@Override
public void handle(DatabusMessage<DatabusAdminUserData> message) {
DatabusAdminUserData data = message.getData();
log.info("[UserSync] 收到用户更新事件, id={}, username={}", data.getId(), data.getUsername());
try {
adminUserSyncService.update(data);
log.info("[UserSync] 用户更新成功, id={}", data.getId());
} catch (Exception e) {
log.error("[UserSync] 用户更新失败, id={}", data.getId(), e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,70 @@
package com.zt.plat.framework.databus.client.handler.userdept;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户-部门关系全量同步事件处理器(批量处理)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 UserDeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(UserDeptSyncService.class)
public class SystemUserDeptFullHandler implements BatchSyncEventHandler<DatabusUserDeptData> {
@Resource
private UserDeptSyncService userDeptSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_DEPT_FULL;
}
@Override
public void onFullSyncStart(DatabusBatchMessage<DatabusUserDeptData> message) {
log.info("[UserDeptSync] 开始用户-部门关系全量同步, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
@Override
public void handleBatch(DatabusBatchMessage<DatabusUserDeptData> message) {
log.info("[UserDeptSync] 处理用户-部门关系批次数据, taskId={}, batchNo={}/{}, size={}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch(),
message.getDataList().size());
// 逐条处理全量同步数据
for (DatabusUserDeptData data : message.getDataList()) {
try {
userDeptSyncService.fullSync(data);
log.debug("[UserDeptSync] 用户-部门关系全量同步成功, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
} catch (Exception e) {
log.error("[UserDeptSync] 用户-部门关系全量同步失败, userId={}, deptId={}",
data.getUserId(), data.getDeptId(), e);
// 单条失败不影响其他数据,继续处理
}
}
log.info("[UserDeptSync] 用户-部门关系批次处理完成, taskId={}, batchNo={}/{}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
}
@Override
public void onFullSyncComplete(DatabusBatchMessage<DatabusUserDeptData> message) {
log.info("[UserDeptSync] 用户-部门关系全量同步完成, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
}

View File

@@ -0,0 +1,87 @@
package com.zt.plat.framework.databus.client.handler.userdept;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 用户-部门关系增量同步事件处理器(合并版)
* <p>
* 处理事件类型:
* - SYSTEM_USER_DEPT_CREATE用户-部门关系创建
* - SYSTEM_USER_DEPT_UPDATE用户-部门关系更新
* - SYSTEM_USER_DEPT_DELETE用户-部门关系删除
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 UserDeptSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(UserDeptSyncService.class)
public class SystemUserDeptHandler implements SyncEventHandler<DatabusUserDeptData> {
@Resource
private UserDeptSyncService userDeptSyncService;
@Override
public List<DatabusEventType> getSupportedEventTypes() {
return Arrays.asList(
DatabusEventType.SYSTEM_USER_DEPT_CREATE,
DatabusEventType.SYSTEM_USER_DEPT_UPDATE,
DatabusEventType.SYSTEM_USER_DEPT_DELETE
);
}
@Override
public void handle(DatabusMessage<DatabusUserDeptData> message) {
DatabusUserDeptData data = message.getData();
DatabusEventType eventType = message.getEventType();
log.info("[UserDeptSync] 收到用户-部门关系{}事件, userId={}, deptId={}",
eventType.getAction(), data != null ? data.getUserId() : null,
data != null ? data.getDeptId() : null);
try {
switch (eventType) {
case SYSTEM_USER_DEPT_CREATE:
userDeptSyncService.create(data);
log.info("[UserDeptSync] 用户-部门关系创建成功, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
break;
case SYSTEM_USER_DEPT_UPDATE:
userDeptSyncService.update(data);
log.info("[UserDeptSync] 用户-部门关系更新成功, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
break;
case SYSTEM_USER_DEPT_DELETE:
userDeptSyncService.delete(data.getId());
log.info("[UserDeptSync] 用户-部门关系删除成功, id={}", data.getId());
break;
default:
log.warn("[UserDeptSync] 未知事件类型: {}", eventType);
}
} catch (Exception e) {
log.error("[UserDeptSync] 用户-部门关系{}失败, userId={}, deptId={}",
eventType.getAction(),
data != null ? data.getUserId() : null,
data != null ? data.getDeptId() : null, e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,44 @@
package com.zt.plat.framework.databus.client.handler.userdept;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
/**
* 用户-部门关系同步服务接口
* <p>
* 分公司需要实现此接口,完成数据的本地持久化
* 或通过默认实现 {@link UserDeptSyncServiceImpl} 使用 Feign 调用远程 API
*
* @author ZT
*/
public interface UserDeptSyncService {
/**
* 创建用户-部门关系(增量同步)
*
* @param data 用户-部门关系数据
*/
void create(DatabusUserDeptData data);
/**
* 更新用户-部门关系(增量同步)
*
* @param data 用户-部门关系数据
*/
void update(DatabusUserDeptData data);
/**
* 删除用户-部门关系(增量同步)
*
* @param id 关系ID
*/
void delete(Long id);
/**
* 全量同步单条数据
* <p>
* 逻辑:存在则更新,不存在则插入
*
* @param data 用户-部门关系数据
*/
void fullSync(DatabusUserDeptData data);
}

View File

@@ -0,0 +1,56 @@
package com.zt.plat.framework.databus.client.handler.userdept;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* 用户-部门关系同步服务实现
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* <p>
* 注意:由于用户-部门关系通常集成在用户管理中,此实现为占位符。
* 分公司可以根据实际情况:
* 1. 自定义实现此接口,直接操作本地数据库
* 2. 或者通过用户管理 API 间接处理关联关系
*
* @author ZT
*/
@Slf4j
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
public class UserDeptSyncServiceImpl implements UserDeptSyncService {
@Override
public void create(DatabusUserDeptData data) {
log.info("[UserDeptSync] 收到创建用户-部门关系请求, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步
}
@Override
public void update(DatabusUserDeptData data) {
log.info("[UserDeptSync] 收到更新用户-部门关系请求, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法
}
@Override
public void delete(Long id) {
log.info("[UserDeptSync] 收到删除用户-部门关系请求, id={}", id);
log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法
}
@Override
public void fullSync(DatabusUserDeptData data) {
log.info("[UserDeptSync] 收到全量同步用户-部门关系请求, userId={}, deptId={}",
data.getUserId(), data.getDeptId());
log.warn("[UserDeptSync] 用户-部门关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入
}
}

View File

@@ -0,0 +1,70 @@
package com.zt.plat.framework.databus.client.handler.userpost;
import com.zt.plat.framework.databus.client.handler.BatchSyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
import com.zt.plat.module.databus.api.message.DatabusBatchMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
/**
* 用户-岗位关系全量同步事件处理器(批量处理)
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 UserPostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(UserPostSyncService.class)
public class SystemUserPostFullHandler implements BatchSyncEventHandler<DatabusUserPostData> {
@Resource
private UserPostSyncService userPostSyncService;
@Override
public DatabusEventType getSupportedEventType() {
return DatabusEventType.SYSTEM_USER_POST_FULL;
}
@Override
public void onFullSyncStart(DatabusBatchMessage<DatabusUserPostData> message) {
log.info("[UserPostSync] 开始用户-岗位关系全量同步, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
@Override
public void handleBatch(DatabusBatchMessage<DatabusUserPostData> message) {
log.info("[UserPostSync] 处理用户-岗位关系批次数据, taskId={}, batchNo={}/{}, size={}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch(),
message.getDataList().size());
// 逐条处理全量同步数据
for (DatabusUserPostData data : message.getDataList()) {
try {
userPostSyncService.fullSync(data);
log.debug("[UserPostSync] 用户-岗位关系全量同步成功, userId={}, postId={}",
data.getUserId(), data.getPostId());
} catch (Exception e) {
log.error("[UserPostSync] 用户-岗位关系全量同步失败, userId={}, postId={}",
data.getUserId(), data.getPostId(), e);
// 单条失败不影响其他数据,继续处理
}
}
log.info("[UserPostSync] 用户-岗位关系批次处理完成, taskId={}, batchNo={}/{}",
message.getTaskId(), message.getBatchNo(), message.getTotalBatch());
}
@Override
public void onFullSyncComplete(DatabusBatchMessage<DatabusUserPostData> message) {
log.info("[UserPostSync] 用户-岗位关系全量同步完成, taskId={}, totalBatch={}",
message.getTaskId(), message.getTotalBatch());
}
}

View File

@@ -0,0 +1,87 @@
package com.zt.plat.framework.databus.client.handler.userpost;
import com.zt.plat.framework.databus.client.handler.SyncEventHandler;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;
import java.util.Arrays;
import java.util.List;
/**
* 用户-岗位关系增量同步事件处理器(合并版)
* <p>
* 处理事件类型:
* - SYSTEM_USER_POST_CREATE用户-岗位关系创建
* - SYSTEM_USER_POST_UPDATE用户-岗位关系更新
* - SYSTEM_USER_POST_DELETE用户-岗位关系删除
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* 2. 存在 UserPostSyncService Bean
*
* @author ZT
*/
@Slf4j
@Component
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
@ConditionalOnBean(UserPostSyncService.class)
public class SystemUserPostHandler implements SyncEventHandler<DatabusUserPostData> {
@Resource
private UserPostSyncService userPostSyncService;
@Override
public List<DatabusEventType> getSupportedEventTypes() {
return Arrays.asList(
DatabusEventType.SYSTEM_USER_POST_CREATE,
DatabusEventType.SYSTEM_USER_POST_UPDATE,
DatabusEventType.SYSTEM_USER_POST_DELETE
);
}
@Override
public void handle(DatabusMessage<DatabusUserPostData> message) {
DatabusUserPostData data = message.getData();
DatabusEventType eventType = message.getEventType();
log.info("[UserPostSync] 收到用户-岗位关系{}事件, userId={}, postId={}",
eventType.getAction(), data != null ? data.getUserId() : null,
data != null ? data.getPostId() : null);
try {
switch (eventType) {
case SYSTEM_USER_POST_CREATE:
userPostSyncService.create(data);
log.info("[UserPostSync] 用户-岗位关系创建成功, userId={}, postId={}",
data.getUserId(), data.getPostId());
break;
case SYSTEM_USER_POST_UPDATE:
userPostSyncService.update(data);
log.info("[UserPostSync] 用户-岗位关系更新成功, userId={}, postId={}",
data.getUserId(), data.getPostId());
break;
case SYSTEM_USER_POST_DELETE:
userPostSyncService.delete(data.getId());
log.info("[UserPostSync] 用户-岗位关系删除成功, id={}", data.getId());
break;
default:
log.warn("[UserPostSync] 未知事件类型: {}", eventType);
}
} catch (Exception e) {
log.error("[UserPostSync] 用户-岗位关系{}失败, userId={}, postId={}",
eventType.getAction(),
data != null ? data.getUserId() : null,
data != null ? data.getPostId() : null, e);
throw e; // 抛出异常触发重试
}
}
}

View File

@@ -0,0 +1,44 @@
package com.zt.plat.framework.databus.client.handler.userpost;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
/**
* 用户-岗位关系同步服务接口
* <p>
* 分公司需要实现此接口,完成数据的本地持久化
* 或通过默认实现 {@link UserPostSyncServiceImpl} 使用 Feign 调用远程 API
*
* @author ZT
*/
public interface UserPostSyncService {
/**
* 创建用户-岗位关系(增量同步)
*
* @param data 用户-岗位关系数据
*/
void create(DatabusUserPostData data);
/**
* 更新用户-岗位关系(增量同步)
*
* @param data 用户-岗位关系数据
*/
void update(DatabusUserPostData data);
/**
* 删除用户-岗位关系(增量同步)
*
* @param id 关系ID
*/
void delete(Long id);
/**
* 全量同步单条数据
* <p>
* 逻辑:存在则更新,不存在则插入
*
* @param data 用户-岗位关系数据
*/
void fullSync(DatabusUserPostData data);
}

View File

@@ -0,0 +1,56 @@
package com.zt.plat.framework.databus.client.handler.userpost;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Service;
/**
* 用户-岗位关系同步服务实现
* <p>
* 使用条件:
* 1. zt.databus.sync.client.enabled=true
* <p>
* 注意:由于用户-岗位关系通常集成在用户管理中,此实现为占位符。
* 分公司可以根据实际情况:
* 1. 自定义实现此接口,直接操作本地数据库
* 2. 或者通过用户管理 API 间接处理关联关系
*
* @author ZT
*/
@Slf4j
@Service
@ConditionalOnProperty(prefix = "zt.databus.sync.client", name = "enabled", havingValue = "true")
public class UserPostSyncServiceImpl implements UserPostSyncService {
@Override
public void create(DatabusUserPostData data) {
log.info("[UserPostSync] 收到创建用户-岗位关系请求, userId={}, postId={}",
data.getUserId(), data.getPostId());
log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法,通过本地 API 或直接数据库操作完成同步
}
@Override
public void update(DatabusUserPostData data) {
log.info("[UserPostSync] 收到更新用户-岗位关系请求, userId={}, postId={}",
data.getUserId(), data.getPostId());
log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法
}
@Override
public void delete(Long id) {
log.info("[UserPostSync] 收到删除用户-岗位关系请求, id={}", id);
log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法
}
@Override
public void fullSync(DatabusUserPostData data) {
log.info("[UserPostSync] 收到全量同步用户-岗位关系请求, userId={}, postId={}",
data.getUserId(), data.getPostId());
log.warn("[UserPostSync] 用户-岗位关系同步服务需要分公司自定义实现,当前为占位符实现");
// TODO: 分公司需要实现此方法,逻辑:存在则更新,不存在则插入
}
}

View File

@@ -0,0 +1,48 @@
package com.zt.plat.module.databus.api.data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 用户与组织机构关联关系数据对象
* <p>
* 对应表system_user_dept
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusUserDeptData implements Serializable {
/**
* 关联关系ID
*/
private Long id;
/**
* 用户ID
*/
private Long userId;
/**
* 部门ID
*/
private Long deptId;
/**
* 租户ID
*/
private Long tenantId;
/**
* 备注
*/
private String remark;
}

View File

@@ -0,0 +1,38 @@
package com.zt.plat.module.databus.api.data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
/**
* 用户与岗位关联关系数据对象
* <p>
* 对应表system_user_post
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusUserPostData implements Serializable {
/**
* 关联关系ID
*/
private Long id;
/**
* 用户ID
*/
private Long userId;
/**
* 岗位ID
*/
private Long postId;
}

View File

@@ -41,49 +41,45 @@ public enum DatabusEventType {
SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"), SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"),
/** /**
* 部门-创建 * 组织机构-创建
*/
SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"),
/**
* 组织机构-更新
*/
SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"),
/**
* 组织机构-删除
*/
SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"),
/**
* 组织机构-全量同步
*/
SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"),
/**
* 部门-创建(新命名,推荐使用)
*/ */
SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"), SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"),
/** /**
* 部门-更新 * 部门-更新(新命名,推荐使用)
*/ */
SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"), SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"),
/** /**
* 部门-删除 * 部门-删除(新命名,推荐使用)
*/ */
SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"), SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"),
/** /**
* 部门-全量同步 * 部门-全量同步(新命名,推荐使用)
*/ */
SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"), SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"),
/**
* 组织机构-创建(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"),
/**
* 组织机构-更新(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"),
/**
* 组织机构-删除(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"),
/**
* 组织机构-全量同步(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"),
/** /**
* 岗位-创建 * 岗位-创建
*/ */
@@ -104,6 +100,84 @@ public enum DatabusEventType {
*/ */
SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"), SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"),
/**
* 用户-部门关系-创建
*/
SYSTEM_USER_DEPT_CREATE("system", "user-dept", "create", "用户-部门关系创建"),
/**
* 用户-部门关系-更新
*/
SYSTEM_USER_DEPT_UPDATE("system", "user-dept", "update", "用户-部门关系更新"),
/**
* 用户-部门关系-删除
*/
SYSTEM_USER_DEPT_DELETE("system", "user-dept", "delete", "用户-部门关系删除"),
/**
* 用户-部门关系-全量同步
*/
SYSTEM_USER_DEPT_FULL("system", "user-dept", "full", "用户-部门关系全量同步"),
/**
* 用户-岗位关系-创建
*/
SYSTEM_USER_POST_CREATE("system", "user-post", "create", "用户-岗位关系创建"),
/**
* 用户-岗位关系-更新
*/
SYSTEM_USER_POST_UPDATE("system", "user-post", "update", "用户-岗位关系更新"),
/**
* 用户-岗位关系-删除
*/
SYSTEM_USER_POST_DELETE("system", "user-post", "delete", "用户-岗位关系删除"),
/**
* 用户-岗位关系-全量同步
*/
SYSTEM_USER_POST_FULL("system", "user-post", "full", "用户-岗位关系全量同步"),
/**
* @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容
*/
@Deprecated
SYSTEM_USER_RELATION_CREATE("system", "user-relation", "create", "用户关联创建"),
/**
* @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容
*/
@Deprecated
SYSTEM_USER_RELATION_UPDATE("system", "user-relation", "update", "用户关联更新"),
/**
* @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容
*/
@Deprecated
SYSTEM_USER_RELATION_DELETE("system", "user-relation", "delete", "用户关联删除"),
/**
* @deprecated 已拆分为 SYSTEM_USER_DEPT_* 和 SYSTEM_USER_POST_*,保留用于向后兼容
*/
@Deprecated
SYSTEM_USER_RELATION_FULL("system", "user-relation", "full", "用户关联全量同步"),
/**
* 基础数据-全量同步(组合编排事件)
* <p>
* 按依赖顺序触发以下全量同步:
* 1. 组织机构SYSTEM_DEPT_FULL
* 2. 岗位SYSTEM_POST_FULL
* 3. 用户SYSTEM_USER_FULL
* 4. 用户-部门关系SYSTEM_USER_DEPT_FULL
* 5. 用户-岗位关系SYSTEM_USER_POST_FULL
* <p>
* 注意本事件只负责触发实际数据同步由5个独立的全量同步任务完成
*/
SYSTEM_BASE_DATA_FULL("system", "base-data", "full", "基础数据全量同步"),
/** /**
* 角色-创建 * 角色-创建
*/ */
@@ -167,7 +241,7 @@ public enum DatabusEventType {
BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"), BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"),
/** /**
* 供应<EFBFBD><EFBFBD>-创建 * 供应-创建
*/ */
BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"), BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"),
@@ -238,13 +312,10 @@ public enum DatabusEventType {
/** /**
* 获取完整Topic名称服务端转发用 * 获取完整Topic名称服务端转发用
* 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic * 格式: {topicBase}-{module}-{entity}-{action}-{clientCode}
* 示例: databus-sync-branch-001
*
* 注意:不再为每个事件创建独立 Topic而是通过消息体中的 eventType 字段路由
*/ */
public String getTopic(String topicBase, String clientCode) { public String getTopic(String topicBase, String clientCode) {
return String.format("%s-%s", topicBase, clientCode); return String.format("%s-%s-%s-%s-%s", topicBase, module, entity, action, clientCode);
} }
/** /**
@@ -273,31 +344,6 @@ public enum DatabusEventType {
return null; return null;
} }
/**
* 根据事件类型字符串获取枚举(支持大写下划线格式)
* 例如SYSTEM_POST_FULL → DatabusEventType.SYSTEM_POST_FULL
*
* @param eventType 事件类型字符串(格式: MODULE_ENTITY_ACTION
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByEventType(String eventType) {
if (eventType == null) {
return null;
}
// 先尝试直接匹配枚举名称
try {
return DatabusEventType.valueOf(eventType.toUpperCase());
} catch (IllegalArgumentException e) {
// 如果失败,尝试转换格式后匹配(如 system-post-full → SYSTEM_POST_FULL
String normalized = eventType.toUpperCase().replace("-", "_");
try {
return DatabusEventType.valueOf(normalized);
} catch (IllegalArgumentException ex) {
return null;
}
}
}
/** /**
* 根据模块、实体、操作获取枚举 * 根据模块、实体、操作获取枚举
* *
@@ -317,6 +363,25 @@ public enum DatabusEventType {
return null; return null;
} }
/**
* 根据事件类型名称获取枚举
* <p>
* 支持大写下划线格式,如: SYSTEM_POST_FULL
*
* @param eventType 事件类型名称(枚举常量名)
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByEventType(String eventType) {
if (eventType == null || eventType.isEmpty()) {
return null;
}
try {
return DatabusEventType.valueOf(eventType);
} catch (IllegalArgumentException e) {
return null;
}
}
/** /**
* 判断是否为全量同步事件 * 判断是否为全量同步事件
*/ */

View File

@@ -4,6 +4,8 @@ import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi; import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi; import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi; import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import com.zt.plat.module.system.api.dept.DeptApi;
import com.zt.plat.module.system.api.dept.PostApi;
import com.zt.plat.module.system.api.user.AdminUserApi; import com.zt.plat.module.system.api.user.AdminUserApi;
import org.springframework.cloud.openfeign.EnableFeignClients; import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Configuration;
@@ -18,7 +20,10 @@ import org.springframework.context.annotation.Configuration;
// DataBus 数据提供者 Feign 客户端 // DataBus 数据提供者 Feign 客户端
DatabusDeptProviderApi.class, DatabusDeptProviderApi.class,
DatabusUserProviderApi.class, DatabusUserProviderApi.class,
DatabusPostProviderApi.class DatabusPostProviderApi.class,
PostApi.class,
DeptApi.class,
AdminUserApi.class,
}) })
public class RpcConfiguration { public class RpcConfiguration {
} }

View File

@@ -0,0 +1,110 @@
package com.zt.plat.module.system.mq.producer.databus;
import cn.hutool.core.util.IdUtil;
import com.zt.plat.module.databus.api.data.DatabusUserDeptData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 用户-部门关系变更消息 Producer
* <p>
* 负责发送用户与部门的关联关系变更事件
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusUserDeptChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${zt.databus.change.topic-prefix:databus-change}")
private String topicPrefix;
private static final String TOPIC_SUFFIX = "-system-user-dept";
/**
* 发送用户-部门关系创建消息
*
* @param userDept 用户-部门关系对象
*/
public void sendUserDeptCreatedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_CREATE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系创建消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系创建消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 发送用户-部门关系更新消息
*
* @param userDept 用户-部门关系对象
*/
public void sendUserDeptUpdatedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_UPDATE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系更新消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系更新消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 发送用户-部门关系删除消息
*
* @param userDept 用户-部门关系对象(删除前查询的)
*/
public void sendUserDeptDeletedMessage(UserDeptDO userDept) {
try {
DatabusUserDeptData data = buildUserDeptData(userDept);
sendMessage(DatabusEventType.SYSTEM_USER_DEPT_DELETE, data);
log.info("[DatabusUserDeptChange] 发送用户-部门关系删除消息, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId());
} catch (Exception e) {
log.error("[DatabusUserDeptChange] 发送用户-部门关系删除消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
/**
* 构建用户-部门关系数据
*/
private DatabusUserDeptData buildUserDeptData(UserDeptDO userDept) {
return DatabusUserDeptData.builder()
.id(userDept.getId())
.userId(userDept.getUserId())
.deptId(userDept.getDeptId())
.tenantId(userDept.getTenantId())
.remark(userDept.getRemark())
.build();
}
/**
* 发送消息到 MQ
*/
private void sendMessage(DatabusEventType eventType, DatabusUserDeptData data) {
DatabusMessage<DatabusUserDeptData> message = new DatabusMessage<>();
message.setEventType(eventType);
message.setData(data);
message.setTimestamp(java.time.LocalDateTime.now());
message.setMessageId(IdUtil.fastSimpleUUID());
String topic = topicPrefix + TOPIC_SUFFIX;
rocketMQTemplate.syncSend(topic, message);
}
}

View File

@@ -0,0 +1,108 @@
package com.zt.plat.module.system.mq.producer.databus;
import cn.hutool.core.util.IdUtil;
import com.zt.plat.module.databus.api.data.DatabusUserPostData;
import com.zt.plat.module.databus.api.message.DatabusMessage;
import com.zt.plat.module.databus.enums.DatabusEventType;
import com.zt.plat.module.system.dal.dataobject.dept.UserPostDO;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
/**
* 用户-岗位关系变更消息 Producer
* <p>
* 负责发送用户与岗位的关联关系变更事件
*
* @author ZT
*/
@Slf4j
@Component
public class DatabusUserPostChangeProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Value("${zt.databus.change.topic-prefix:databus-change}")
private String topicPrefix;
private static final String TOPIC_SUFFIX = "-system-user-post";
/**
* 发送用户-岗位关系创建消息
*
* @param userPost 用户-岗位关系对象
*/
public void sendUserPostCreatedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_CREATE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系创建消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系创建消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 发送用户-岗位关系更新消息
*
* @param userPost 用户-岗位关系对象
*/
public void sendUserPostUpdatedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_UPDATE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系更新消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系更新消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 发送用户-岗位关系删除消息
*
* @param userPost 用户-岗位关系对象(删除前查询的)
*/
public void sendUserPostDeletedMessage(UserPostDO userPost) {
try {
DatabusUserPostData data = buildUserPostData(userPost);
sendMessage(DatabusEventType.SYSTEM_USER_POST_DELETE, data);
log.info("[DatabusUserPostChange] 发送用户-岗位关系删除消息, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId());
} catch (Exception e) {
log.error("[DatabusUserPostChange] 发送用户-岗位关系删除消息失败, userId={}, postId={}",
userPost.getUserId(), userPost.getPostId(), e);
}
}
/**
* 构建用户-岗位关系数据
*/
private DatabusUserPostData buildUserPostData(UserPostDO userPost) {
return DatabusUserPostData.builder()
.id(userPost.getId())
.userId(userPost.getUserId())
.postId(userPost.getPostId())
.build();
}
/**
* 发送消息到 MQ
*/
private void sendMessage(DatabusEventType eventType, DatabusUserPostData data) {
DatabusMessage<DatabusUserPostData> message = new DatabusMessage<>();
message.setEventType(eventType);
message.setData(data);
message.setTimestamp(java.time.LocalDateTime.now());
message.setMessageId(IdUtil.fastSimpleUUID());
String topic = topicPrefix + TOPIC_SUFFIX;
rocketMQTemplate.syncSend(topic, message);
}
}

View File

@@ -29,6 +29,7 @@ import com.zt.plat.module.system.dal.mysql.dept.UserPostMapper;
import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper; import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper;
import com.zt.plat.module.system.enums.user.PasswordStrategyEnum; import com.zt.plat.module.system.enums.user.PasswordStrategyEnum;
import com.zt.plat.module.system.enums.user.UserSourceEnum; import com.zt.plat.module.system.enums.user.UserSourceEnum;
import com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer;
import com.zt.plat.module.system.service.dept.DeptService; import com.zt.plat.module.system.service.dept.DeptService;
import com.zt.plat.module.system.service.dept.PostService; import com.zt.plat.module.system.service.dept.PostService;
import com.zt.plat.module.system.service.permission.PermissionService; import com.zt.plat.module.system.service.permission.PermissionService;
@@ -93,11 +94,10 @@ public class AdminUserServiceImpl implements AdminUserService {
@Resource @Resource
private ConfigApi configApi; private ConfigApi configApi;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer;
@Resource @Resource
private UserDeptService userDeptService; private UserDeptService userDeptService;
@Resource
private DatabusChangeProducer databusChangeProducer;
@Override @Override
@GlobalTransactional(rollbackFor = Exception.class) @GlobalTransactional(rollbackFor = Exception.class)
@@ -128,7 +128,6 @@ public class AdminUserServiceImpl implements AdminUserService {
if (user.getUserSource() == null) { if (user.getUserSource() == null) {
user.setUserSource(UserSourceEnum.EXTERNAL.getSource()); user.setUserSource(UserSourceEnum.EXTERNAL.getSource());
} }
user.setWorkcode(normalizeWorkcode(createReqVO.getWorkcode()));
PasswordStrategyEnum passwordStrategy = determinePasswordStrategy(user.getUserSource()); PasswordStrategyEnum passwordStrategy = determinePasswordStrategy(user.getUserSource());
user.setAvatar(normalizeAvatarValue(createReqVO.getAvatar())); user.setAvatar(normalizeAvatarValue(createReqVO.getAvatar()));
user.setPassword(encodePassword(createReqVO.getPassword(), passwordStrategy)); user.setPassword(encodePassword(createReqVO.getPassword(), passwordStrategy));
@@ -144,10 +143,10 @@ public class AdminUserServiceImpl implements AdminUserService {
postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId))); postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId)));
} }
// 2.4用户创建事件 // 3.用户创建消息到MQ供Databus消费转发
databusChangeProducer.sendUserCreatedMessage(user); databusChangeProducer.sendUserCreatedMessage(user);
// 3. 记录操作日志上下文 // 4. 记录操作日志上下文
LogRecordContext.putVariable("user", user); LogRecordContext.putVariable("user", user);
return user.getId(); return user.getId();
} }
@@ -237,13 +236,13 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.3 更新岗位 // 2.3 更新岗位
updateUserPost(updateReqVO, updateObj); updateUserPost(updateReqVO, updateObj);
// 2.4用户更新事件(重新查询获取完整数据 // 3.用户更新消息到MQ供Databus消费转发
AdminUserDO updatedUser = userMapper.selectById(updateObj.getId()); AdminUserDO updatedUser = userMapper.selectById(updateObj.getId());
if (updatedUser != null) { if (updatedUser != null) {
databusChangeProducer.sendUserUpdatedMessage(updatedUser); databusChangeProducer.sendUserUpdatedMessage(updatedUser);
} }
// 3. 记录操作日志上下文 // 4. 记录操作日志上下文
LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class)); LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class));
LogRecordContext.putVariable("user", oldUser); LogRecordContext.putVariable("user", oldUser);
} }
@@ -332,10 +331,10 @@ public class AdminUserServiceImpl implements AdminUserService {
// 2.2 删除用户岗位 // 2.2 删除用户岗位
userPostMapper.deleteByUserId(id); userPostMapper.deleteByUserId(id);
// 2.3 发用户删除事件 // 3.用户删除消息到MQ供Databus消费转发
databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId()); databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId());
// 3. 记录操作日志上下文 // 4. 记录操作日志上下文
LogRecordContext.putVariable("user", user); LogRecordContext.putVariable("user", user);
} }

View File

@@ -6,6 +6,7 @@ import com.zt.plat.framework.security.core.LoginUser;
import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO;
import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper; import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.annotation.Transactional;
import org.springframework.validation.annotation.Validated; import org.springframework.validation.annotation.Validated;
@@ -23,18 +24,29 @@ import static com.zt.plat.module.system.enums.ErrorCodeConstants.USER_DEPT_NOT_E
* *
* @author 管理员 * @author 管理员
*/ */
@Slf4j
@Service @Service
@Validated @Validated
public class UserDeptServiceImpl implements UserDeptService { public class UserDeptServiceImpl implements UserDeptService {
@Resource @Resource
private UserDeptMapper userDeptMapper; private UserDeptMapper userDeptMapper;
@Resource
private com.zt.plat.module.system.mq.producer.databus.DatabusUserDeptChangeProducer databusUserDeptChangeProducer;
@Override @Override
public Long createUserDept(UserDeptDO createReqVO) { public Long createUserDept(UserDeptDO createReqVO) {
// 插入 // 插入
UserDeptDO userDept = BeanUtils.toBean(createReqVO, UserDeptDO.class); UserDeptDO userDept = BeanUtils.toBean(createReqVO, UserDeptDO.class);
userDeptMapper.insert(userDept); userDeptMapper.insert(userDept);
// 发送用户-部门关系创建消息
try {
databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept);
} catch (Exception e) {
log.error("[createUserDept] 发送用户-部门关系创建消息失败", e);
}
// 返回 // 返回
return userDept.getId(); return userDept.getId();
} }
@@ -46,14 +58,34 @@ public class UserDeptServiceImpl implements UserDeptService {
// 更新 // 更新
UserDeptDO updateObj = BeanUtils.toBean(updateReqVO, UserDeptDO.class); UserDeptDO updateObj = BeanUtils.toBean(updateReqVO, UserDeptDO.class);
userDeptMapper.updateById(updateObj); userDeptMapper.updateById(updateObj);
// 发送用户-部门关系更新消息
try {
databusUserDeptChangeProducer.sendUserDeptUpdatedMessage(updateObj);
} catch (Exception e) {
log.error("[updateUserDept] 发送用户-部门关系更新消息失败", e);
}
} }
@Override @Override
public void deleteUserDept(Long id) { public void deleteUserDept(Long id) {
// 校验存在 // 校验存在
validateUserDeptExists(id); validateUserDeptExists(id);
// 查询完整对象(删除前查询,删除后无法查询)
UserDeptDO userDept = userDeptMapper.selectById(id);
// 删除 // 删除
userDeptMapper.deleteById(id); userDeptMapper.deleteById(id);
// 发送用户-部门关系删除消息
if (userDept != null) {
try {
databusUserDeptChangeProducer.sendUserDeptDeletedMessage(userDept);
} catch (Exception e) {
log.error("[deleteUserDept] 发送用户-部门关系删除消息失败", e);
}
}
} }
@Override @Override
@@ -111,6 +143,16 @@ public class UserDeptServiceImpl implements UserDeptService {
Long tenantId = Optional.ofNullable(getLoginUser()).orElse(new LoginUser()).getTenantId(); Long tenantId = Optional.ofNullable(getLoginUser()).orElse(new LoginUser()).getTenantId();
list.forEach(item -> item.setTenantId(tenantId)); list.forEach(item -> item.setTenantId(tenantId));
userDeptMapper.insertBatch(list); userDeptMapper.insertBatch(list);
// 发送用户-部门关系创建消息(为每个关系发送)
for (UserDeptDO userDept : list) {
try {
databusUserDeptChangeProducer.sendUserDeptCreatedMessage(userDept);
} catch (Exception e) {
log.error("[batchCreateUserDept] 发送用户-部门关系创建消息失败, userId={}, deptId={}",
userDept.getUserId(), userDept.getDeptId(), e);
}
}
} }
} }