Merge branch 'dev' into 'test'

feat(databus): 完成阶段一+二-数据契约层与数据提供者

See merge request jygk/dsc!4
This commit is contained in:
wencai he
2025-12-04 06:13:11 +00:00
163 changed files with 11680 additions and 8 deletions

View File

@@ -0,0 +1,132 @@
package com.zt.plat.module.databus.api.data;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* 用户数据对象DataBus
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "用户数据")
public class DatabusAdminUserData implements Serializable {
private static final long serialVersionUID = 1L;
// ========== 基本信息 ==========
@Schema(description = "用户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1001")
private Long id;
@Schema(description = "用户账号", requiredMode = Schema.RequiredMode.REQUIRED, example = "zhangsan")
private String username;
@Schema(description = "用户昵称", requiredMode = Schema.RequiredMode.REQUIRED, example = "张三")
private String nickname;
@Schema(description = "用户手机号", example = "13800138000")
private String mobile;
@Schema(description = "用户邮箱", example = "zhangsan@example.com")
private String email;
@Schema(description = "用户性别0-未知 1-男 2-女", example = "1")
private Integer sex;
@Schema(description = "用户头像URL", example = "https://example.com/avatar.jpg")
private String avatar;
@Schema(description = "状态0-启用 1-禁用", requiredMode = Schema.RequiredMode.REQUIRED, example = "0")
private Integer status;
@Schema(description = "备注", example = "测试用户")
private String remark;
@Schema(description = "用户来源类型", example = "1")
private Integer userSource;
// ========== 组织信息 ==========
@Schema(description = "所属部门ID列表", example = "[100, 101]")
private List<Long> deptIds;
@Schema(description = "所属部门信息列表")
private List<DeptSimpleInfo> depts;
// ========== 岗位信息 ==========
@Schema(description = "岗位ID列表", example = "[1, 2]")
private List<Long> postIds;
@Schema(description = "岗位信息列表")
private List<PostSimpleInfo> posts;
// ========== 多租户 ==========
@Schema(description = "租户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
private Long tenantId;
// ========== 时间信息 ==========
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
/**
* 部门简要信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "部门简要信息")
public static class DeptSimpleInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "部门ID", example = "100")
private Long deptId;
@Schema(description = "部门编码", example = "DEPT_001")
private String deptCode;
@Schema(description = "部门名称", example = "技术部")
private String deptName;
}
/**
* 岗位简要信息
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "岗位简要信息")
public static class PostSimpleInfo implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "岗位ID", example = "1")
private Long postId;
@Schema(description = "岗位编码", example = "CEO")
private String postCode;
@Schema(description = "岗位名称", example = "首席执行官")
private String postName;
}
}

View File

@@ -0,0 +1,90 @@
package com.zt.plat.module.databus.api.data;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 部门数据对象DataBus
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "部门数据")
public class DatabusDeptData implements Serializable {
private static final long serialVersionUID = 1L;
// ========== 基本信息 ==========
@Schema(description = "部门ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "100")
private Long id;
@Schema(description = "部门编码", example = "DEPT_001")
private String code;
@Schema(description = "部门名称", requiredMode = Schema.RequiredMode.REQUIRED, example = "技术部")
private String name;
@Schema(description = "部门简称", example = "技术")
private String shortName;
@Schema(description = "父部门ID顶级为0", requiredMode = Schema.RequiredMode.REQUIRED, example = "0")
private Long parentId;
@Schema(description = "排序号", example = "1")
private Integer sort;
@Schema(description = "状态0-启用 1-禁用", requiredMode = Schema.RequiredMode.REQUIRED, example = "0")
private Integer status;
// ========== 部门类型 ==========
@Schema(description = "部门类型28-公司 26-部门", example = "26")
private Integer deptType;
@Schema(description = "是否集团", example = "false")
private Boolean isGroup;
@Schema(description = "是否公司", example = "false")
private Boolean isCompany;
@Schema(description = "部门来源类型", example = "1")
private Integer deptSource;
// ========== 联系信息 ==========
@Schema(description = "负责人ID", example = "1001")
private Long leaderUserId;
@Schema(description = "负责人姓名", example = "张三")
private String leaderUserName;
@Schema(description = "联系电话", example = "010-12345678")
private String phone;
@Schema(description = "邮箱", example = "tech@example.com")
private String email;
// ========== 多租户 ==========
@Schema(description = "租户ID", requiredMode = Schema.RequiredMode.REQUIRED, example = "1")
private Long tenantId;
// ========== 时间信息 ==========
@Schema(description = "创建时间")
private LocalDateTime createTime;
@Schema(description = "更新时间")
private LocalDateTime updateTime;
}

View File

@@ -0,0 +1,67 @@
package com.zt.plat.module.databus.api.data;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 岗位数据对象DataBus
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusPostData implements Serializable {
/**
* 岗位ID
*/
private Long id;
/**
* 岗位编码
*/
private String code;
/**
* 岗位名称
*/
private String name;
/**
* 排序
*/
private Integer sort;
/**
* 状态0正常 1停用
*/
private Integer status;
/**
* 备注
*/
private String remark;
/**
* 租户ID
*/
private Long tenantId;
/**
* 创建时间
*/
private LocalDateTime createTime;
/**
* 更新时间
*/
private LocalDateTime updateTime;
}

View File

@@ -0,0 +1,48 @@
package com.zt.plat.module.databus.api.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* 游标分页请求 DTO
* <p>
* 用于 Databus 全量同步时的断点续传
* 使用 create_time + id 复合游标解决雪花ID非连续问题
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "游标分页请求")
public class CursorPageReqDTO implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "游标时间(上一批最后一条数据的创建时间)", example = "2024-01-01T00:00:00")
private LocalDateTime cursorTime;
@Schema(description = "游标ID上一批最后一条数据的ID用于同一时间戳内去重", example = "1234567890")
private Long cursorId;
@Schema(description = "批量大小", example = "100")
private Integer batchSize;
@Schema(description = "租户ID可选用于多租户过滤", example = "1")
private Long tenantId;
/**
* 是否为首次查询(无游标)
*/
public boolean isFirstPage() {
return cursorTime == null && cursorId == null;
}
}

View File

@@ -0,0 +1,81 @@
package com.zt.plat.module.databus.api.dto;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* 游标分页结果 DTO
* <p>
* 用于 Databus 全量同步时的断点续传响应
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Schema(description = "游标分页结果")
public class CursorPageResult<T> implements Serializable {
private static final long serialVersionUID = 1L;
@Schema(description = "数据列表")
private List<T> list;
@Schema(description = "下一页游标时间(最后一条数据的创建时间)")
private LocalDateTime nextCursorTime;
@Schema(description = "下一页游标ID最后一条数据的ID")
private Long nextCursorId;
@Schema(description = "本次返回的数据量")
private Integer count;
@Schema(description = "是否还有更多数据")
private Boolean hasMore;
@Schema(description = "总数据量(可选,首次查询时返回)")
private Long total;
/**
* 构建首页结果
*/
public static <T> CursorPageResult<T> of(List<T> list, LocalDateTime nextCursorTime,
Long nextCursorId, boolean hasMore, Long total) {
return CursorPageResult.<T>builder()
.list(list)
.nextCursorTime(nextCursorTime)
.nextCursorId(nextCursorId)
.count(list != null ? list.size() : 0)
.hasMore(hasMore)
.total(total)
.build();
}
/**
* 构建后续页结果
*/
public static <T> CursorPageResult<T> of(List<T> list, LocalDateTime nextCursorTime,
Long nextCursorId, boolean hasMore) {
return of(list, nextCursorTime, nextCursorId, hasMore, null);
}
/**
* 构建空结果
*/
public static <T> CursorPageResult<T> empty() {
return CursorPageResult.<T>builder()
.list(List.of())
.count(0)
.hasMore(false)
.build();
}
}

View File

@@ -0,0 +1,106 @@
package com.zt.plat.module.databus.api.message;
import com.zt.plat.module.databus.enums.DatabusEventType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
import java.util.List;
/**
* Databus 全量同步批量消息
* <p>
* 用于全量同步场景,支持分批传输
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusBatchMessage<T> implements Serializable {
/**
* 消息ID用于幂等
*/
private String messageId;
/**
* 全量同步任务ID
*/
private String taskId;
/**
* 事件类型
*/
private DatabusEventType eventType;
/**
* 当前批次号从1开始
*/
private Integer batchNo;
/**
* 总批次数
*/
private Integer totalBatch;
/**
* 当前批次数据条数
*/
private Integer count;
/**
* 总数据条数
*/
private Integer totalCount;
/**
* 是否最后一批
*/
private Boolean isLastBatch;
/**
* 数据列<E68DAE><E58897>
*/
private List<T> dataList;
/**
* 消息产生时间
*/
private LocalDateTime timestamp;
/**
* 来源系统
*/
private String source;
/**
* 租户ID
*/
private Long tenantId;
/**
* 创建批量消息
*/
public static <T> DatabusBatchMessage<T> of(DatabusEventType eventType, String taskId,
int batchNo, int totalBatch,
List<T> dataList, int totalCount) {
DatabusBatchMessage<T> msg = new DatabusBatchMessage<>();
msg.setMessageId(java.util.UUID.randomUUID().toString());
msg.setTaskId(taskId);
msg.setEventType(eventType);
msg.setBatchNo(batchNo);
msg.setTotalBatch(totalBatch);
msg.setCount(dataList != null ? dataList.size() : 0);
msg.setTotalCount(totalCount);
msg.setIsLastBatch(batchNo >= totalBatch);
msg.setDataList(dataList);
msg.setTimestamp(LocalDateTime.now());
return msg;
}
}

View File

@@ -0,0 +1,73 @@
package com.zt.plat.module.databus.api.message;
import com.zt.plat.module.databus.enums.DatabusEventType;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Databus 增量同步消息
* <p>
* 业务推送、服务端消费、服务端转发、客户端消费统一使用此消息体
*
* @author ZT
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class DatabusMessage<T> implements Serializable {
/**
* 消息ID用于幂等
*/
private String messageId;
/**
* 事件类型
*/
private DatabusEventType eventType;
/**
* 业务数据ID
*/
private Long dataId;
/**
* 业务数据(强类型)
*/
private T data;
/**
* 消息产生时间
*/
private LocalDateTime timestamp;
/**
* 来源系统
*/
private String source;
/**
* 租户ID
*/
private Long tenantId;
/**
* 创建简单消息
*/
public static <T> DatabusMessage<T> of(DatabusEventType eventType, Long dataId, T data) {
DatabusMessage<T> msg = new DatabusMessage<>();
msg.setMessageId(java.util.UUID.randomUUID().toString());
msg.setEventType(eventType);
msg.setDataId(dataId);
msg.setData(data);
msg.setTimestamp(LocalDateTime.now());
return msg;
}
}

View File

@@ -0,0 +1,71 @@
package com.zt.plat.module.databus.api.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusDeptData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* Databus 部门数据提供者 API
* <p>
* 供 Databus 调用,获取部门数据用于全量/增量同步
*
* @author ZT
*/
@FeignClient(name = "${databus.provider.dept.service:system-server}")
@Tag(name = "RPC 服务 - Databus 部门数据提供者")
public interface DatabusDeptProviderApi {
String PREFIX = "/rpc/databus/dept";
/**
* 游标分页查询部门数据(用于全量同步)
*
* @param reqDTO 游标分页请求
* @return 部门数据分页结果
*/
@PostMapping(PREFIX + "/page-by-cursor")
@Operation(summary = "游标分页查询部门数据")
CommonResult<CursorPageResult<DatabusDeptData>> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO);
/**
* 根据ID查询部门详情用于增量同步
*
* @param id 部门ID
* @return 部门数据
*/
@GetMapping(PREFIX + "/get")
@Operation(summary = "查询部门详情")
@Parameter(name = "id", description = "部门ID", required = true, example = "100")
CommonResult<DatabusDeptData> getById(@RequestParam("id") Long id);
/**
* 批量查询部门详情(用于增量同步批量获取)
*
* @param ids 部门ID列表
* @return 部门数据列表
*/
@GetMapping(PREFIX + "/list")
@Operation(summary = "批量查询部门详情")
@Parameter(name = "ids", description = "部门ID列表", required = true, example = "100,101,102")
CommonResult<List<DatabusDeptData>> getListByIds(@RequestParam("ids") List<Long> ids);
/**
* 统计部门总数(用于全量同步进度计算)
*
* @param tenantId 租户ID可选
* @return 部门总数
*/
@GetMapping(PREFIX + "/count")
@Operation(summary = "统计部门总数")
@Parameter(name = "tenantId", description = "租户ID", example = "1")
CommonResult<Long> count(@RequestParam(value = "tenantId", required = false) Long tenantId);
}

View File

@@ -0,0 +1,71 @@
package com.zt.plat.module.databus.api.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusPostData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* Databus 岗位数据提供者 API
* <p>
* 供 Databus 调用,获取岗位数据用于全量/增量同步
*
* @author ZT
*/
@FeignClient(name = "${databus.provider.post.service:system-server}")
@Tag(name = "RPC 服务 - Databus 岗位数据提供者")
public interface DatabusPostProviderApi {
String PREFIX = "/rpc/databus/post";
/**
* 游标分页查询岗位数据(用于全量同步)
*
* @param reqDTO 游标分页请求
* @return 岗位数据分页结果
*/
@PostMapping(PREFIX + "/page-by-cursor")
@Operation(summary = "游标分页查询岗位数据")
CommonResult<CursorPageResult<DatabusPostData>> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO);
/**
* 根据ID查询岗位详情用于增量同步
*
* @param id 岗位ID
* @return 岗位数据
*/
@GetMapping(PREFIX + "/get")
@Operation(summary = "查询岗位详情")
@Parameter(name = "id", description = "岗位ID", required = true, example = "1")
CommonResult<DatabusPostData> getById(@RequestParam("id") Long id);
/**
* 批量查询岗位详情(用于增量同步批量获取)
*
* @param ids 岗位ID列表
* @return 岗位数据列表
*/
@GetMapping(PREFIX + "/list")
@Operation(summary = "批量查询岗位详情")
@Parameter(name = "ids", description = "岗位ID列表", required = true, example = "1,2,3")
CommonResult<List<DatabusPostData>> getListByIds(@RequestParam("ids") List<Long> ids);
/**
* 统计岗位总数(用于全量同步进度计算)
*
* @param tenantId 租户ID可选
* @return 岗位总数
*/
@GetMapping(PREFIX + "/count")
@Operation(summary = "统计岗位总数")
@Parameter(name = "tenantId", description = "租户ID", example = "1")
CommonResult<Long> count(@RequestParam(value = "tenantId", required = false) Long tenantId);
}

View File

@@ -0,0 +1,71 @@
package com.zt.plat.module.databus.api.provider;
import com.zt.plat.framework.common.pojo.CommonResult;
import com.zt.plat.module.databus.api.data.DatabusAdminUserData;
import com.zt.plat.module.databus.api.dto.CursorPageReqDTO;
import com.zt.plat.module.databus.api.dto.CursorPageResult;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* Databus 用户数据提供者 API
* <p>
* 供 Databus 调用,获取用户数据用于全量/增量同步
*
* @author ZT
*/
@FeignClient(name = "${databus.provider.user.service:system-server}")
@Tag(name = "RPC 服务 - Databus 用户数据提供者")
public interface DatabusUserProviderApi {
String PREFIX = "/rpc/databus/user";
/**
* 游标分页查询用户数据(用于全量同步)
*
* @param reqDTO 游标分页请求
* @return 用户数据分页结果
*/
@PostMapping(PREFIX + "/page-by-cursor")
@Operation(summary = "游标分页查询用户数据")
CommonResult<CursorPageResult<DatabusAdminUserData>> getPageByCursor(@RequestBody CursorPageReqDTO reqDTO);
/**
* 根据ID查询用户详情用于增量同步
*
* @param id 用户ID
* @return 用户数据
*/
@GetMapping(PREFIX + "/get")
@Operation(summary = "查询用户详情")
@Parameter(name = "id", description = "用户ID", required = true, example = "1001")
CommonResult<DatabusAdminUserData> getById(@RequestParam("id") Long id);
/**
* 批量查询用户详情(用于增量同步批量获取)
*
* @param ids 用户ID列表
* @return 用户数据列表
*/
@GetMapping(PREFIX + "/list")
@Operation(summary = "批量查询用户详情")
@Parameter(name = "ids", description = "用户ID列表", required = true, example = "1001,1002,1003")
CommonResult<List<DatabusAdminUserData>> getListByIds(@RequestParam("ids") List<Long> ids);
/**
* 统计用户总数(用于全量同步进度计算)
*
* @param tenantId 租户ID可选
* @return 用户总数
*/
@GetMapping(PREFIX + "/count")
@Operation(summary = "统计用户总数")
@Parameter(name = "tenantId", description = "租户ID", example = "1")
CommonResult<Long> count(@RequestParam(value = "tenantId", required = false) Long tenantId);
}

View File

@@ -0,0 +1,334 @@
package com.zt.plat.module.databus.enums;
import lombok.AllArgsConstructor;
import lombok.Getter;
/**
* Databus 事件类型枚举
* <p>
* 三级结构: 模块_数据类型_操作
* <p>
* Topic 命名规则:
* - 业务推送到服务端: {topicBase}-{module}-{entity}-{action}
* - 服务端转发到客户端: {topicBase}-{module}-{entity}-{action}-{clientCode}
*
* @author ZT
*/
@Getter
@AllArgsConstructor
public enum DatabusEventType {
// ==================== SYSTEM 系统模块 ====================
/**
* 用户-创建
*/
SYSTEM_USER_CREATE("system", "user", "create", "用户创建"),
/**
* 用户-更新
*/
SYSTEM_USER_UPDATE("system", "user", "update", "用户更新"),
/**
* 用户-删除
*/
SYSTEM_USER_DELETE("system", "user", "delete", "用户删除"),
/**
* 用户-全量同步
*/
SYSTEM_USER_FULL("system", "user", "full", "用户全量同步"),
/**
* 部门-创建
*/
SYSTEM_DEPT_CREATE("system", "dept", "create", "部门创建"),
/**
* 部门-更新
*/
SYSTEM_DEPT_UPDATE("system", "dept", "update", "部门更新"),
/**
* 部门-删除
*/
SYSTEM_DEPT_DELETE("system", "dept", "delete", "部门删除"),
/**
* 部门-全量同步
*/
SYSTEM_DEPT_FULL("system", "dept", "full", "部门全量同步"),
/**
* 组织机构-创建(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_CREATE("system", "org", "create", "组织机构创建"),
/**
* 组织机构-更新(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_UPDATE("system", "org", "update", "组织机构更新"),
/**
* 组织机构-删除(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_DELETE("system", "org", "delete", "组织机构删除"),
/**
* 组织机构-全量同步(兼容老代码,保留但不推荐使用)
*/
@Deprecated
SYSTEM_ORG_FULL("system", "org", "full", "组织机构全量同步"),
/**
* 岗位-创建
*/
SYSTEM_POST_CREATE("system", "post", "create", "岗位创建"),
/**
* 岗位-更新
*/
SYSTEM_POST_UPDATE("system", "post", "update", "岗位更新"),
/**
* 岗位-删除
*/
SYSTEM_POST_DELETE("system", "post", "delete", "岗位删除"),
/**
* 岗位-全量同步
*/
SYSTEM_POST_FULL("system", "post", "full", "岗位全量同步"),
/**
* 角色-创建
*/
SYSTEM_ROLE_CREATE("system", "role", "create", "角色创建"),
/**
* 角色-更新
*/
SYSTEM_ROLE_UPDATE("system", "role", "update", "角色更新"),
/**
* 角色-删除
*/
SYSTEM_ROLE_DELETE("system", "role", "delete", "角色删除"),
/**
* 角色-全量同步
*/
SYSTEM_ROLE_FULL("system", "role", "full", "角色全量同步"),
/**
* 字典-创建
*/
SYSTEM_DICT_CREATE("system", "dict", "create", "字典创建"),
/**
* 字典-更新
*/
SYSTEM_DICT_UPDATE("system", "dict", "update", "字典更新"),
/**
* 字典-删除
*/
SYSTEM_DICT_DELETE("system", "dict", "delete", "字典删除"),
/**
* 字典-全量同步
*/
SYSTEM_DICT_FULL("system", "dict", "full", "字典全量同步"),
// ==================== BASE 基础模块 ====================
/**
* 物料-创建
*/
BASE_MATERIAL_CREATE("base", "material", "create", "物料创建"),
/**
* 物料-更新
*/
BASE_MATERIAL_UPDATE("base", "material", "update", "物料更新"),
/**
* 物料-删除
*/
BASE_MATERIAL_DELETE("base", "material", "delete", "物料删除"),
/**
* 物料-全量同步
*/
BASE_MATERIAL_FULL("base", "material", "full", "物料全量同步"),
/**
* 供应<E4BE9B><E5BA94>-创建
*/
BASE_SUPPLIER_CREATE("base", "supplier", "create", "供应商创建"),
/**
* 供应商-更新
*/
BASE_SUPPLIER_UPDATE("base", "supplier", "update", "供应商更新"),
/**
* 供应商-删除
*/
BASE_SUPPLIER_DELETE("base", "supplier", "delete", "供应商删除"),
/**
* 供应商-全量同步
*/
BASE_SUPPLIER_FULL("base", "supplier", "full", "供应商全量同步"),
/**
* 客户-创建
*/
BASE_CUSTOMER_CREATE("base", "customer", "create", "客户创建"),
/**
* 客户-更新
*/
BASE_CUSTOMER_UPDATE("base", "customer", "update", "客户更新"),
/**
* 客户-删除
*/
BASE_CUSTOMER_DELETE("base", "customer", "delete", "客户删除"),
/**
* 客户-全量同步
*/
BASE_CUSTOMER_FULL("base", "customer", "full", "客户全量同步"),
;
/**
* 模块编码
*/
private final String module;
/**
* 实体编码
*/
private final String entity;
/**
* 操作编码
*/
private final String action;
/**
* 事件名称
*/
private final String name;
/**
* 获取Topic后缀不含topicBase和clientCode
* 格式: {module}-{entity}-{action}
*/
public String getTopicSuffix() {
return String.format("%s-%s-%s", module, entity, action);
}
/**
* 获取完整Topic名称服务端转发用
* 格式: {topicBase}-{clientCode}(简化版,所有事件共用一个 Topic
* 示例: databus-sync-branch-001
*
* 注意:不再为每个事件创建独立 Topic而是通过消息体中的 eventType 字段路由
*/
public String getTopic(String topicBase, String clientCode) {
return String.format("%s-%s", topicBase, clientCode);
}
/**
* 获取完整Topic名称业务推送用不带clientCode
* 格式: {topicBase}-{module}-{entity}-{action}
*/
public String getTopic(String topicBase) {
return String.format("%s-%s-%s-%s", topicBase, module, entity, action);
}
/**
* 根据Topic后缀获取枚举
*
* @param topicSuffix Topic后缀格式: module-entity-action
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByTopicSuffix(String topicSuffix) {
if (topicSuffix == null) {
return null;
}
for (DatabusEventType type : values()) {
if (type.getTopicSuffix().equalsIgnoreCase(topicSuffix)) {
return type;
}
}
return null;
}
/**
* 根据事件类型字符串获取枚举(支持大写下划线格式)
* 例如SYSTEM_POST_FULL → DatabusEventType.SYSTEM_POST_FULL
*
* @param eventType 事件类型字符串(格式: MODULE_ENTITY_ACTION
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByEventType(String eventType) {
if (eventType == null) {
return null;
}
// 先尝试直接匹配枚举名称
try {
return DatabusEventType.valueOf(eventType.toUpperCase());
} catch (IllegalArgumentException e) {
// 如果失败,尝试转换格式后匹配(如 system-post-full → SYSTEM_POST_FULL
String normalized = eventType.toUpperCase().replace("-", "_");
try {
return DatabusEventType.valueOf(normalized);
} catch (IllegalArgumentException ex) {
return null;
}
}
}
/**
* 根据模块、实体、操作获取枚举
*
* @param module 模块编码
* @param entity 实体编码
* @param action 操作编码
* @return 枚举值未找到返回null
*/
public static DatabusEventType getByModuleEntityAction(String module, String entity, String action) {
for (DatabusEventType type : values()) {
if (type.getModule().equalsIgnoreCase(module)
&& type.getEntity().equalsIgnoreCase(entity)
&& type.getAction().equalsIgnoreCase(action)) {
return type;
}
}
return null;
}
/**
* 判断是否为全量同步事件
*/
public boolean isFullSync() {
return "full".equalsIgnoreCase(this.action);
}
/**
* 判断是否为增量同步事件
*/
public boolean isIncrementalSync() {
return !isFullSync();
}
}

View File

@@ -42,6 +42,13 @@
<version>${revision}</version>
</dependency>
<!-- DataBus Server Starter -->
<dependency>
<groupId>com.zt.plat</groupId>
<artifactId>zt-spring-boot-starter-databus-server</artifactId>
<version>${revision}</version>
</dependency>
<!-- 业务组件 -->
<dependency>
<groupId>com.zt.plat</groupId>

View File

@@ -1,6 +1,9 @@
package com.zt.plat.module.databus.framework.rpc.config;
import com.zt.plat.framework.common.biz.system.oauth2.OAuth2TokenCommonApi;
import com.zt.plat.module.databus.api.provider.DatabusDeptProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusPostProviderApi;
import com.zt.plat.module.databus.api.provider.DatabusUserProviderApi;
import com.zt.plat.module.system.api.user.AdminUserApi;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.context.annotation.Configuration;
@@ -9,6 +12,13 @@ import org.springframework.context.annotation.Configuration;
* Databus 模块的 RPC 配置,开启所需的 Feign 客户端。
*/
@Configuration(value = "databusRpcConfiguration", proxyBeanMethods = false)
@EnableFeignClients(clients = {AdminUserApi.class, OAuth2TokenCommonApi.class})
@EnableFeignClients(clients = {
AdminUserApi.class,
OAuth2TokenCommonApi.class,
// DataBus 数据提供者 Feign 客户端
DatabusDeptProviderApi.class,
DatabusUserProviderApi.class,
DatabusPostProviderApi.class
})
public class RpcConfiguration {
}

View File

@@ -76,9 +76,16 @@ management:
# 日志文件配置
logging:
file:
name: ${user.home}/logs/${spring.application.name}.log # 日志文件名,路径
name: ${LOG_PATH:./logs}/${spring.application.name}.log # 日志文件名,使用环境变量或相对路径
# RocketMQ 配置项
rocketmq:
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
justauth:
enabled: true
type:
@@ -106,3 +113,23 @@ justauth:
prefix: 'social_auth_state:' # 缓存前缀,目前只对 Redis 缓存生效,默认 JUSTAUTH::STATE::
timeout: 24h # 超时时长,目前只对 Redis 缓存生效,默认 3 分钟
zt:
databus:
sync:
server:
enabled: true
clients:
- company-b # 配置订阅的客户端与客户端的client-code一致
mq:
enabled: true
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ NameServer 地址,使用环境变量
topic-base: databus-sync
producer-group: databus-server-producer
send-msg-timeout: 10000
retry:
max-attempts: 5 # 最大重试次数
initial-delay: 1 # 初始重试延迟(秒)
multiplier: 2 # 重试延迟倍数
batch:
default-size: 500 # 默认批量大小
interval: 5 # 批量推送间隔(秒)

View File

@@ -86,6 +86,13 @@ mybatis-plus:
log-impl: org.apache.ibatis.logging.stdout.StdOutImpl
# RocketMQ 配置项
rocketmq:
name-server: ${ROCKETMQ_NAME_SERVER:172.16.46.63:30876} # RocketMQ Namesrv使用环境变量
producer:
group: databus-server-producer-group # 生产者组名
send-message-timeout: 10000 # 发送消息超时时间,单位:毫秒
# ZT配置项设置当前项目所有自定义的配置
zt:
env: # 多环境的配置项

View File

@@ -50,7 +50,7 @@ spring:
time-to-live: 1h # 设置过期时间为 1 小时
server:
port: 48100
port: 48105
logging:
file:
@@ -130,6 +130,11 @@ zt:
- /databus/api/portal/**
ignore-tables:
- databus_api_client_credential
# DataBus 数据同步服务端配置
databus:
sync:
server:
enabled: true # 启用 DataBus 同步服务端
databus:
gateway: