diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusAdminUserData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusAdminUserData.java new file mode 100644 index 00000000..067207c4 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusAdminUserData.java @@ -0,0 +1,132 @@ +package com.zt.plat.module.databus.api.data; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.List; + +/** + * 用户数据对象(DataBus) + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "用户数据") +public class DatabusAdminUserData implements Serializable { + + private static final long serialVersionUID = 1L; + + // ========== 基本信息 ========== + + @Schema(description = "用户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1001") + private Long id; + + @Schema(description = "用户账号", requiredMode = Schema.RequiredMode.REQUIRED, example = "zhangsan") + private String username; + + @Schema(description = "用户昵称", requiredMode = Schema.RequiredMode.REQUIRED, example = "张三") + private String nickname; + + @Schema(description = "用户手机号", example = "13800138000") + private String mobile; + + @Schema(description = "用户邮箱", example = "zhangsan@example.com") + private String email; + + @Schema(description = "用户性别:0-未知 1-男 2-女", example = "1") + private Integer sex; + + @Schema(description = "用户头像URL", example = "https://example.com/avatar.jpg") + private String avatar; + + @Schema(description = "状态:0-启用 1-禁用", requiredMode = Schema.RequiredMode.REQUIRED, example = "0") + private Integer status; + + @Schema(description = "备注", example = "测试用户") + private String remark; + + @Schema(description = "用户来源类型", example = "1") + private Integer userSource; + + // ========== 组织信息 ========== + + @Schema(description = "所属部门ID列表", example = "[100, 101]") + private List deptIds; + + @Schema(description = "所属部门信息列表") + private List depts; + + // ========== 岗位信息 ========== + + @Schema(description = "岗位ID列表", example = "[1, 2]") + private List postIds; + + @Schema(description = "岗位信息列表") + private List posts; + + // ========== 多租户 ========== + + @Schema(description = "租户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + private Long tenantId; + + // ========== 时间信息 ========== + + @Schema(description = "创建时间") + private LocalDateTime createTime; + + @Schema(description = "更新时间") + private LocalDateTime updateTime; + + /** + * 部门简要信息 + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "部门简要信息") + public static class DeptSimpleInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + @Schema(description = "部门ID", example = "100") + private Long deptId; + + @Schema(description = "部门编码", example = "DEPT_001") + private String deptCode; + + @Schema(description = "部门名称", example = "技术部") + private String deptName; + } + + /** + * 岗位简要信息 + */ + @Data + @Builder + @NoArgsConstructor + @AllArgsConstructor + @Schema(description = "岗位简要信息") + public static class PostSimpleInfo implements Serializable { + + private static final long serialVersionUID = 1L; + + @Schema(description = "岗位ID", example = "1") + private Long postId; + + @Schema(description = "岗位编码", example = "CEO") + private String postCode; + + @Schema(description = "岗位名称", example = "首席执行官") + private String postName; + } + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusDeptData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusDeptData.java new file mode 100644 index 00000000..c12abeaa --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusDeptData.java @@ -0,0 +1,90 @@ +package com.zt.plat.module.databus.api.data; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 部门数据对象(DataBus) + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "部门数据") +public class DatabusDeptData implements Serializable { + + private static final long serialVersionUID = 1L; + + // ========== 基本信息 ========== + + @Schema(description = "部门ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "100") + private Long id; + + @Schema(description = "部门编码", example = "DEPT_001") + private String code; + + @Schema(description = "部门名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "技术部") + private String name; + + @Schema(description = "部门简称", example = "技术") + private String shortName; + + @Schema(description = "父部门ID(顶级为0)", requiredMode = Schema.RequiredMode.REQUIRED, example = "0") + private Long parentId; + + @Schema(description = "排序号", example = "1") + private Integer sort; + + @Schema(description = "状态:0-启用 1-禁用", requiredMode = Schema.RequiredMode.REQUIRED, example = "0") + private Integer status; + + // ========== 部门类型 ========== + + @Schema(description = "部门类型:28-公司 26-部门", example = "26") + private Integer deptType; + + @Schema(description = "是否集团", example = "false") + private Boolean isGroup; + + @Schema(description = "是否公司", example = "false") + private Boolean isCompany; + + @Schema(description = "部门来源类型", example = "1") + private Integer deptSource; + + // ========== 联系信息 ========== + + @Schema(description = "负责人ID", example = "1001") + private Long leaderUserId; + + @Schema(description = "负责人姓名", example = "张三") + private String leaderUserName; + + @Schema(description = "联系电话", example = "010-12345678") + private String phone; + + @Schema(description = "邮箱", example = "tech@example.com") + private String email; + + // ========== 多租户 ========== + + @Schema(description = "租户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1") + private Long tenantId; + + // ========== 时间信息 ========== + + @Schema(description = "创建时间") + private LocalDateTime createTime; + + @Schema(description = "更新时间") + private LocalDateTime updateTime; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusPostData.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusPostData.java new file mode 100644 index 00000000..de4361fb --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/data/DatabusPostData.java @@ -0,0 +1,67 @@ +package com.zt.plat.module.databus.api.data; + +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 岗位数据对象(DataBus) + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +public class DatabusPostData implements Serializable { + + /** + * 岗位ID + */ + private Long id; + + /** + * 岗位编码 + */ + private String code; + + /** + * 岗位名称 + */ + private String name; + + /** + * 排序 + */ + private Integer sort; + + /** + * 状态(0正常 1停用) + */ + private Integer status; + + /** + * 备注 + */ + private String remark; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 创建时间 + */ + private LocalDateTime createTime; + + /** + * 更新时间 + */ + private LocalDateTime updateTime; + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageReqDTO.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageReqDTO.java new file mode 100644 index 00000000..8182e677 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageReqDTO.java @@ -0,0 +1,48 @@ +package com.zt.plat.module.databus.api.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * 游标分页请求 DTO + *

+ * 用于 Databus 全量同步时的断点续传 + * 使用 create_time + id 复合游标解决雪花ID非连续问题 + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "游标分页请求") +public class CursorPageReqDTO implements Serializable { + + private static final long serialVersionUID = 1L; + + @Schema(description = "游标时间(上一批最后一条数据的创建时间)", example = "2024-01-01T00:00:00") + private LocalDateTime cursorTime; + + @Schema(description = "游标ID(上一批最后一条数据的ID,用于同一时间戳内去重)", example = "1234567890") + private Long cursorId; + + @Schema(description = "批量大小", example = "100") + private Integer batchSize; + + @Schema(description = "租户ID(可选,用于多租户过滤)", example = "1") + private Long tenantId; + + /** + * 是否为首次查询(无游标) + */ + public boolean isFirstPage() { + return cursorTime == null && cursorId == null; + } + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageResult.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageResult.java new file mode 100644 index 00000000..e3bc4a36 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/dto/CursorPageResult.java @@ -0,0 +1,81 @@ +package com.zt.plat.module.databus.api.dto; + +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.List; + +/** + * 游标分页结果 DTO + *

+ * 用于 Databus 全量同步时的断点续传响应 + * + * @author ZT + */ +@Data +@Builder +@NoArgsConstructor +@AllArgsConstructor +@Schema(description = "游标分页结果") +public class CursorPageResult implements Serializable { + + private static final long serialVersionUID = 1L; + + @Schema(description = "数据列表") + private List list; + + @Schema(description = "下一页游标时间(最后一条数据的创建时间)") + private LocalDateTime nextCursorTime; + + @Schema(description = "下一页游标ID(最后一条数据的ID)") + private Long nextCursorId; + + @Schema(description = "本次返回的数据量") + private Integer count; + + @Schema(description = "是否还有更多数据") + private Boolean hasMore; + + @Schema(description = "总数据量(可选,首次查询时返回)") + private Long total; + + /** + * 构建首页结果 + */ + public static CursorPageResult of(List list, LocalDateTime nextCursorTime, + Long nextCursorId, boolean hasMore, Long total) { + return CursorPageResult.builder() + .list(list) + .nextCursorTime(nextCursorTime) + .nextCursorId(nextCursorId) + .count(list != null ? list.size() : 0) + .hasMore(hasMore) + .total(total) + .build(); + } + + /** + * 构建后续页结果 + */ + public static CursorPageResult of(List list, LocalDateTime nextCursorTime, + Long nextCursorId, boolean hasMore) { + return of(list, nextCursorTime, nextCursorId, hasMore, null); + } + + /** + * 构建空结果 + */ + public static CursorPageResult empty() { + return CursorPageResult.builder() + .list(List.of()) + .count(0) + .hasMore(false) + .build(); + } + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusDeptProviderApi.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusDeptProviderApi.java new file mode 100644 index 00000000..a9cafe03 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusDeptProviderApi.java @@ -0,0 +1,71 @@ +package com.zt.plat.module.databus.api.provider; + +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * Databus 部门数据提供者 API + *

+ * 供 Databus 调用,获取部门数据用于全量/增量同步 + * + * @author ZT + */ +@FeignClient(name = "${databus.provider.dept.service:system-server}") +@Tag(name = "RPC 服务 - Databus 部门数据提供者") +public interface DatabusDeptProviderApi { + + String PREFIX = "/rpc/databus/dept"; + + /** + * 游标分页查询部门数据(用于全量同步) + * + * @param reqDTO 游标分页请求 + * @return 部门数据分页结果 + */ + @PostMapping(PREFIX + "/page-by-cursor") + @Operation(summary = "游标分页查询部门数据") + CommonResult> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO); + + /** + * 根据ID查询部门详情(用于增量同步) + * + * @param id 部门ID + * @return 部门数据 + */ + @GetMapping(PREFIX + "/get") + @Operation(summary = "查询部门详情") + @Parameter(name = "id", description = "部门ID", required = true, example = "100") + CommonResult getById(@RequestParam("id") Long id); + + /** + * 批量查询部门详情(用于增量同步批量获取) + * + * @param ids 部门ID列表 + * @return 部门数据列表 + */ + @GetMapping(PREFIX + "/list") + @Operation(summary = "批量查询部门详情") + @Parameter(name = "ids", description = "部门ID列表", required = true, example = "100,101,102") + CommonResult> getListByIds(@RequestParam("ids") List ids); + + /** + * 统计部门总数(用于全量同步进度计算) + * + * @param tenantId 租户ID(可选) + * @return 部门总数 + */ + @GetMapping(PREFIX + "/count") + @Operation(summary = "统计部门总数") + @Parameter(name = "tenantId", description = "租户ID", example = "1") + CommonResult count(@RequestParam(value = "tenantId", required = false) Long tenantId); + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusPostProviderApi.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusPostProviderApi.java new file mode 100644 index 00000000..632c312b --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusPostProviderApi.java @@ -0,0 +1,71 @@ +package com.zt.plat.module.databus.api.provider; + +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * Databus 岗位数据提供者 API + *

+ * 供 Databus 调用,获取岗位数据用于全量/增量同步 + * + * @author ZT + */ +@FeignClient(name = "${databus.provider.post.service:system-server}") +@Tag(name = "RPC 服务 - Databus 岗位数据提供者") +public interface DatabusPostProviderApi { + + String PREFIX = "/rpc/databus/post"; + + /** + * 游标分页查询岗位数据(用于全量同步) + * + * @param reqDTO 游标分页请求 + * @return 岗位数据分页结果 + */ + @PostMapping(PREFIX + "/page-by-cursor") + @Operation(summary = "游标分页查询岗位数据") + CommonResult> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO); + + /** + * 根据ID查询岗位详情(用于增量同步) + * + * @param id 岗位ID + * @return 岗位数据 + */ + @GetMapping(PREFIX + "/get") + @Operation(summary = "查询岗位详情") + @Parameter(name = "id", description = "岗位ID", required = true, example = "1") + CommonResult getById(@RequestParam("id") Long id); + + /** + * 批量查询岗位详情(用于增量同步批量获取) + * + * @param ids 岗位ID列表 + * @return 岗位数据列表 + */ + @GetMapping(PREFIX + "/list") + @Operation(summary = "批量查询岗位详情") + @Parameter(name = "ids", description = "岗位ID列表", required = true, example = "1,2,3") + CommonResult> getListByIds(@RequestParam("ids") List ids); + + /** + * 统计岗位总数(用于全量同步进度计算) + * + * @param tenantId 租户ID(可选) + * @return 岗位总数 + */ + @GetMapping(PREFIX + "/count") + @Operation(summary = "统计岗位总数") + @Parameter(name = "tenantId", description = "租户ID", example = "1") + CommonResult count(@RequestParam(value = "tenantId", required = false) Long tenantId); + +} diff --git a/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusUserProviderApi.java b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusUserProviderApi.java new file mode 100644 index 00000000..96061c43 --- /dev/null +++ b/zt-module-databus/zt-module-databus-api/src/main/java/com/zt/plat/module/databus/api/provider/DatabusUserProviderApi.java @@ -0,0 +1,71 @@ +package com.zt.plat.module.databus.api.provider; + +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; +import io.swagger.v3.oas.annotations.tags.Tag; +import org.springframework.cloud.openfeign.FeignClient; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * Databus 用户数据提供者 API + *

+ * 供 Databus 调用,获取用户数据用于全量/增量同步 + * + * @author ZT + */ +@FeignClient(name = "${databus.provider.user.service:system-server}") +@Tag(name = "RPC 服务 - Databus 用户数据提供者") +public interface DatabusUserProviderApi { + + String PREFIX = "/rpc/databus/user"; + + /** + * 游标分页查询用户数据(用于全量同步) + * + * @param reqDTO 游标分页请求 + * @return 用户数据分页结果 + */ + @PostMapping(PREFIX + "/page-by-cursor") + @Operation(summary = "游标分页查询用户数据") + CommonResult> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO); + + /** + * 根据ID查询用户详情(用于增量同步) + * + * @param id 用户ID + * @return 用户数据 + */ + @GetMapping(PREFIX + "/get") + @Operation(summary = "查询用户详情") + @Parameter(name = "id", description = "用户ID", required = true, example = "1001") + CommonResult getById(@RequestParam("id") Long id); + + /** + * 批量查询用户详情(用于增量同步批量获取) + * + * @param ids 用户ID列表 + * @return 用户数据列表 + */ + @GetMapping(PREFIX + "/list") + @Operation(summary = "批量查询用户详情") + @Parameter(name = "ids", description = "用户ID列表", required = true, example = "1001,1002,1003") + CommonResult> getListByIds(@RequestParam("ids") List ids); + + /** + * 统计用户总数(用于全量同步进度计算) + * + * @param tenantId 租户ID(可选) + * @return 用户总数 + */ + @GetMapping(PREFIX + "/count") + @Operation(summary = "统计用户总数") + @Parameter(name = "tenantId", description = "租户ID", example = "1") + CommonResult count(@RequestParam(value = "tenantId", required = false) Long tenantId); + +} diff --git a/zt-module-system/zt-module-system-api/pom.xml b/zt-module-system/zt-module-system-api/pom.xml index f8862e90..0e3e130f 100644 --- a/zt-module-system/zt-module-system-api/pom.xml +++ b/zt-module-system/zt-module-system-api/pom.xml @@ -22,6 +22,13 @@ zt-common + + + com.zt.plat + zt-module-databus-api + ${revision} + + org.springdoc diff --git a/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusDeptChangeMessage.java b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusDeptChangeMessage.java new file mode 100644 index 00000000..5bc3bce3 --- /dev/null +++ b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusDeptChangeMessage.java @@ -0,0 +1,160 @@ +package com.zt.plat.module.system.api.mq; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * Databus 部门变更消息 + *

+ * 用于跨服务传递部门变更通知 + * + * @author ZT + */ +@Data +@Accessors(chain = true) +public class DatabusDeptChangeMessage implements Serializable { + + /** + * 消息 Topic + */ + public static final String TOPIC = "databus-change-system-dept"; + + /** + * 事件动作:create-创建 update-更新 delete-删除 + */ + private String action; + + /** + * 部门ID + */ + private Long deptId; + + /** + * 部门编码 + */ + private String deptCode; + + /** + * 部门名称 + */ + private String deptName; + + /** + * 部门简称 + */ + private String shortName; + + /** + * 上级部门ID + */ + private Long parentId; + + /** + * 排序 + */ + private Integer sort; + + /** + * 负责人用户ID + */ + private Long leaderUserId; + + /** + * 联系电话 + */ + private String phone; + + /** + * 邮箱 + */ + private String email; + + /** + * 状态(0正常 1停用) + */ + private Integer status; + + /** + * 是否公司 + */ + private Boolean isCompany; + + /** + * 是否集团 + */ + private Boolean isGroup; + + /** + * 部门来源类型 + */ + private Integer deptSource; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 事件时间 + */ + private LocalDateTime eventTime; + + // ==================== 静态工厂方法 ==================== + + public static DatabusDeptChangeMessage create(Long deptId, String deptCode, String deptName, String shortName, + Long parentId, Integer sort, Long leaderUserId, String phone, + String email, Integer status, Boolean isCompany, Boolean isGroup, + Integer deptSource, Long tenantId, LocalDateTime createTime) { + return new DatabusDeptChangeMessage() + .setAction("create") + .setDeptId(deptId) + .setDeptCode(deptCode) + .setDeptName(deptName) + .setShortName(shortName) + .setParentId(parentId) + .setSort(sort) + .setLeaderUserId(leaderUserId) + .setPhone(phone) + .setEmail(email) + .setStatus(status) + .setIsCompany(isCompany) + .setIsGroup(isGroup) + .setDeptSource(deptSource) + .setTenantId(tenantId) + .setEventTime(createTime != null ? createTime : LocalDateTime.now()); + } + + public static DatabusDeptChangeMessage update(Long deptId, String deptCode, String deptName, String shortName, + Long parentId, Integer sort, Long leaderUserId, String phone, + String email, Integer status, Boolean isCompany, Boolean isGroup, + Integer deptSource, Long tenantId, LocalDateTime updateTime) { + return new DatabusDeptChangeMessage() + .setAction("update") + .setDeptId(deptId) + .setDeptCode(deptCode) + .setDeptName(deptName) + .setShortName(shortName) + .setParentId(parentId) + .setSort(sort) + .setLeaderUserId(leaderUserId) + .setPhone(phone) + .setEmail(email) + .setStatus(status) + .setIsCompany(isCompany) + .setIsGroup(isGroup) + .setDeptSource(deptSource) + .setTenantId(tenantId) + .setEventTime(updateTime != null ? updateTime : LocalDateTime.now()); + } + + public static DatabusDeptChangeMessage delete(Long deptId, Long tenantId) { + return new DatabusDeptChangeMessage() + .setAction("delete") + .setDeptId(deptId) + .setTenantId(tenantId) + .setEventTime(LocalDateTime.now()); + } +} diff --git a/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusPostChangeMessage.java b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusPostChangeMessage.java new file mode 100644 index 00000000..1d5803ed --- /dev/null +++ b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusPostChangeMessage.java @@ -0,0 +1,109 @@ +package com.zt.plat.module.system.api.mq; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.time.LocalDateTime; + +/** + * Databus 岗位变更消息 + *

+ * 用于跨服务传递岗位变更通知 + * + * @author ZT + */ +@Data +@Accessors(chain = true) +public class DatabusPostChangeMessage implements Serializable { + + /** + * 消息 Topic + */ + public static final String TOPIC = "databus-change-system-post"; + + /** + * 事件动作:create-创建 update-更新 delete-删除 + */ + private String action; + + /** + * 岗位ID + */ + private Long postId; + + /** + * 岗位编码 + */ + private String postCode; + + /** + * 岗位名称 + */ + private String postName; + + /** + * 排序 + */ + private Integer sort; + + /** + * 状态(0正常 1停用) + */ + private Integer status; + + /** + * 备注 + */ + private String remark; + + /** + * 租户ID(PostDO 不支持多租户,固定为 null) + */ + private Long tenantId; + + /** + * 事件时间 + */ + private LocalDateTime eventTime; + + // ==================== 静态工厂方法 ==================== + + public static DatabusPostChangeMessage create(Long postId, String postCode, String postName, + Integer sort, Integer status, String remark, + LocalDateTime createTime) { + return new DatabusPostChangeMessage() + .setAction("create") + .setPostId(postId) + .setPostCode(postCode) + .setPostName(postName) + .setSort(sort) + .setStatus(status) + .setRemark(remark) + .setTenantId(null) // PostDO 不支持多租户 + .setEventTime(createTime != null ? createTime : LocalDateTime.now()); + } + + public static DatabusPostChangeMessage update(Long postId, String postCode, String postName, + Integer sort, Integer status, String remark, + LocalDateTime updateTime) { + return new DatabusPostChangeMessage() + .setAction("update") + .setPostId(postId) + .setPostCode(postCode) + .setPostName(postName) + .setSort(sort) + .setStatus(status) + .setRemark(remark) + .setTenantId(null) // PostDO 不支持多租户 + .setEventTime(updateTime != null ? updateTime : LocalDateTime.now()); + } + + public static DatabusPostChangeMessage delete(Long postId) { + return new DatabusPostChangeMessage() + .setAction("delete") + .setPostId(postId) + .setTenantId(null) // PostDO 不支持多租户 + .setEventTime(LocalDateTime.now()); + } +} diff --git a/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusUserChangeMessage.java b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusUserChangeMessage.java new file mode 100644 index 00000000..9209b6f8 --- /dev/null +++ b/zt-module-system/zt-module-system-api/src/main/java/com/zt/plat/module/system/api/mq/DatabusUserChangeMessage.java @@ -0,0 +1,154 @@ +package com.zt.plat.module.system.api.mq; + +import lombok.Data; +import lombok.experimental.Accessors; + +import java.io.Serializable; +import java.time.LocalDateTime; +import java.util.Set; + +/** + * Databus 用户变更消息 + *

+ * 用于跨服务传递用户变更通知 + * + * @author ZT + */ +@Data +@Accessors(chain = true) +public class DatabusUserChangeMessage implements Serializable { + + /** + * 消息 Topic + */ + public static final String TOPIC = "databus-change-system-user"; + + /** + * 事件动作:create-创建 update-更新 delete-删除 + */ + private String action; + + /** + * 用户ID + */ + private Long userId; + + /** + * 用户名 + */ + private String username; + + /** + * 昵称 + */ + private String nickname; + + /** + * 备注 + */ + private String remark; + + /** + * 部门ID集合 + */ + private Set deptIds; + + /** + * 岗位ID集合 + */ + private Set postIds; + + /** + * 邮箱 + */ + private String email; + + /** + * 手机号码 + */ + private String mobile; + + /** + * 用户性别(0未知 1男 2女) + */ + private Integer sex; + + /** + * 头像地址 + */ + private String avatar; + + /** + * 状态(0正常 1停用) + */ + private Integer status; + + /** + * 用户来源类型 + */ + private Integer userSource; + + /** + * 租户ID + */ + private Long tenantId; + + /** + * 事件时间 + */ + private LocalDateTime eventTime; + + // ==================== 静态工厂方法 ==================== + + public static DatabusUserChangeMessage create(Long userId, String username, String nickname, String remark, + Set deptIds, Set postIds, String email, String mobile, + Integer sex, String avatar, Integer status, Integer userSource, + Long tenantId, LocalDateTime createTime) { + return new DatabusUserChangeMessage() + .setAction("create") + .setUserId(userId) + .setUsername(username) + .setNickname(nickname) + .setRemark(remark) + .setDeptIds(deptIds) + .setPostIds(postIds) + .setEmail(email) + .setMobile(mobile) + .setSex(sex) + .setAvatar(avatar) + .setStatus(status) + .setUserSource(userSource) + .setTenantId(tenantId) + .setEventTime(createTime != null ? createTime : LocalDateTime.now()); + } + + public static DatabusUserChangeMessage update(Long userId, String username, String nickname, String remark, + Set deptIds, Set postIds, String email, String mobile, + Integer sex, String avatar, Integer status, Integer userSource, + Long tenantId, LocalDateTime updateTime) { + return new DatabusUserChangeMessage() + .setAction("update") + .setUserId(userId) + .setUsername(username) + .setNickname(nickname) + .setRemark(remark) + .setDeptIds(deptIds) + .setPostIds(postIds) + .setEmail(email) + .setMobile(mobile) + .setSex(sex) + .setAvatar(avatar) + .setStatus(status) + .setUserSource(userSource) + .setTenantId(tenantId) + .setEventTime(updateTime != null ? updateTime : LocalDateTime.now()); + } + + public static DatabusUserChangeMessage delete(Long userId, Long tenantId) { + return new DatabusUserChangeMessage() + .setAction("delete") + .setUserId(userId) + .setTenantId(tenantId) + .setEventTime(LocalDateTime.now()); + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusDeptProviderApiImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusDeptProviderApiImpl.java new file mode 100644 index 00000000..909845dc --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusDeptProviderApiImpl.java @@ -0,0 +1,218 @@ +package com.zt.plat.module.system.api.databus; + +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusDeptData; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi; +import com.zt.plat.module.system.dal.dataobject.dept.DeptDO; +import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO; +import com.zt.plat.module.system.dal.mysql.dept.DeptMapper; +import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.RestController; + +import java.util.*; +import java.util.stream.Collectors; + +import static com.zt.plat.framework.common.pojo.CommonResult.success; + +/** + * Databus 部门数据提供者 API 实现 + * + * @author ZT + */ +@Slf4j +@RestController +@Validated +public class DatabusDeptProviderApiImpl implements DatabusDeptProviderApi { + + @Resource + private DeptMapper deptMapper; + + @Resource + private AdminUserMapper userMapper; + + @Override + public CommonResult> getPageByCursor(CursorPageReqDTO reqDTO) { + // 构建游标查询条件 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + + // 游标条件:create_time > cursorTime OR (create_time = cursorTime AND id > cursorId) + if (!reqDTO.isFirstPage()) { + queryWrapper.and(w -> w + .gt(DeptDO::getCreateTime, reqDTO.getCursorTime()) + .or(o -> o + .eq(DeptDO::getCreateTime, reqDTO.getCursorTime()) + .gt(DeptDO::getId, reqDTO.getCursorId()) + ) + ); + } + + // 租户过滤(如果指定) + if (reqDTO.getTenantId() != null) { + queryWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId()); + } + + // 按 create_time, id 升序排列,确保顺序稳定 + queryWrapper.orderByAsc(DeptDO::getCreateTime) + .orderByAsc(DeptDO::getId); + + // 多查一条判断是否有更多数据 + int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100; + queryWrapper.last("LIMIT " + (limit + 1)); + + List deptList = deptMapper.selectList(queryWrapper); + + // 判断是否有更多 + boolean hasMore = deptList.size() > limit; + if (hasMore) { + deptList = deptList.subList(0, limit); + } + + if (CollUtil.isEmpty(deptList)) { + return success(CursorPageResult.empty()); + } + + // 收集负责人用户ID + Set leaderUserIds = deptList.stream() + .map(DeptDO::getLeaderUserId) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // 批量查询负责人用户名 + Map userNameMap = new HashMap<>(); + if (CollUtil.isNotEmpty(leaderUserIds)) { + List users = userMapper.selectBatchIds(leaderUserIds); + userNameMap = users.stream() + .collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1)); + } + + // 转换为同步数据 + Map finalUserNameMap = userNameMap; + List dataList = deptList.stream() + .map(dept -> convertToDatabusDeptData(dept, finalUserNameMap)) + .collect(Collectors.toList()); + + // 获取最后一条数据的游标 + DeptDO lastDept = deptList.get(deptList.size() - 1); + + // 首次查询时返回总数 + Long total = null; + if (reqDTO.isFirstPage()) { + LambdaQueryWrapper countWrapper = new LambdaQueryWrapper<>(); + if (reqDTO.getTenantId() != null) { + countWrapper.eq(DeptDO::getTenantId, reqDTO.getTenantId()); + } + total = deptMapper.selectCount(countWrapper); + } + + return success(CursorPageResult.of( + dataList, + lastDept.getCreateTime(), + lastDept.getId(), + hasMore, + total + )); + } + + @Override + public CommonResult getById(Long id) { + DeptDO dept = deptMapper.selectById(id); + if (dept == null) { + return success(null); + } + + // 查询负责人用户名 + Map userNameMap = new HashMap<>(); + if (dept.getLeaderUserId() != null) { + AdminUserDO user = userMapper.selectById(dept.getLeaderUserId()); + if (user != null) { + userNameMap.put(user.getId(), user.getNickname()); + } + } + + return success(convertToDatabusDeptData(dept, userNameMap)); + } + + @Override + public CommonResult> getListByIds(List ids) { + if (CollUtil.isEmpty(ids)) { + return success(Collections.emptyList()); + } + + List deptList = deptMapper.selectBatchIds(ids); + if (CollUtil.isEmpty(deptList)) { + return success(Collections.emptyList()); + } + + // 收集负责人用户ID + Set leaderUserIds = deptList.stream() + .map(DeptDO::getLeaderUserId) + .filter(Objects::nonNull) + .collect(Collectors.toSet()); + + // 批量查询负责人用户名 + Map userNameMap = new HashMap<>(); + if (CollUtil.isNotEmpty(leaderUserIds)) { + List users = userMapper.selectBatchIds(leaderUserIds); + userNameMap = users.stream() + .collect(Collectors.toMap(AdminUserDO::getId, AdminUserDO::getNickname, (v1, v2) -> v1)); + } + + Map finalUserNameMap = userNameMap; + List dataList = deptList.stream() + .map(dept -> convertToDatabusDeptData(dept, finalUserNameMap)) + .collect(Collectors.toList()); + + return success(dataList); + } + + @Override + public CommonResult count(Long tenantId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + if (tenantId != null) { + queryWrapper.eq(DeptDO::getTenantId, tenantId); + } + return success(deptMapper.selectCount(queryWrapper)); + } + + /** + * 将 DeptDO 转换为 DatabusDeptData + */ + private DatabusDeptData convertToDatabusDeptData(DeptDO dept, Map userNameMap) { + // 根据 isCompany 反推 deptType + Integer deptType = null; + if (Boolean.TRUE.equals(dept.getIsCompany())) { + deptType = 28; // 公司 + } else if (Boolean.FALSE.equals(dept.getIsCompany())) { + deptType = 26; // 部门 + } + + return DatabusDeptData.builder() + .id(dept.getId()) + .code(dept.getCode()) + .name(dept.getName()) + .shortName(dept.getShortName()) + .parentId(dept.getParentId()) + .sort(dept.getSort()) + .status(dept.getStatus()) + .deptType(deptType) + .isGroup(dept.getIsGroup()) + .isCompany(dept.getIsCompany()) + .deptSource(dept.getDeptSource()) + .leaderUserId(dept.getLeaderUserId()) + .leaderUserName(dept.getLeaderUserId() != null ? userNameMap.get(dept.getLeaderUserId()) : null) + .phone(dept.getPhone()) + .email(dept.getEmail()) + .tenantId(dept.getTenantId()) + .createTime(dept.getCreateTime()) + .updateTime(dept.getUpdateTime()) + .build(); + } + +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusPostProviderApiImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusPostProviderApiImpl.java new file mode 100644 index 00000000..5e31b2f9 --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusPostProviderApiImpl.java @@ -0,0 +1,148 @@ +package com.zt.plat.module.system.api.databus; + +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusPostData; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi; +import com.zt.plat.module.system.dal.dataobject.dept.PostDO; +import com.zt.plat.module.system.dal.mysql.dept.PostMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +import static com.zt.plat.framework.common.pojo.CommonResult.success; + +/** + * Databus 岗位数据提供者 API 实现 + * + * @author ZT + */ +@Slf4j +@RestController +@Validated +public class DatabusPostProviderApiImpl implements DatabusPostProviderApi { + + @Resource + private PostMapper postMapper; + + @Override + public CommonResult> getPageByCursor(CursorPageReqDTO reqDTO) { + // 构建游标查询条件 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + + // 游标条件:create_time > cursorTime OR (create_time = cursorTime AND id > cursorId) + if (!reqDTO.isFirstPage()) { + queryWrapper.and(w -> w + .gt(PostDO::getCreateTime, reqDTO.getCursorTime()) + .or(o -> o + .eq(PostDO::getCreateTime, reqDTO.getCursorTime()) + .gt(PostDO::getId, reqDTO.getCursorId()) + ) + ); + } + + // PostDO 不支持多租户,忽略租户过滤 + + // 按 create_time, id 升序排列,确保顺序稳定 + queryWrapper.orderByAsc(PostDO::getCreateTime) + .orderByAsc(PostDO::getId); + + // 多查一条判断是否有更多数据 + int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100; + queryWrapper.last("LIMIT " + (limit + 1)); + + List postList = postMapper.selectList(queryWrapper); + + // 判断是否有更多 + boolean hasMore = postList.size() > limit; + if (hasMore) { + postList = postList.subList(0, limit); + } + + if (CollUtil.isEmpty(postList)) { + return success(CursorPageResult.empty()); + } + + // 转换为同步数据 + List dataList = postList.stream() + .map(this::convertToDatabusPostData) + .collect(Collectors.toList()); + + // 获取最后一条数据的游标 + PostDO lastPost = postList.get(postList.size() - 1); + + // 首次查询时返回总数 + Long total = null; + if (reqDTO.isFirstPage()) { + total = postMapper.selectCount(new LambdaQueryWrapper<>()); + } + + return success(CursorPageResult.of( + dataList, + lastPost.getCreateTime(), + lastPost.getId(), + hasMore, + total + )); + } + + @Override + public CommonResult getById(Long id) { + PostDO post = postMapper.selectById(id); + if (post == null) { + return success(null); + } + + return success(convertToDatabusPostData(post)); + } + + @Override + public CommonResult> getListByIds(List ids) { + if (CollUtil.isEmpty(ids)) { + return success(Collections.emptyList()); + } + + List postList = postMapper.selectBatchIds(ids); + if (CollUtil.isEmpty(postList)) { + return success(Collections.emptyList()); + } + + List dataList = postList.stream() + .map(this::convertToDatabusPostData) + .collect(Collectors.toList()); + + return success(dataList); + } + + @Override + public CommonResult count(Long tenantId) { + // PostDO 不支持多租户,忽略租户参数 + return success(postMapper.selectCount(new LambdaQueryWrapper<>())); + } + + /** + * 将 PostDO 转换为 DatabusPostData + */ + private DatabusPostData convertToDatabusPostData(PostDO post) { + return DatabusPostData.builder() + .id(post.getId()) + .code(post.getCode()) + .name(post.getName()) + .sort(post.getSort()) + .status(post.getStatus()) + .remark(post.getRemark()) + .tenantId(null) // PostDO 不支持多租户 + .createTime(post.getCreateTime()) + .updateTime(post.getUpdateTime()) + .build(); + } + +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusUserProviderApiImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusUserProviderApiImpl.java new file mode 100644 index 00000000..286b9984 --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/api/databus/DatabusUserProviderApiImpl.java @@ -0,0 +1,267 @@ +package com.zt.plat.module.system.api.databus; + +import cn.hutool.core.collection.CollUtil; +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.zt.plat.framework.common.pojo.CommonResult; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData.DeptSimpleInfo; +import com.zt.plat.module.databus.api.data.DatabusAdminUserData.PostSimpleInfo; +import com.zt.plat.module.databus.api.dto.CursorPageReqDTO; +import com.zt.plat.module.databus.api.dto.CursorPageResult; +import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi; +import com.zt.plat.module.system.dal.dataobject.dept.DeptDO; +import com.zt.plat.module.system.dal.dataobject.dept.PostDO; +import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO; +import com.zt.plat.module.system.dal.dataobject.userdept.UserDeptDO; +import com.zt.plat.module.system.dal.mysql.dept.DeptMapper; +import com.zt.plat.module.system.dal.mysql.dept.PostMapper; +import com.zt.plat.module.system.dal.mysql.user.AdminUserMapper; +import com.zt.plat.module.system.dal.mysql.userdept.UserDeptMapper; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.RestController; + +import java.util.*; +import java.util.stream.Collectors; + +import static com.zt.plat.framework.common.pojo.CommonResult.success; + +/** + * Databus 用户数据提供者 API 实现 + * + * @author ZT + */ +@Slf4j +@RestController +@Validated +public class DatabusUserProviderApiImpl implements DatabusUserProviderApi { + + @Resource + private AdminUserMapper userMapper; + + @Resource + private UserDeptMapper userDeptMapper; + + @Resource + private DeptMapper deptMapper; + + @Resource + private PostMapper postMapper; + + @Override + public CommonResult> getPageByCursor(CursorPageReqDTO reqDTO) { + // 构建游标查询条件 + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + + // 游标条件:create_time > cursorTime OR (create_time = cursorTime AND id > cursorId) + if (!reqDTO.isFirstPage()) { + queryWrapper.and(w -> w + .gt(AdminUserDO::getCreateTime, reqDTO.getCursorTime()) + .or(o -> o + .eq(AdminUserDO::getCreateTime, reqDTO.getCursorTime()) + .gt(AdminUserDO::getId, reqDTO.getCursorId()) + ) + ); + } + + // 租户过滤(如果指定) + if (reqDTO.getTenantId() != null) { + queryWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId()); + } + + // 按 create_time, id 升序排列,确保顺序稳定 + queryWrapper.orderByAsc(AdminUserDO::getCreateTime) + .orderByAsc(AdminUserDO::getId); + + // 多查一条判断是否有更多数据 + int limit = reqDTO.getBatchSize() != null ? reqDTO.getBatchSize() : 100; + queryWrapper.last("LIMIT " + (limit + 1)); + + List userList = userMapper.selectList(queryWrapper); + + // 判断是否有更多 + boolean hasMore = userList.size() > limit; + if (hasMore) { + userList = userList.subList(0, limit); + } + + if (CollUtil.isEmpty(userList)) { + return success(CursorPageResult.empty()); + } + + // 批量聚合用户关联数据 + List dataList = aggregateUserData(userList); + + // 获取最后一条数据的游标 + AdminUserDO lastUser = userList.get(userList.size() - 1); + + // 首次查询时返回总数 + Long total = null; + if (reqDTO.isFirstPage()) { + LambdaQueryWrapper countWrapper = new LambdaQueryWrapper<>(); + if (reqDTO.getTenantId() != null) { + countWrapper.eq(AdminUserDO::getTenantId, reqDTO.getTenantId()); + } + total = userMapper.selectCount(countWrapper); + } + + return success(CursorPageResult.of( + dataList, + lastUser.getCreateTime(), + lastUser.getId(), + hasMore, + total + )); + } + + @Override + public CommonResult getById(Long id) { + AdminUserDO user = userMapper.selectById(id); + if (user == null) { + return success(null); + } + + List dataList = aggregateUserData(Collections.singletonList(user)); + return success(dataList.isEmpty() ? null : dataList.get(0)); + } + + @Override + public CommonResult> getListByIds(List ids) { + if (CollUtil.isEmpty(ids)) { + return success(Collections.emptyList()); + } + + List userList = userMapper.selectBatchIds(ids); + if (CollUtil.isEmpty(userList)) { + return success(Collections.emptyList()); + } + + return success(aggregateUserData(userList)); + } + + @Override + public CommonResult count(Long tenantId) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + if (tenantId != null) { + queryWrapper.eq(AdminUserDO::getTenantId, tenantId); + } + return success(userMapper.selectCount(queryWrapper)); + } + + /** + * 批量聚合用户关联数据(部门、岗位) + */ + private List aggregateUserData(List userList) { + if (CollUtil.isEmpty(userList)) { + return Collections.emptyList(); + } + + // 收集所有用户ID + List userIds = userList.stream() + .map(AdminUserDO::getId) + .collect(Collectors.toList()); + + // 批量查询用户-部门关系 + List userDeptList = userDeptMapper.selectValidListByUserIds(userIds); + Map> userDeptIdsMap = userDeptList.stream() + .collect(Collectors.groupingBy( + UserDeptDO::getUserId, + Collectors.mapping(UserDeptDO::getDeptId, Collectors.toList()) + )); + + // 收集所有部门ID + Set allDeptIds = userDeptList.stream() + .map(UserDeptDO::getDeptId) + .collect(Collectors.toSet()); + + // 批量查询部门信息 + Map deptMap = new HashMap<>(); + if (CollUtil.isNotEmpty(allDeptIds)) { + List deptList = deptMapper.selectBatchIds(allDeptIds); + deptMap = deptList.stream() + .collect(Collectors.toMap(DeptDO::getId, d -> d, (v1, v2) -> v1)); + } + + // 收集所有岗位ID + Set allPostIds = userList.stream() + .map(AdminUserDO::getPostIds) + .filter(Objects::nonNull) + .flatMap(Set::stream) + .collect(Collectors.toSet()); + + // 批量查询岗位信息 + Map postMap = new HashMap<>(); + if (CollUtil.isNotEmpty(allPostIds)) { + List postList = postMapper.selectBatchIds(allPostIds); + postMap = postList.stream() + .collect(Collectors.toMap(PostDO::getId, p -> p, (v1, v2) -> v1)); + } + + // 转换为同步数据 + Map finalDeptMap = deptMap; + Map finalPostMap = postMap; + + return userList.stream() + .map(user -> convertToDatabusAdminUserData(user, userDeptIdsMap, finalDeptMap, finalPostMap)) + .collect(Collectors.toList()); + } + + /** + * 将 AdminUserDO 转换为 DatabusAdminUserData + */ + private DatabusAdminUserData convertToDatabusAdminUserData(AdminUserDO user, + Map> userDeptIdsMap, + Map deptMap, + Map postMap) { + // 获取用户的部门ID列表 + List deptIds = userDeptIdsMap.getOrDefault(user.getId(), Collections.emptyList()); + + // 构建部门简要信息列表 + List depts = deptIds.stream() + .map(deptMap::get) + .filter(Objects::nonNull) + .map(dept -> DeptSimpleInfo.builder() + .deptId(dept.getId()) + .deptCode(dept.getCode()) + .deptName(dept.getName()) + .build()) + .collect(Collectors.toList()); + + // 获取用户的岗位ID列表 + Set postIds = user.getPostIds(); + List postIdList = postIds != null ? new ArrayList<>(postIds) : Collections.emptyList(); + + // 构建岗位简要信息列表 + List posts = postIdList.stream() + .map(postMap::get) + .filter(Objects::nonNull) + .map(post -> PostSimpleInfo.builder() + .postId(post.getId()) + .postCode(post.getCode()) + .postName(post.getName()) + .build()) + .collect(Collectors.toList()); + + return DatabusAdminUserData.builder() + .id(user.getId()) + .username(user.getUsername()) + .nickname(user.getNickname()) + .mobile(user.getMobile()) + .email(user.getEmail()) + .sex(user.getSex()) + .avatar(user.getAvatar()) + .status(user.getStatus()) + .remark(user.getRemark()) + .userSource(user.getUserSource()) + .deptIds(deptIds) + .depts(depts) + .postIds(postIdList) + .posts(posts) + .tenantId(user.getTenantId()) + .createTime(user.getCreateTime()) + .updateTime(user.getUpdateTime()) + .build(); + } + +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java new file mode 100644 index 00000000..76c5b624 --- /dev/null +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/mq/producer/databus/DatabusChangeProducer.java @@ -0,0 +1,193 @@ +package com.zt.plat.module.system.mq.producer.databus; + +import com.zt.plat.module.system.api.mq.DatabusDeptChangeMessage; +import com.zt.plat.module.system.api.mq.DatabusPostChangeMessage; +import com.zt.plat.module.system.api.mq.DatabusUserChangeMessage; +import com.zt.plat.module.system.dal.dataobject.dept.DeptDO; +import com.zt.plat.module.system.dal.dataobject.dept.PostDO; +import com.zt.plat.module.system.dal.dataobject.user.AdminUserDO; +import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.stereotype.Component; + +import jakarta.annotation.Resource; + +/** + * Databus 数据变更消息生产者 + *

+ * 用于发送部门、用户、岗位变更消息到 MQ,供 databus-server 消费 + * + * @author ZT + */ +@Slf4j +@Component +public class DatabusChangeProducer { + + @Resource + private RocketMQTemplate rocketMQTemplate; + + // ==================== 部门变更消息 ==================== + + /** + * 发送部门创建消息 + */ + public void sendDeptCreatedMessage(DeptDO dept) { + DatabusDeptChangeMessage message = DatabusDeptChangeMessage.create( + dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(), + dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(), + dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(), + dept.getDeptSource(), dept.getTenantId(), dept.getCreateTime()); + sendDeptChangeMessage(message); + } + + /** + * 发送部门更新消息 + */ + public void sendDeptUpdatedMessage(DeptDO dept) { + DatabusDeptChangeMessage message = DatabusDeptChangeMessage.update( + dept.getId(), dept.getCode(), dept.getName(), dept.getShortName(), + dept.getParentId(), dept.getSort(), dept.getLeaderUserId(), dept.getPhone(), + dept.getEmail(), dept.getStatus(), dept.getIsCompany(), dept.getIsGroup(), + dept.getDeptSource(), dept.getTenantId(), dept.getUpdateTime()); + sendDeptChangeMessage(message); + } + + /** + * 发送部门删除消息 + */ + public void sendDeptDeletedMessage(Long deptId, Long tenantId) { + DatabusDeptChangeMessage message = DatabusDeptChangeMessage.delete(deptId, tenantId); + sendDeptChangeMessage(message); + } + + private void sendDeptChangeMessage(DatabusDeptChangeMessage message) { + try { + rocketMQTemplate.asyncSend(DatabusDeptChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { + @Override + public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) { + log.info("[Databus] 部门变更消息发送成功, action={}, deptId={}, msgId={}", + message.getAction(), message.getDeptId(), sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + log.error("[Databus] 部门变更消息发送失败, action={}, deptId={}", + message.getAction(), message.getDeptId(), e); + } + }); + } catch (Exception e) { + log.error("[Databus] 部门变更消息发送异常, action={}, deptId={}", + message.getAction(), message.getDeptId(), e); + } + } + + // ==================== 用户变更消息 ==================== + + /** + * 发送用户创建消息(完整数据) + */ + public void sendUserCreatedMessage(AdminUserDO user) { + DatabusUserChangeMessage message = DatabusUserChangeMessage.create( + user.getId(), user.getUsername(), user.getNickname(), + user.getRemark(), user.getDeptIds(), user.getPostIds(), + user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(), + user.getStatus(), user.getUserSource(), + user.getTenantId(), user.getCreateTime()); + sendUserChangeMessage(message); + } + + /** + * 发送用户更新消息(完整数据) + */ + public void sendUserUpdatedMessage(AdminUserDO user) { + DatabusUserChangeMessage message = DatabusUserChangeMessage.update( + user.getId(), user.getUsername(), user.getNickname(), + user.getRemark(), user.getDeptIds(), user.getPostIds(), + user.getEmail(), user.getMobile(), user.getSex(), user.getAvatar(), + user.getStatus(), user.getUserSource(), + user.getTenantId(), user.getUpdateTime()); + sendUserChangeMessage(message); + } + + /** + * 发送用户删除消息 + */ + public void sendUserDeletedMessage(Long userId, Long tenantId) { + DatabusUserChangeMessage message = DatabusUserChangeMessage.delete(userId, tenantId); + sendUserChangeMessage(message); + } + + private void sendUserChangeMessage(DatabusUserChangeMessage message) { + try { + rocketMQTemplate.asyncSend(DatabusUserChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { + @Override + public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) { + log.info("[Databus] 用户变更消息发送成功, action={}, userId={}, msgId={}", + message.getAction(), message.getUserId(), sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + log.error("[Databus] 用户变更消息发送失败, action={}, userId={}", + message.getAction(), message.getUserId(), e); + } + }); + } catch (Exception e) { + log.error("[Databus] 用户变更消息发送异常, action={}, userId={}", + message.getAction(), message.getUserId(), e); + } + } + + // ==================== 岗位变更消息 ==================== + + /** + * 发送岗位创建消息 + */ + public void sendPostCreatedMessage(PostDO post) { + DatabusPostChangeMessage message = DatabusPostChangeMessage.create( + post.getId(), post.getCode(), post.getName(), + post.getSort(), post.getStatus(), post.getRemark(), + post.getCreateTime()); + sendPostChangeMessage(message); + } + + /** + * 发送岗位更新消息 + */ + public void sendPostUpdatedMessage(PostDO post) { + DatabusPostChangeMessage message = DatabusPostChangeMessage.update( + post.getId(), post.getCode(), post.getName(), + post.getSort(), post.getStatus(), post.getRemark(), + post.getUpdateTime()); + sendPostChangeMessage(message); + } + + /** + * 发送岗位删除消息 + */ + public void sendPostDeletedMessage(Long postId) { + DatabusPostChangeMessage message = DatabusPostChangeMessage.delete(postId); + sendPostChangeMessage(message); + } + + private void sendPostChangeMessage(DatabusPostChangeMessage message) { + try { + rocketMQTemplate.asyncSend(DatabusPostChangeMessage.TOPIC, message, new org.apache.rocketmq.client.producer.SendCallback() { + @Override + public void onSuccess(org.apache.rocketmq.client.producer.SendResult sendResult) { + log.info("[Databus] 岗位变更消息发送成功, action={}, postId={}, msgId={}", + message.getAction(), message.getPostId(), sendResult.getMsgId()); + } + + @Override + public void onException(Throwable e) { + log.error("[Databus] 岗位变更消息发送失败, action={}, postId={}", + message.getAction(), message.getPostId(), e); + } + }); + } catch (Exception e) { + log.error("[Databus] 岗位变更消息发送异常, action={}, postId={}", + message.getAction(), message.getPostId(), e); + } + } +} diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/DeptServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/DeptServiceImpl.java index 0d1da94f..3cf54bee 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/DeptServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/DeptServiceImpl.java @@ -50,6 +50,8 @@ public class DeptServiceImpl implements DeptService { private DeptMapper deptMapper; @Resource private UserDeptMapper userDeptMapper; + @Resource + private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer; private static final String ROOT_CODE_PREFIX = "ZT"; private static final int CODE_SEGMENT_LENGTH = 3; @@ -99,6 +101,10 @@ public class DeptServiceImpl implements DeptService { dept.setDeptSource(DeptSourceEnum.EXTERNAL.getSource()); } deptMapper.insert(dept); + + // 发布部门创建事件 + databusChangeProducer.sendDeptCreatedMessage(dept); + return dept.getId(); } @@ -153,6 +159,12 @@ public class DeptServiceImpl implements DeptService { DeptDO updateObj = BeanUtils.toBean(updateReqVO, DeptDO.class); deptMapper.updateById(updateObj); + // 发布部门更新事件(重新查询获取完整数据) + DeptDO updatedDept = deptMapper.selectById(updateObj.getId()); + if (updatedDept != null) { + databusChangeProducer.sendDeptUpdatedMessage(updatedDept); + } + if (parentChanged) { refreshChildCodesRecursively(updateObj.getId(), updateReqVO.getCode()); } @@ -168,8 +180,16 @@ public class DeptServiceImpl implements DeptService { if (deptMapper.selectCountByParentId(id) > 0) { throw exception(DEPT_EXITS_CHILDREN); } + + // 删除前先查询部门信息(用于发布事件) + DeptDO dept = deptMapper.selectById(id); + Long tenantId = (dept != null) ? dept.getTenantId() : null; + // 删除部门 deptMapper.deleteById(id); + + // 发布部门删除事件 + databusChangeProducer.sendDeptDeletedMessage(id, tenantId); } @VisibleForTesting diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/PostServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/PostServiceImpl.java index 4d551fce..ff0aa7b4 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/PostServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/dept/PostServiceImpl.java @@ -34,6 +34,9 @@ public class PostServiceImpl implements PostService { @Resource private PostMapper postMapper; + @Resource + private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer; + @Override public Long createPost(PostSaveReqVO createReqVO) { // 校验正确性 @@ -42,6 +45,10 @@ public class PostServiceImpl implements PostService { // 插入岗位 PostDO post = BeanUtils.toBean(createReqVO, PostDO.class); postMapper.insert(post); + + // 发布岗位创建事件 + databusChangeProducer.sendPostCreatedMessage(post); + return post.getId(); } @@ -53,14 +60,24 @@ public class PostServiceImpl implements PostService { // 更新岗位 PostDO updateObj = BeanUtils.toBean(updateReqVO, PostDO.class); postMapper.updateById(updateObj); + + // 发布岗位更新事件(重新查询获取完整数据) + PostDO updatedPost = postMapper.selectById(updateObj.getId()); + if (updatedPost != null) { + databusChangeProducer.sendPostUpdatedMessage(updatedPost); + } } @Override public void deletePost(Long id) { // 校验是否存在 validatePostExists(id); + // 删除部门 postMapper.deleteById(id); + + // 发布岗位删除事件(PostDO 不支持多租户,tenantId 为 null) + databusChangeProducer.sendPostDeletedMessage(id); } private void validatePostForCreateOrUpdate(Long id, String name, String code) { diff --git a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java index 4384f9ed..0175d5ed 100644 --- a/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java +++ b/zt-module-system/zt-module-system-server/src/main/java/com/zt/plat/module/system/service/user/AdminUserServiceImpl.java @@ -93,6 +93,9 @@ public class AdminUserServiceImpl implements AdminUserService { @Resource private ConfigApi configApi; + + @Resource + private com.zt.plat.module.system.mq.producer.databus.DatabusChangeProducer databusChangeProducer; @Resource private UserDeptService userDeptService; @@ -140,6 +143,9 @@ public class AdminUserServiceImpl implements AdminUserService { postId -> new UserPostDO().setUserId(user.getId()).setPostId(postId))); } + // 2.4 发布用户创建事件 + databusChangeProducer.sendUserCreatedMessage(user); + // 3. 记录操作日志上下文 LogRecordContext.putVariable("user", user); return user.getId(); @@ -228,6 +234,12 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.3 更新岗位 updateUserPost(updateReqVO, updateObj); + // 2.4 发布用户更新事件(重新查询获取完整数据) + AdminUserDO updatedUser = userMapper.selectById(updateObj.getId()); + if (updatedUser != null) { + databusChangeProducer.sendUserUpdatedMessage(updatedUser); + } + // 3. 记录操作日志上下文 LogRecordContext.putVariable(DiffParseFunction.OLD_OBJECT, BeanUtils.toBean(oldUser, UserSaveReqVO.class)); LogRecordContext.putVariable("user", oldUser); @@ -317,6 +329,9 @@ public class AdminUserServiceImpl implements AdminUserService { // 2.2 删除用户岗位 userPostMapper.deleteByUserId(id); + // 2.3 发布用户删除事件 + databusChangeProducer.sendUserDeletedMessage(id, user.getTenantId()); + // 3. 记录操作日志上下文 LogRecordContext.putVariable("user", user); }