Merge remote-tracking branch 'ztcloud/main' into main-ztcloud
This commit is contained in:
@@ -7,6 +7,7 @@ import com.zt.plat.module.databus.controller.admin.gateway.vo.definition.*;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiStepDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiTransformDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiFlowPublication;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiStepDefinition;
|
||||
@@ -18,6 +19,7 @@ import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@Mapper
|
||||
@@ -48,6 +50,11 @@ public interface ApiDefinitionConvert {
|
||||
detail.setApiLevelTransforms(convertTransforms(aggregate.getDefinition().getId(), aggregate.getApiLevelTransforms().values()));
|
||||
detail.setSteps(convertSteps(aggregate.getSteps()));
|
||||
detail.setPublication(convert(aggregate.getPublication()));
|
||||
detail.setCredentialBindings(convertCredentialBindings(aggregate.getCredentialBindings()));
|
||||
detail.setCredentialIds(detail.getCredentialBindings().stream()
|
||||
.map(ApiCredentialBindingRespVO::getCredentialId)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList()));
|
||||
return detail;
|
||||
}
|
||||
|
||||
@@ -99,6 +106,15 @@ public interface ApiDefinitionConvert {
|
||||
return publication == null ? null : BeanUtils.toBean(publication, ApiDefinitionPublicationRespVO.class);
|
||||
}
|
||||
|
||||
default List<ApiCredentialBindingRespVO> convertCredentialBindings(List<ApiCredentialBinding> bindings) {
|
||||
if (CollUtil.isEmpty(bindings)) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return bindings.stream()
|
||||
.map(binding -> BeanUtils.toBean(binding, ApiCredentialBindingRespVO.class))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换步骤列表(DO -> SaveReqVO)
|
||||
*/
|
||||
|
||||
@@ -47,6 +47,12 @@ public class ApiAccessLogPageReqVO extends PageParam {
|
||||
@Schema(description = "请求路径", example = "/gateway/api/user/query")
|
||||
private String requestPath;
|
||||
|
||||
@Schema(description = "应用标识", example = "app-portal-01")
|
||||
private String credentialAppId;
|
||||
|
||||
@Schema(description = "凭证主键", example = "10086")
|
||||
private Long credentialId;
|
||||
|
||||
@Schema(description = "请求时间区间")
|
||||
@DateTimeFormat(pattern = FORMAT_YEAR_MONTH_DAY_HOUR_MINUTE_SECOND)
|
||||
private LocalDateTime[] requestTime;
|
||||
|
||||
@@ -33,6 +33,12 @@ public class ApiAccessLogRespVO {
|
||||
@Schema(description = "请求路径", example = "/gateway/api/user/query")
|
||||
private String requestPath;
|
||||
|
||||
@Schema(description = "应用标识", example = "app-portal-01")
|
||||
private String credentialAppId;
|
||||
|
||||
@Schema(description = "凭证主键", example = "10086")
|
||||
private Long credentialId;
|
||||
|
||||
@Schema(description = "查询参数(JSON)")
|
||||
private String requestQuery;
|
||||
|
||||
|
||||
@@ -0,0 +1,17 @@
|
||||
package com.zt.plat.module.databus.controller.admin.gateway.vo.definition;
|
||||
|
||||
import io.swagger.v3.oas.annotations.media.Schema;
|
||||
import lombok.Data;
|
||||
|
||||
@Data
|
||||
public class ApiCredentialBindingRespVO {
|
||||
|
||||
@Schema(description = "凭证主键", example = "10086")
|
||||
private Long credentialId;
|
||||
|
||||
@Schema(description = "应用标识", example = "app-portal-01")
|
||||
private String appId;
|
||||
|
||||
@Schema(description = "应用名称")
|
||||
private String appName;
|
||||
}
|
||||
@@ -53,6 +53,12 @@ public class ApiDefinitionDetailRespVO {
|
||||
@Schema(description = "API 级别变换列表")
|
||||
private List<ApiDefinitionTransformRespVO> apiLevelTransforms = new ArrayList<>();
|
||||
|
||||
@Schema(description = "授权凭证 ID 列表")
|
||||
private List<Long> credentialIds = new ArrayList<>();
|
||||
|
||||
@Schema(description = "授权凭证详情列表")
|
||||
private List<ApiCredentialBindingRespVO> credentialBindings = new ArrayList<>();
|
||||
|
||||
@Schema(description = "步骤列表")
|
||||
private List<ApiDefinitionStepRespVO> steps = new ArrayList<>();
|
||||
|
||||
|
||||
@@ -46,6 +46,9 @@ public class ApiDefinitionSaveReqVO {
|
||||
@Valid
|
||||
private List<ApiDefinitionTransformSaveReqVO> apiLevelTransforms = new ArrayList<>();
|
||||
|
||||
@Schema(description = "授权的客户端凭证 ID 列表")
|
||||
private List<Long> credentialIds = new ArrayList<>();
|
||||
|
||||
@Schema(description = "步骤列表")
|
||||
@NotEmpty(message = "编排步骤不能为空")
|
||||
@Valid
|
||||
|
||||
@@ -52,6 +52,16 @@ public class ApiAccessLogDO extends TenantBaseDO {
|
||||
*/
|
||||
private String requestPath;
|
||||
|
||||
/**
|
||||
* 调用使用的应用标识
|
||||
*/
|
||||
private String credentialAppId;
|
||||
|
||||
/**
|
||||
* 调用使用的凭证主键
|
||||
*/
|
||||
private Long credentialId;
|
||||
|
||||
/**
|
||||
* 查询参数(JSON 字符串)
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,37 @@
|
||||
package com.zt.plat.module.databus.dal.dataobject.gateway;
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.IdType;
|
||||
import com.baomidou.mybatisplus.annotation.KeySequence;
|
||||
import com.baomidou.mybatisplus.annotation.TableId;
|
||||
import com.baomidou.mybatisplus.annotation.TableName;
|
||||
import com.zt.plat.framework.mybatis.core.dataobject.BaseDO;
|
||||
import lombok.Data;
|
||||
import lombok.EqualsAndHashCode;
|
||||
|
||||
/**
|
||||
* API 与客户端凭证的授权关联。
|
||||
*/
|
||||
@Data
|
||||
@TableName("databus_api_definition_credential")
|
||||
@KeySequence("databus_api_definition_credential_seq")
|
||||
@EqualsAndHashCode(callSuper = true)
|
||||
public class ApiDefinitionCredentialDO extends BaseDO {
|
||||
|
||||
@TableId(type = IdType.ASSIGN_ID)
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* API 主键
|
||||
*/
|
||||
private Long apiId;
|
||||
|
||||
/**
|
||||
* 客户端凭证主键
|
||||
*/
|
||||
private Long credentialId;
|
||||
|
||||
/**
|
||||
* 绑定时的应用标识冗余,便于快速校验
|
||||
*/
|
||||
private String appId;
|
||||
}
|
||||
@@ -20,6 +20,8 @@ public interface ApiAccessLogMapper extends BaseMapperX<ApiAccessLogDO> {
|
||||
.eqIfPresent(ApiAccessLogDO::getResponseStatus, reqVO.getResponseStatus())
|
||||
.eqIfPresent(ApiAccessLogDO::getStatus, reqVO.getStatus())
|
||||
.likeIfPresent(ApiAccessLogDO::getClientIp, reqVO.getClientIp())
|
||||
.eqIfPresent(ApiAccessLogDO::getCredentialAppId, reqVO.getCredentialAppId())
|
||||
.eqIfPresent(ApiAccessLogDO::getCredentialId, reqVO.getCredentialId())
|
||||
.eqIfPresent(ApiAccessLogDO::getTenantId, reqVO.getTenantId())
|
||||
.likeIfPresent(ApiAccessLogDO::getRequestPath, reqVO.getRequestPath());
|
||||
if (ArrayUtil.isNotEmpty(reqVO.getRequestTime()) && reqVO.getRequestTime().length == 2) {
|
||||
|
||||
@@ -0,0 +1,32 @@
|
||||
package com.zt.plat.module.databus.dal.mysql.gateway;
|
||||
|
||||
import com.zt.plat.framework.mybatis.core.mapper.BaseMapperX;
|
||||
import com.zt.plat.framework.mybatis.core.query.LambdaQueryWrapperX;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionCredentialDO;
|
||||
import org.apache.ibatis.annotations.Mapper;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@Mapper
|
||||
public interface ApiDefinitionCredentialMapper extends BaseMapperX<ApiDefinitionCredentialDO> {
|
||||
|
||||
default List<ApiDefinitionCredentialDO> selectByApiId(Long apiId) {
|
||||
return selectList(new LambdaQueryWrapperX<ApiDefinitionCredentialDO>()
|
||||
.eq(ApiDefinitionCredentialDO::getApiId, apiId));
|
||||
}
|
||||
|
||||
default void deleteByApiId(Long apiId) {
|
||||
delete(new LambdaQueryWrapperX<ApiDefinitionCredentialDO>()
|
||||
.eq(ApiDefinitionCredentialDO::getApiId, apiId));
|
||||
}
|
||||
|
||||
/**
|
||||
* 按 API 逻辑删除已有绑定,保留操作记录。
|
||||
*/
|
||||
default void logicDeleteByApiId(Long apiId) {
|
||||
ApiDefinitionCredentialDO entity = new ApiDefinitionCredentialDO();
|
||||
entity.setDeleted(Boolean.TRUE);
|
||||
update(entity, new LambdaQueryWrapperX<ApiDefinitionCredentialDO>()
|
||||
.eq(ApiDefinitionCredentialDO::getApiId, apiId));
|
||||
}
|
||||
}
|
||||
@@ -83,7 +83,7 @@ public class ApiGatewayAccessLogger {
|
||||
try {
|
||||
ApiAccessLogDO update = new ApiAccessLogDO();
|
||||
update.setId(logId);
|
||||
int responseStatus = resolveHttpStatus(context);
|
||||
Integer responseStatus = resolveHttpStatus(context);
|
||||
context.setResponseStatus(responseStatus);
|
||||
update.setResponseStatus(responseStatus);
|
||||
String responseMessage = resolveResponseMessage(context, responseStatus);
|
||||
@@ -193,6 +193,8 @@ public class ApiGatewayAccessLogger {
|
||||
logDO.setRequestBody(toJson(context.getRequestBody()));
|
||||
logDO.setClientIp(context.getClientIp());
|
||||
logDO.setUserAgent(GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HttpHeaders.USER_AGENT));
|
||||
logDO.setCredentialAppId(context.getCredentialAppId());
|
||||
logDO.setCredentialId(context.getCredentialId());
|
||||
logDO.setStatus(3);
|
||||
logDO.setRequestTime(toLocalDateTime(context.getRequestTime()));
|
||||
logDO.setTenantId(parseTenantId(context.getTenantId()));
|
||||
@@ -231,7 +233,7 @@ public class ApiGatewayAccessLogger {
|
||||
return 3;
|
||||
}
|
||||
|
||||
private String resolveErrorMessage(ApiInvocationContext context, int responseStatus) {
|
||||
private String resolveErrorMessage(ApiInvocationContext context, Integer responseStatus) {
|
||||
if (!isErrorStatus(responseStatus)) {
|
||||
return null;
|
||||
}
|
||||
@@ -248,7 +250,7 @@ public class ApiGatewayAccessLogger {
|
||||
return null;
|
||||
}
|
||||
|
||||
private String extractErrorCode(Object responseBody, int responseStatus) {
|
||||
private String extractErrorCode(Object responseBody, Integer responseStatus) {
|
||||
if (!isErrorStatus(responseStatus)) {
|
||||
return null;
|
||||
}
|
||||
@@ -259,16 +261,14 @@ public class ApiGatewayAccessLogger {
|
||||
return null;
|
||||
}
|
||||
|
||||
private int resolveHttpStatus(ApiInvocationContext context) {
|
||||
Integer status = context.getResponseStatus();
|
||||
if (status != null) {
|
||||
return status;
|
||||
}
|
||||
// 默认兜底为 200,避免日志中出现空的 HTTP 状态码
|
||||
return HttpStatus.OK.value();
|
||||
private Integer resolveHttpStatus(ApiInvocationContext context) {
|
||||
return context.getResponseStatus();
|
||||
}
|
||||
|
||||
private String resolveResponseMessage(ApiInvocationContext context, int responseStatus) {
|
||||
private String resolveResponseMessage(ApiInvocationContext context, Integer responseStatus) {
|
||||
if (responseStatus == null) {
|
||||
return null;
|
||||
}
|
||||
if (StringUtils.hasText(context.getResponseMessage())) {
|
||||
return truncate(context.getResponseMessage());
|
||||
}
|
||||
@@ -276,8 +276,8 @@ public class ApiGatewayAccessLogger {
|
||||
return resolved != null ? resolved.getReasonPhrase() : null;
|
||||
}
|
||||
|
||||
private boolean isErrorStatus(int responseStatus) {
|
||||
return responseStatus >= 400;
|
||||
private boolean isErrorStatus(Integer responseStatus) {
|
||||
return responseStatus != null && responseStatus >= 400;
|
||||
}
|
||||
|
||||
private Map<String, Object> buildExtra(ApiInvocationContext context) {
|
||||
|
||||
@@ -10,6 +10,7 @@ import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_AUTH_UNAUTHORIZED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
|
||||
|
||||
/**
|
||||
@@ -80,6 +81,9 @@ public class ApiGatewayErrorProcessor {
|
||||
if (API_RATE_LIMIT_EXCEEDED.getCode().equals(code)) {
|
||||
return HttpStatus.TOO_MANY_REQUESTS.value();
|
||||
}
|
||||
if (API_CREDENTIAL_UNAUTHORIZED.getCode().equals(code)) {
|
||||
return HttpStatus.FORBIDDEN.value();
|
||||
}
|
||||
}
|
||||
return HttpStatus.INTERNAL_SERVER_ERROR.value();
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.framework.common.util.monitor.TracerUtils;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
@@ -19,6 +20,7 @@ import org.springframework.http.*;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.web.servlet.HandlerMapping;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
@@ -29,12 +31,12 @@ import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_DEFINITION_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* Orchestrates API portal request mapping, dispatch and response building so that
|
||||
* management-side debug invocations and external HTTP requests share identical
|
||||
* behaviour (other than security concerns handled by {@link GatewaySecurityFilter}).
|
||||
* 统一处理 API 门户的请求映射、分发与响应构建。
|
||||
* 管理端调试与外部 HTTP 请求共享同一套逻辑,安全校验由 {@link GatewaySecurityFilter} 执行。
|
||||
*/
|
||||
@Slf4j
|
||||
@Component
|
||||
@@ -58,7 +60,7 @@ public class ApiGatewayExecutionService {
|
||||
private final ApiDefinitionService apiDefinitionService;
|
||||
|
||||
/**
|
||||
* Maps a raw HTTP message (as provided by Spring Integration) into a context message.
|
||||
* 将 Spring Integration 提供的原始消息映射为网关上下文消息。
|
||||
*/
|
||||
public Message<ApiInvocationContext> mapRequest(Message<?> message) {
|
||||
ApiInvocationContext context = requestMapper.map(message.getPayload(), message.getHeaders());
|
||||
@@ -70,7 +72,7 @@ public class ApiGatewayExecutionService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Dispatches the API invocation and applies gateway error processing rules on failure scenarios.
|
||||
* 分发 API 调用,并在异常场景套用统一错误处理。
|
||||
*/
|
||||
public ApiInvocationContext dispatch(Message<ApiInvocationContext> message) {
|
||||
ApiInvocationContext context = message.getPayload();
|
||||
@@ -78,6 +80,7 @@ public class ApiGatewayExecutionService {
|
||||
ApiInvocationContext responseContext;
|
||||
ApiDefinitionAggregate debugAggregate = null;
|
||||
try {
|
||||
enforceCredentialAuthorization(context);
|
||||
if (Boolean.TRUE.equals(context.getAttributes().get(ATTR_DEBUG_INVOKE))) {
|
||||
debugAggregate = resolveDebugAggregate(context);
|
||||
}
|
||||
@@ -128,7 +131,7 @@ public class ApiGatewayExecutionService {
|
||||
Message<?> rawMessage = buildDebugMessage(reqVO);
|
||||
Message<ApiInvocationContext> mappedMessage = mapRequest(rawMessage);
|
||||
ApiInvocationContext context = mappedMessage.getPayload();
|
||||
// Ensure query parameters & headers from debug payload are reflected after mapping.
|
||||
// 将调试透传的查询参数、请求头重新合并到上下文,避免映射阶段丢失
|
||||
mergeDebugMetadata(context, reqVO);
|
||||
context.getAttributes().put(ATTR_DEBUG_INVOKE, Boolean.TRUE);
|
||||
ApiInvocationContext responseContext = dispatch(mappedMessage);
|
||||
@@ -155,7 +158,7 @@ public class ApiGatewayExecutionService {
|
||||
builder.setHeader(HandlerMapping.URI_TEMPLATE_VARIABLES_ATTRIBUTE, uriVariables);
|
||||
builder.setHeader(org.springframework.integration.http.HttpHeaders.REQUEST_METHOD, HttpMethod.POST.name());
|
||||
|
||||
String basePath = normalizeBasePath(properties.getBasePath());
|
||||
String basePath = properties.getBasePath();
|
||||
String rawQuery = buildQueryString(reqVO.getQueryParams());
|
||||
String requestUri = basePath + "/" + reqVO.getApiCode() + "/" + reqVO.getVersion();
|
||||
if (StringUtils.hasText(rawQuery)) {
|
||||
@@ -223,8 +226,7 @@ public class ApiGatewayExecutionService {
|
||||
context.setHttpMethod(HttpMethod.POST.name());
|
||||
}
|
||||
if (!StringUtils.hasText(context.getRequestPath())) {
|
||||
String basePath = normalizeBasePath(properties.getBasePath());
|
||||
String path = basePath + "/" + reqVO.getApiCode() + "/" + reqVO.getVersion();
|
||||
String path = properties.getBasePath() + "/" + reqVO.getApiCode() + "/" + reqVO.getVersion();
|
||||
context.setRequestPath(path);
|
||||
}
|
||||
}
|
||||
@@ -245,15 +247,29 @@ public class ApiGatewayExecutionService {
|
||||
.build();
|
||||
}
|
||||
|
||||
private String normalizeBasePath(String basePath) {
|
||||
if (!StringUtils.hasText(basePath)) {
|
||||
return ApiGatewayProperties.DEFAULT_BASE_PATH;
|
||||
/**
|
||||
* 调用前校验凭证白名单,非调试调用需匹配绑定的 appId。
|
||||
*/
|
||||
private void enforceCredentialAuthorization(ApiInvocationContext context) {
|
||||
if (Boolean.TRUE.equals(context.getAttributes().get(ATTR_DEBUG_INVOKE))) {
|
||||
return;
|
||||
}
|
||||
String normalized = basePath.startsWith("/") ? basePath : "/" + basePath;
|
||||
while (normalized.endsWith("/") && normalized.length() > 1) {
|
||||
normalized = normalized.substring(0, normalized.length() - 1);
|
||||
ApiDefinitionAggregate aggregate = apiDefinitionService.findByCodeAndVersion(context.getApiCode(), context.getApiVersion())
|
||||
.orElseThrow(() -> ServiceExceptionUtil.exception(API_DEFINITION_NOT_FOUND));
|
||||
if (CollectionUtils.isEmpty(aggregate.getCredentialBindings())) {
|
||||
return;
|
||||
}
|
||||
String appId = context.getCredentialAppId();
|
||||
if (!StringUtils.hasText(appId)) {
|
||||
throw ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
}
|
||||
boolean matched = aggregate.getCredentialBindings().stream()
|
||||
.map(ApiCredentialBinding::getAppId)
|
||||
.filter(StringUtils::hasText)
|
||||
.anyMatch(boundAppId -> appId.trim().equalsIgnoreCase(boundAppId));
|
||||
if (!matched) {
|
||||
throw ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
}
|
||||
return normalized;
|
||||
}
|
||||
|
||||
private String buildQueryString(Map<String, Object> queryParams) {
|
||||
@@ -322,4 +338,5 @@ public class ApiGatewayExecutionService {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ public class ApiGatewayRequestMapper {
|
||||
private static final String HEADER_REQUEST_HEADERS = org.springframework.integration.http.HttpHeaders.PREFIX + "requestHeaders";
|
||||
private static final String HEADER_REQUEST_URI = org.springframework.integration.http.HttpHeaders.PREFIX + "requestUri";
|
||||
private static final String HEADER_REMOTE_ADDRESS = org.springframework.integration.http.HttpHeaders.PREFIX + "remoteAddress";
|
||||
private static final String HEADER_CREDENTIAL_ID = "X-Databus-Credential-Id";
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public ApiInvocationContext map(Object payload, Map<String, Object> headers) {
|
||||
@@ -79,18 +80,29 @@ public class ApiGatewayRequestMapper {
|
||||
}
|
||||
|
||||
Map<String, Object> requestHeaders = (Map<String, Object>) headers.get(HEADER_REQUEST_HEADERS);
|
||||
GatewayHeaderUtils.mergeNormalizedHeaders(requestHeaders, context.getRequestHeaders());
|
||||
if (requestHeaders != null) {
|
||||
context.getRequestHeaders().putAll(requestHeaders);
|
||||
}
|
||||
headers.forEach((key, value) -> {
|
||||
if (isInternalHeader(key)) {
|
||||
if (isInternalHeader(key) || value == null) {
|
||||
return;
|
||||
}
|
||||
String normalized = GatewayHeaderUtils.normalizeHeaderValue(value);
|
||||
if (normalized != null) {
|
||||
context.getRequestHeaders().putIfAbsent(key, normalized);
|
||||
}
|
||||
context.getRequestHeaders().putIfAbsent(key, String.valueOf(value));
|
||||
});
|
||||
context.setUserAgent(GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HttpHeaders.USER_AGENT));
|
||||
context.setClientIp(resolveClientIp(headers, context.getRequestHeaders()));
|
||||
String appId = GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), ApiGatewayProperties.APP_ID_HEADER);
|
||||
if (StringUtils.hasText(appId)) {
|
||||
context.setCredentialAppId(appId.trim());
|
||||
}
|
||||
String credentialIdHeader = GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HEADER_CREDENTIAL_ID);
|
||||
if (StringUtils.hasText(credentialIdHeader)) {
|
||||
try {
|
||||
context.setCredentialId(Long.valueOf(credentialIdHeader.trim()));
|
||||
} catch (NumberFormatException ignored) {
|
||||
context.setCredentialId(null);
|
||||
}
|
||||
}
|
||||
captureAccessLogId(context);
|
||||
populateQueryParams(headers, context, originalRequestUri);
|
||||
if (properties.isEnableTenantHeader()) {
|
||||
|
||||
@@ -0,0 +1,18 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.domain;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Value;
|
||||
|
||||
/**
|
||||
* API 授权绑定的凭证信息。
|
||||
*/
|
||||
@Value
|
||||
@Builder
|
||||
public class ApiCredentialBinding {
|
||||
|
||||
Long credentialId;
|
||||
|
||||
String appId;
|
||||
|
||||
String appName;
|
||||
}
|
||||
@@ -26,6 +26,8 @@ public class ApiDefinitionAggregate {
|
||||
|
||||
ApiFlowPublication publication;
|
||||
|
||||
List<ApiCredentialBinding> credentialBindings;
|
||||
|
||||
public List<ApiStepDefinition> getSteps() {
|
||||
return steps == null ? Collections.emptyList() : steps;
|
||||
}
|
||||
@@ -34,4 +36,8 @@ public class ApiDefinitionAggregate {
|
||||
return apiLevelTransforms == null ? Collections.emptyMap() : apiLevelTransforms;
|
||||
}
|
||||
|
||||
public List<ApiCredentialBinding> getCredentialBindings() {
|
||||
return credentialBindings == null ? Collections.emptyList() : credentialBindings;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -29,6 +29,10 @@ public class ApiInvocationContext {
|
||||
|
||||
private String userAgent;
|
||||
|
||||
private String credentialAppId;
|
||||
|
||||
private Long credentialId;
|
||||
|
||||
private String httpMethod;
|
||||
|
||||
private String requestPath;
|
||||
@@ -72,6 +76,8 @@ public class ApiInvocationContext {
|
||||
copy.tenantId = this.tenantId;
|
||||
copy.clientIp = this.clientIp;
|
||||
copy.userAgent = this.userAgent;
|
||||
copy.credentialAppId = this.credentialAppId;
|
||||
copy.credentialId = this.credentialId;
|
||||
copy.httpMethod = this.httpMethod;
|
||||
copy.requestPath = this.requestPath;
|
||||
copy.requestBody = this.requestBody;
|
||||
|
||||
@@ -15,11 +15,11 @@ import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Enumeration;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.TreeMap;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
@@ -104,7 +104,7 @@ public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
|
||||
return;
|
||||
}
|
||||
if (!StringUtils.hasText(value)) {
|
||||
additionalHeaders.remove(name);
|
||||
removeHeader(name);
|
||||
return;
|
||||
}
|
||||
additionalHeaders.put(name, new ArrayList<>(Collections.singletonList(value)));
|
||||
@@ -115,7 +115,7 @@ public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
|
||||
return;
|
||||
}
|
||||
if (CollectionUtils.isEmpty(values)) {
|
||||
additionalHeaders.remove(name);
|
||||
removeHeader(name);
|
||||
return;
|
||||
}
|
||||
additionalHeaders.put(name, new ArrayList<>(values));
|
||||
@@ -125,7 +125,7 @@ public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
|
||||
if (!StringUtils.hasText(name) || !StringUtils.hasText(value)) {
|
||||
return;
|
||||
}
|
||||
additionalHeaders.compute(name, (key, existing) -> {
|
||||
additionalHeaders.compute(name, (k, existing) -> {
|
||||
List<String> list = existing == null ? new ArrayList<>() : new ArrayList<>(existing);
|
||||
list.add(value);
|
||||
return list;
|
||||
@@ -152,23 +152,20 @@ public class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
|
||||
|
||||
@Override
|
||||
public Enumeration<String> getHeaders(String name) {
|
||||
List<String> combined = new ArrayList<>();
|
||||
if (StringUtils.hasText(name)) {
|
||||
List<String> custom = additionalHeaders.get(name);
|
||||
// 如果自定义头已写入,则直接返回,避免与原始头部合并造成重复
|
||||
if (!CollectionUtils.isEmpty(custom)) {
|
||||
combined.addAll(custom);
|
||||
return Collections.enumeration(custom);
|
||||
}
|
||||
}
|
||||
Enumeration<String> parent = super.getHeaders(name);
|
||||
while (parent.hasMoreElements()) {
|
||||
combined.add(parent.nextElement());
|
||||
}
|
||||
return Collections.enumeration(combined);
|
||||
return super.getHeaders(name);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Enumeration<String> getHeaderNames() {
|
||||
Set<String> names = new LinkedHashSet<>();
|
||||
// 使用忽略大小写的集合,避免父请求头名与自定义头名大小写不同而重复
|
||||
Set<String> names = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
|
||||
Enumeration<String> parent = super.getHeaderNames();
|
||||
while (parent.hasMoreElements()) {
|
||||
names.add(parent.nextElement());
|
||||
|
||||
@@ -61,6 +61,7 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
private final AntPathMatcher pathMatcher = new AntPathMatcher();
|
||||
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {
|
||||
};
|
||||
public static final String HEADER_CREDENTIAL_ID = "X-Databus-Credential-Id";
|
||||
|
||||
@Override
|
||||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
|
||||
@@ -133,6 +134,8 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
|
||||
// 使用可重复读取的请求包装,供后续过滤器继续消费
|
||||
CachedBodyHttpServletRequest securedRequest = new CachedBodyHttpServletRequest(request, decryptedBody);
|
||||
securedRequest.setHeader(APP_ID_HEADER, credential.getAppId());
|
||||
securedRequest.setHeader(HEADER_CREDENTIAL_ID, credential.getId() != null ? String.valueOf(credential.getId()) : null);
|
||||
ApiGatewayAccessLogger.propagateLogIdHeader(securedRequest, accessLogId);
|
||||
if (StringUtils.hasText(request.getCharacterEncoding())) {
|
||||
securedRequest.setCharacterEncoding(request.getCharacterEncoding());
|
||||
@@ -283,7 +286,8 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
try {
|
||||
boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
|
||||
if (!valid) {
|
||||
// throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
|
||||
log.error("[API-PORTAL] 签名校验失败");
|
||||
return;
|
||||
}
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new SecurityValidationException(HttpStatus.INTERNAL_SERVER_ERROR, "签名算法配置异常");
|
||||
|
||||
@@ -4,8 +4,6 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.framework.common.pojo.PageResult;
|
||||
import com.zt.plat.framework.common.util.object.BeanUtils;
|
||||
@@ -19,6 +17,7 @@ import com.zt.plat.module.databus.controller.admin.gateway.vo.definition.ApiDefi
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.*;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.*;
|
||||
import com.zt.plat.module.databus.enums.gateway.ApiStatusEnum;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiFlowPublication;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiStepDefinition;
|
||||
@@ -26,20 +25,17 @@ import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiTransf
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionSnapshotContextHolder;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.*;
|
||||
|
||||
@@ -48,27 +44,16 @@ import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErro
|
||||
@RequiredArgsConstructor
|
||||
public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
private static final String REDIS_CACHE_PREFIX = "databus:api:def:";
|
||||
|
||||
private final ApiDefinitionMapper apiDefinitionMapper;
|
||||
private final ApiStepMapper apiStepMapper;
|
||||
private final ApiTransformMapper apiTransformMapper;
|
||||
private final ApiPolicyRateLimitMapper apiPolicyRateLimitMapper;
|
||||
private final ApiFlowPublishMapper apiFlowPublishMapper;
|
||||
private final ApiDefinitionCredentialMapper apiDefinitionCredentialMapper;
|
||||
private final ApiClientCredentialMapper apiClientCredentialMapper;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final ObjectProvider<ApiVersionService> apiVersionServiceProvider;
|
||||
|
||||
private LoadingCache<String, Optional<ApiDefinitionAggregate>> definitionCache;
|
||||
|
||||
@PostConstruct
|
||||
public void initCache() {
|
||||
definitionCache = Caffeine.newBuilder()
|
||||
.maximumSize(512)
|
||||
.expireAfterWrite(Duration.ofMinutes(5))
|
||||
.build(this::loadAggregateSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApiDefinitionAggregate> loadActiveDefinitions() {
|
||||
List<ApiDefinitionDO> definitions = apiDefinitionMapper.selectActiveDefinitions(Collections.singletonList(ApiStatusEnum.ONLINE.getStatus()));
|
||||
@@ -84,12 +69,9 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> findByCodeAndVersion(String apiCode, String version) {
|
||||
String cacheKey = buildCacheKey(apiCode, version);
|
||||
try {
|
||||
return definitionCache.get(cacheKey);
|
||||
} catch (RuntimeException ex) {
|
||||
throw ServiceExceptionUtil.exception(API_DEFINITION_NOT_FOUND);
|
||||
}
|
||||
return apiDefinitionMapper.selectByCodeAndVersion(apiCode, version)
|
||||
.filter(definition -> ApiStatusEnum.isOnline(definition.getStatus()))
|
||||
.map(this::buildAggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -100,16 +82,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> refresh(String apiCode, String version) {
|
||||
String cacheKey = buildCacheKey(apiCode, version);
|
||||
definitionCache.invalidate(cacheKey);
|
||||
deleteRedis(cacheKey);
|
||||
return findByCodeAndVersion(apiCode, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshAllCache() {
|
||||
definitionCache.invalidateAll();
|
||||
clearRedisCacheForTenant(TenantContextHolder.getTenantId());
|
||||
// 缓存已移除,此处留空以保持接口兼容
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -129,6 +107,7 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
validateDuplication(reqVO, null);
|
||||
validateStructure(reqVO);
|
||||
validatePolicies(reqVO);
|
||||
validateCredentials(reqVO.getCredentialIds());
|
||||
|
||||
ApiDefinitionDO definition = buildDefinitionDO(reqVO, null);
|
||||
apiDefinitionMapper.insert(definition);
|
||||
@@ -136,6 +115,7 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
persistApiLevelTransforms(apiId, reqVO.getApiLevelTransforms());
|
||||
persistSteps(apiId, reqVO.getSteps());
|
||||
persistCredentialBindings(apiId, reqVO.getCredentialIds());
|
||||
|
||||
String operator = SecurityFrameworkUtils.getLoginUserNickname();
|
||||
String description = String.format("创建 API (%s)", reqVO.getVersion());
|
||||
@@ -154,16 +134,17 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
validateDuplication(reqVO, existing.getId());
|
||||
validateStructure(reqVO);
|
||||
validatePolicies(reqVO);
|
||||
validateCredentials(reqVO.getCredentialIds());
|
||||
|
||||
ApiDefinitionDO updateObj = buildDefinitionDO(reqVO, existing);
|
||||
apiDefinitionMapper.updateById(updateObj);
|
||||
|
||||
invalidateCache(existing.getTenantId(), existing.getApiCode(), existing.getVersion());
|
||||
apiTransformMapper.deleteByApiId(existing.getId());
|
||||
apiStepMapper.deleteByApiId(existing.getId());
|
||||
apiDefinitionCredentialMapper.deleteByApiId(existing.getId());
|
||||
persistApiLevelTransforms(existing.getId(), reqVO.getApiLevelTransforms());
|
||||
persistSteps(existing.getId(), reqVO.getSteps());
|
||||
invalidateCache(updateObj.getTenantId(), updateObj.getApiCode(), updateObj.getVersion());
|
||||
persistCredentialBindings(existing.getId(), reqVO.getCredentialIds());
|
||||
} finally {
|
||||
if (skipSnapshot) {
|
||||
ApiVersionSnapshotContextHolder.clear();
|
||||
@@ -183,77 +164,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void delete(Long id) {
|
||||
ApiDefinitionDO existing = ensureExists(id);
|
||||
invalidateCache(existing.getTenantId(), existing.getApiCode(), existing.getVersion());
|
||||
apiTransformMapper.deleteByApiId(id);
|
||||
apiStepMapper.deleteByApiId(id);
|
||||
apiDefinitionCredentialMapper.deleteByApiId(id);
|
||||
apiDefinitionMapper.deleteById(id);
|
||||
}
|
||||
|
||||
private Optional<ApiDefinitionAggregate> loadAggregateSync(String cacheKey) {
|
||||
Optional<ApiDefinitionAggregate> cached = loadFromRedis(cacheKey);
|
||||
if (cached.isPresent()) {
|
||||
return cached;
|
||||
}
|
||||
String[] parts = cacheKey.split(":");
|
||||
String apiCode = parts[1];
|
||||
String version = parts[2];
|
||||
Optional<ApiDefinitionAggregate> aggregate = apiDefinitionMapper.selectByCodeAndVersion(apiCode, version)
|
||||
.filter(definition -> ApiStatusEnum.isOnline(definition.getStatus()))
|
||||
.map(this::buildAggregate);
|
||||
aggregate.ifPresent(value -> persistToRedis(cacheKey, value));
|
||||
return aggregate;
|
||||
}
|
||||
|
||||
private Optional<ApiDefinitionAggregate> loadFromRedis(String cacheKey) {
|
||||
try {
|
||||
String json = stringRedisTemplate.opsForValue().get(REDIS_CACHE_PREFIX + cacheKey);
|
||||
if (!StringUtils.hasText(json)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
ApiDefinitionAggregate aggregate = objectMapper.readValue(json, ApiDefinitionAggregate.class);
|
||||
return Optional.of(aggregate);
|
||||
} catch (JsonProcessingException | DataAccessException ex) {
|
||||
log.warn("反序列化 Redis 中 key {} 的 API 定义聚合失败", cacheKey, ex);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private void persistToRedis(String cacheKey, ApiDefinitionAggregate aggregate) {
|
||||
try {
|
||||
String json = objectMapper.writeValueAsString(aggregate);
|
||||
stringRedisTemplate.opsForValue().set(REDIS_CACHE_PREFIX + cacheKey, json, 5, TimeUnit.MINUTES);
|
||||
} catch (JsonProcessingException | DataAccessException ex) {
|
||||
log.warn("将 API 定义聚合写入 Redis key {} 失败", cacheKey, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteRedis(String cacheKey) {
|
||||
try {
|
||||
stringRedisTemplate.delete(REDIS_CACHE_PREFIX + cacheKey);
|
||||
} catch (DataAccessException ex) {
|
||||
log.warn("删除 Redis 中 key {} 的 API 定义聚合失败", cacheKey, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void clearRedisCacheForTenant(Long tenantId) {
|
||||
String tenantPart = tenantId == null ? "global" : tenantId.toString();
|
||||
String pattern = REDIS_CACHE_PREFIX + tenantPart + ":*";
|
||||
try {
|
||||
Set<String> keys = stringRedisTemplate.keys(pattern);
|
||||
if (CollectionUtils.isEmpty(keys)) {
|
||||
return;
|
||||
}
|
||||
stringRedisTemplate.delete(keys);
|
||||
} catch (DataAccessException ex) {
|
||||
log.warn("批量删除 Redis 中匹配 {} 的 API 定义聚合失败", pattern, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildCacheKey(String apiCode, String version) {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
return buildCacheKeyForTenant(tenantId, apiCode, version);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建包含步骤、变换、策略等元数据的聚合对象,供缓存与运行时直接使用。
|
||||
*/
|
||||
@@ -283,12 +199,14 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
ApiFlowPublication publication = apiFlowPublishMapper.selectActiveByApiId(definition.getId())
|
||||
.map(this::convertPublication)
|
||||
.orElse(null);
|
||||
List<ApiCredentialBinding> credentialBindings = loadCredentialBindings(definition.getId());
|
||||
return ApiDefinitionAggregate.builder()
|
||||
.definition(definition)
|
||||
.steps(stepDefinitions)
|
||||
.apiLevelTransforms(apiTransforms)
|
||||
.rateLimitPolicy(rateLimitPolicy)
|
||||
.publication(publication)
|
||||
.credentialBindings(credentialBindings)
|
||||
.build();
|
||||
}
|
||||
|
||||
@@ -497,22 +415,91 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
}
|
||||
}
|
||||
|
||||
private void validateCredentials(List<Long> credentialIds) {
|
||||
if (CollectionUtils.isEmpty(credentialIds)) {
|
||||
return;
|
||||
}
|
||||
List<ApiClientCredentialDO> credentials = apiClientCredentialMapper.selectBatchIds(credentialIds);
|
||||
long validCount = credentials == null ? 0 : credentials.stream()
|
||||
.filter(credential -> credential != null && !Boolean.TRUE.equals(credential.getDeleted()))
|
||||
.count();
|
||||
if (validCount != credentialIds.size()) {
|
||||
throw ServiceExceptionUtil.exception(API_CREDENTIAL_NOT_FOUND);
|
||||
}
|
||||
}
|
||||
|
||||
private Long resolveTenantIdentifier() {
|
||||
return TenantContextHolder.getTenantId();
|
||||
}
|
||||
|
||||
private void invalidateCache(Long tenantId, String apiCode, String version) {
|
||||
if (!StringUtils.hasText(apiCode) || !StringUtils.hasText(version)) {
|
||||
/**
|
||||
* 先删除旧绑定,再对去重后的 credentialIds 批量插入,避免唯一约束冲突。
|
||||
*/
|
||||
private void persistCredentialBindings(Long apiId, List<Long> credentialIds) {
|
||||
// 先逻辑删除当前 API 的旧绑定,保留历史,同时避免重复插入
|
||||
apiDefinitionCredentialMapper.logicDeleteByApiId(apiId);
|
||||
|
||||
if (CollectionUtils.isEmpty(credentialIds)) {
|
||||
return;
|
||||
}
|
||||
String cacheKey = buildCacheKeyForTenant(tenantId, apiCode, version);
|
||||
definitionCache.invalidate(cacheKey);
|
||||
deleteRedis(cacheKey);
|
||||
|
||||
// 去重后再查询有效凭证
|
||||
List<Long> distinctIds = credentialIds.stream()
|
||||
.filter(Objects::nonNull)
|
||||
.distinct()
|
||||
.toList();
|
||||
if (CollectionUtils.isEmpty(distinctIds)) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<ApiClientCredentialDO> credentials = apiClientCredentialMapper.selectBatchIds(distinctIds);
|
||||
if (CollectionUtils.isEmpty(credentials)) {
|
||||
return;
|
||||
}
|
||||
|
||||
for (ApiClientCredentialDO credential : credentials) {
|
||||
if (credential == null || Boolean.TRUE.equals(credential.getDeleted())) {
|
||||
continue;
|
||||
}
|
||||
ApiDefinitionCredentialDO relation = new ApiDefinitionCredentialDO();
|
||||
relation.setId(null);
|
||||
relation.setApiId(apiId);
|
||||
relation.setCredentialId(credential.getId());
|
||||
relation.setAppId(credential.getAppId());
|
||||
relation.setDeleted(Boolean.FALSE);
|
||||
apiDefinitionCredentialMapper.insert(relation);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildCacheKeyForTenant(Long tenantId, String apiCode, String version) {
|
||||
String tenantPart = tenantId == null ? "global" : tenantId.toString();
|
||||
return tenantPart + ":" + apiCode.toLowerCase(Locale.ROOT) + ":" + version;
|
||||
private List<ApiCredentialBinding> loadCredentialBindings(Long apiId) {
|
||||
List<ApiDefinitionCredentialDO> relations = apiDefinitionCredentialMapper.selectByApiId(apiId);
|
||||
if (CollectionUtils.isEmpty(relations)) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<Long> credentialIds = relations.stream()
|
||||
.map(ApiDefinitionCredentialDO::getCredentialId)
|
||||
.filter(Objects::nonNull)
|
||||
.toList();
|
||||
Map<Long, ApiClientCredentialDO> credentialMap = Collections.emptyMap();
|
||||
if (!CollectionUtils.isEmpty(credentialIds)) {
|
||||
List<ApiClientCredentialDO> credentials = apiClientCredentialMapper.selectBatchIds(credentialIds);
|
||||
if (!CollectionUtils.isEmpty(credentials)) {
|
||||
credentialMap = credentials.stream()
|
||||
.filter(credential -> credential != null && !Boolean.TRUE.equals(credential.getDeleted()))
|
||||
.collect(Collectors.toMap(ApiClientCredentialDO::getId, c -> c, (a, b) -> a));
|
||||
}
|
||||
}
|
||||
List<ApiCredentialBinding> bindings = new ArrayList<>(relations.size());
|
||||
for (ApiDefinitionCredentialDO relation : relations) {
|
||||
ApiClientCredentialDO credential = relation.getCredentialId() == null ? null : credentialMap.get(relation.getCredentialId());
|
||||
ApiCredentialBinding binding = ApiCredentialBinding.builder()
|
||||
.credentialId(relation.getCredentialId())
|
||||
.appId(relation.getAppId())
|
||||
.appName(credential != null ? credential.getAppName() : null)
|
||||
.build();
|
||||
bindings.add(binding);
|
||||
}
|
||||
return bindings;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -86,13 +86,8 @@ public class ApiPolicyRateLimitServiceImpl implements ApiPolicyRateLimitService
|
||||
private void apply(ApiPolicySaveReqVO reqVO, ApiPolicyRateLimitDO target) {
|
||||
target.setName(StrUtil.trim(reqVO.getName()));
|
||||
target.setType(StrUtil.trim(reqVO.getType()));
|
||||
target.setConfig(normalizeNullable(reqVO.getConfig()));
|
||||
target.setDescription(normalizeNullable(reqVO.getDescription()));
|
||||
}
|
||||
|
||||
private String normalizeNullable(String value) {
|
||||
String trimmed = StrUtil.trim(value);
|
||||
return StrUtil.isEmpty(trimmed) ? null : trimmed;
|
||||
target.setConfig(StrUtil.trim(reqVO.getConfig()));
|
||||
target.setDescription(StrUtil.trim(reqVO.getDescription()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -11,11 +11,13 @@ import com.zt.plat.module.databus.controller.admin.gateway.vo.definition.ApiDefi
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.version.ApiVersionCompareRespVO;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.version.ApiVersionPageReqVO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionCredentialDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiStepDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiTransformDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiVersionDO;
|
||||
import com.zt.plat.module.databus.dal.mapper.gateway.ApiVersionMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiDefinitionMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiDefinitionCredentialMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiStepMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiTransformMapper;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
@@ -47,6 +49,7 @@ public class ApiVersionServiceImpl implements ApiVersionService {
|
||||
private final ApiDefinitionMapper apiDefinitionMapper;
|
||||
private final ApiStepMapper apiStepMapper;
|
||||
private final ApiTransformMapper apiTransformMapper;
|
||||
private final ApiDefinitionCredentialMapper apiDefinitionCredentialMapper;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ApiDefinitionService apiDefinitionService;
|
||||
|
||||
@@ -191,6 +194,15 @@ public class ApiVersionServiceImpl implements ApiVersionService {
|
||||
snapshot.setApiLevelTransforms(ApiDefinitionConvert.INSTANCE.convertTransformList(apiTransforms));
|
||||
}
|
||||
|
||||
List<ApiDefinitionCredentialDO> credentialRelations = apiDefinitionCredentialMapper.selectByApiId(apiId);
|
||||
if (credentialRelations != null && !credentialRelations.isEmpty()) {
|
||||
List<Long> credentialIds = credentialRelations.stream()
|
||||
.map(ApiDefinitionCredentialDO::getCredentialId)
|
||||
.filter(Objects::nonNull)
|
||||
.toList();
|
||||
snapshot.setCredentialIds(credentialIds);
|
||||
}
|
||||
|
||||
return snapshot;
|
||||
}
|
||||
|
||||
|
||||
@@ -61,5 +61,6 @@ public interface GatewayServiceErrorCodeConstants {
|
||||
ErrorCode API_VERSION_SNAPSHOT_DESERIALIZE_FAILED = new ErrorCode(1_010_000_052, "API 版本快照反序列化失败");
|
||||
ErrorCode API_VERSION_ACTIVE_CANNOT_DELETE = new ErrorCode(1_010_000_053, "当前激活版本不允许删除");
|
||||
ErrorCode API_VERSION_API_MISMATCH = new ErrorCode(1_010_000_054, "两个版本不属于同一 API");
|
||||
ErrorCode API_CREDENTIAL_UNAUTHORIZED = new ErrorCode(1_010_000_055, "当前凭证无权访问该 API");
|
||||
|
||||
}
|
||||
|
||||
@@ -52,6 +52,8 @@ spring:
|
||||
host: 172.16.46.63 # 地址
|
||||
port: 30379 # 端口
|
||||
database: 0 # 数据库索引
|
||||
username: zt-redis
|
||||
password: P@ssword25
|
||||
# password: 123456 # 密码,建议生产环境开启
|
||||
|
||||
xxl:
|
||||
|
||||
@@ -130,6 +130,7 @@ zt:
|
||||
- /databus/api/portal/**
|
||||
ignore-tables:
|
||||
- databus_api_client_credential
|
||||
- databus_api_definition_credential
|
||||
# DataBus 数据同步服务端配置
|
||||
databus:
|
||||
sync:
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ApiGatewayErrorProcessorTest {
|
||||
|
||||
private final ApiGatewayErrorProcessor processor = new ApiGatewayErrorProcessor();
|
||||
|
||||
@Test
|
||||
void applyServiceException_should_fill_status_message_and_body() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
ServiceException exception = ServiceExceptionUtil.exception(API_RATE_LIMIT_EXCEEDED);
|
||||
|
||||
processor.applyServiceException(context, exception);
|
||||
|
||||
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS.value());
|
||||
assertThat(context.getResponseMessage()).isNotBlank();
|
||||
assertThat(context.getResponseBody()).isInstanceOfSatisfying(java.util.Map.class, body -> {
|
||||
assertThat(body).containsEntry("errorCode", API_RATE_LIMIT_EXCEEDED.getCode());
|
||||
assertThat(body).containsKey("errorMessage");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void applyUnexpectedException_should_use_first_non_empty_message() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
RuntimeException exception = new RuntimeException("outer", new IllegalStateException("inner cause"));
|
||||
|
||||
processor.applyUnexpectedException(context, exception);
|
||||
|
||||
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR.value());
|
||||
assertThat(context.getResponseMessage()).isEqualTo("outer");
|
||||
assertThat(context.getResponseBody()).isInstanceOf(java.util.Map.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveServiceException_should_traverse_causes() {
|
||||
ServiceException serviceException = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
RuntimeException wrapped = new RuntimeException(new IllegalStateException(serviceException));
|
||||
|
||||
ServiceException resolved = processor.resolveServiceException(wrapped);
|
||||
|
||||
assertThat(resolved).isSameAs(serviceException);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.integration.http.HttpHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ApiGatewayExecutionServiceTest {
|
||||
|
||||
private ApiGatewayExecutionService service;
|
||||
|
||||
@Mock
|
||||
private ApiFlowDispatcher apiFlowDispatcher;
|
||||
|
||||
@Mock
|
||||
private ApiGatewayErrorProcessor errorProcessor;
|
||||
|
||||
@Mock
|
||||
private ApiGatewayAccessLogger accessLogger;
|
||||
|
||||
@Mock
|
||||
private ApiDefinitionService apiDefinitionService;
|
||||
|
||||
private ApiGatewayProperties properties;
|
||||
|
||||
private ApiGatewayRequestMapper requestMapper;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
properties = new ApiGatewayProperties();
|
||||
properties.setBasePath("/databus/api");
|
||||
properties.setEnableTenantHeader(true);
|
||||
properties.setTenantHeader("ZT-Tenant-Id");
|
||||
requestMapper = new ApiGatewayRequestMapper(objectMapper, properties);
|
||||
service = new ApiGatewayExecutionService(requestMapper, apiFlowDispatcher, errorProcessor, properties, objectMapper, accessLogger, apiDefinitionService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void invokeForDebug_should_dispatch_with_inactive_definition() {
|
||||
ApiGatewayInvokeReqVO reqVO = new ApiGatewayInvokeReqVO();
|
||||
reqVO.setApiCode("demo.api");
|
||||
reqVO.setVersion("v1");
|
||||
reqVO.getHeaders().put("ZT-Auth-Token", "debug-token");
|
||||
reqVO.getHeaders().put(ApiGatewayProperties.APP_ID_HEADER, "app-1");
|
||||
reqVO.getQueryParams().put("trace", "1");
|
||||
reqVO.setPayload(Map.of("name", "alice"));
|
||||
|
||||
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definition("demo.api", "v1"))
|
||||
.build();
|
||||
|
||||
when(apiDefinitionService.findByCodeAndVersion(eq("demo.api"), eq("v1"))).thenReturn(Optional.empty());
|
||||
when(apiDefinitionService.findByCodeAndVersionIncludingInactive(eq("demo.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
|
||||
when(apiFlowDispatcher.dispatchWithAggregate(eq(aggregate), any(ApiInvocationContext.class)))
|
||||
.thenAnswer(invocation -> {
|
||||
ApiInvocationContext ctx = invocation.getArgument(1);
|
||||
ctx.setResponseStatus(201);
|
||||
ctx.setResponseMessage("created");
|
||||
ctx.setResponseBody(Map.of("ok", true));
|
||||
return ctx;
|
||||
});
|
||||
|
||||
ResponseEntity<ApiGatewayResponse> responseEntity = service.invokeForDebug(reqVO);
|
||||
|
||||
assertThat(responseEntity.getStatusCodeValue()).isEqualTo(201);
|
||||
assertThat(responseEntity.getBody()).isNotNull();
|
||||
assertThat(responseEntity.getBody().getMessage()).isEqualTo("created");
|
||||
assertThat(responseEntity.getBody().getResponse()).isEqualTo(Map.of("ok", true));
|
||||
}
|
||||
|
||||
@Test
|
||||
void dispatch_should_apply_credential_authorization_for_non_debug() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
context.setApiCode("secure.api");
|
||||
context.setApiVersion("v1");
|
||||
|
||||
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definition("secure.api", "v1"))
|
||||
.credentialBindings(List.of(ApiCredentialBinding.builder().appId("app-x").build()))
|
||||
.build();
|
||||
|
||||
when(apiDefinitionService.findByCodeAndVersion(eq("secure.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
|
||||
doAnswer(invocation -> {
|
||||
ApiInvocationContext ctx = invocation.getArgument(0);
|
||||
ServiceException ex = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
ctx.setResponseStatus(403);
|
||||
ctx.setResponseMessage(ex.getMessage());
|
||||
return null;
|
||||
}).when(errorProcessor).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
|
||||
|
||||
Message<ApiInvocationContext> message = MessageBuilder.withPayload(context)
|
||||
.setHeader("apiCode", "secure.api")
|
||||
.setHeader("version", "v1")
|
||||
.setHeader(HttpHeaders.REQUEST_METHOD, "POST")
|
||||
.build();
|
||||
|
||||
ApiInvocationContext result = service.dispatch(message);
|
||||
|
||||
assertThat(result.getResponseStatus()).isEqualTo(403);
|
||||
verify(apiFlowDispatcher, never()).dispatch(anyString(), anyString(), any());
|
||||
verify(errorProcessor, times(1)).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
|
||||
}
|
||||
|
||||
private ApiDefinitionDO definition(String apiCode, String version) {
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode(apiCode);
|
||||
definitionDO.setVersion(version);
|
||||
return definitionDO;
|
||||
}
|
||||
}
|
||||
@@ -73,8 +73,8 @@ class ApiGatewayRequestMapperTest {
|
||||
|
||||
ApiInvocationContext context = mapper.map("", headers);
|
||||
|
||||
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo("token-123");
|
||||
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo("token-123");
|
||||
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo(List.of("token-123"));
|
||||
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo(List.of("token-123"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.integration.dsl.IntegrationFlow;
|
||||
import org.springframework.integration.dsl.context.IntegrationFlowContext;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class IntegrationFlowManagerTest {
|
||||
|
||||
@Mock
|
||||
private IntegrationFlowContext integrationFlowContext;
|
||||
|
||||
@Mock
|
||||
private ApiDefinitionService apiDefinitionService;
|
||||
|
||||
@Mock
|
||||
private ApiFlowAssembler apiFlowAssembler;
|
||||
|
||||
@Mock
|
||||
private IntegrationFlowContext.IntegrationFlowRegistration registration;
|
||||
|
||||
private IntegrationFlowManager manager;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
manager = new IntegrationFlowManager(integrationFlowContext, apiDefinitionService, apiFlowAssembler);
|
||||
when(registration.getId()).thenReturn("flow-id");
|
||||
}
|
||||
|
||||
@Test
|
||||
void refreshAll_should_register_active_flows_and_remove_stale() {
|
||||
ApiDefinitionAggregate agg1 = aggregate("order.create", "v1");
|
||||
ApiDefinitionAggregate agg2 = aggregate("order.pay", "v1");
|
||||
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1, agg2));
|
||||
IntegrationFlow flow = mock(IntegrationFlow.class);
|
||||
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
|
||||
.flowId("flow-id")
|
||||
.inputChannelName("ch")
|
||||
.flow(flow)
|
||||
.build());
|
||||
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
|
||||
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
|
||||
when(builder.id(anyString()).register()).thenReturn(registration);
|
||||
when(registration.getInputChannel()).thenReturn(mock(MessageChannel.class));
|
||||
|
||||
manager.refreshAll();
|
||||
|
||||
Optional<MessageChannel> channel = manager.locateInputChannel("order.create", "v1");
|
||||
assertThat(channel).isPresent();
|
||||
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1));
|
||||
manager.refreshAll();
|
||||
|
||||
verify(integrationFlowContext, atLeastOnce()).remove(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void obtainDebugHandle_should_reuse_existing_registration_when_present() {
|
||||
ApiDefinitionAggregate agg = aggregate("demo", "v1");
|
||||
IntegrationFlow flow = mock(IntegrationFlow.class);
|
||||
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
|
||||
.flowId("flow-debug")
|
||||
.inputChannelName("ch")
|
||||
.flow(flow)
|
||||
.build());
|
||||
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
|
||||
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
|
||||
when(builder.id(anyString()).register()).thenReturn(registration);
|
||||
MessageChannel input = mock(MessageChannel.class);
|
||||
when(registration.getInputChannel()).thenReturn(input);
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg));
|
||||
|
||||
manager.refreshAll();
|
||||
IntegrationFlowManager.DebugFlowHandle handle = manager.obtainDebugHandle(agg);
|
||||
|
||||
assertThat(handle.channel()).isNotNull();
|
||||
assertThat(handle.temporary()).isFalse();
|
||||
}
|
||||
|
||||
private ApiDefinitionAggregate aggregate(String apiCode, String version) {
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode(apiCode);
|
||||
definitionDO.setVersion(version);
|
||||
return ApiDefinitionAggregate.builder().definition(definitionDO).build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.policy;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiPolicyRateLimitDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.dao.DataAccessResourceFailureException;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EVALUATION_FAILED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DefaultRateLimitPolicyEvaluatorTest {
|
||||
|
||||
@Mock
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Mock
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
|
||||
private DefaultRateLimitPolicyEvaluator evaluator;
|
||||
|
||||
private ApiDefinitionAggregate aggregate;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
evaluator = new DefaultRateLimitPolicyEvaluator(objectMapper, stringRedisTemplate);
|
||||
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode("order.create");
|
||||
definitionDO.setVersion("v1");
|
||||
|
||||
ApiPolicyRateLimitDO rateLimitDO = new ApiPolicyRateLimitDO();
|
||||
rateLimitDO.setConfig("{\"limit\":2,\"windowSeconds\":60}");
|
||||
|
||||
aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definitionDO)
|
||||
.rateLimitPolicy(rateLimitDO)
|
||||
.build();
|
||||
|
||||
when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_set_expire_on_first_hit_and_pass_under_limit() {
|
||||
when(valueOperations.increment(any())).thenReturn(1L, 2L);
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
context.getRequestHeaders().put("X-Client-Id", "c1");
|
||||
|
||||
evaluator.evaluate(aggregate, context);
|
||||
evaluator.evaluate(aggregate, context);
|
||||
|
||||
verify(stringRedisTemplate, times(2)).opsForValue();
|
||||
verify(valueOperations, times(2)).increment(any());
|
||||
verify(stringRedisTemplate).expire(any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_throw_when_exceed_limit() {
|
||||
when(valueOperations.increment(any())).thenReturn(3L);
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
|
||||
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
|
||||
|
||||
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EXCEEDED.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_wrap_redis_failure() {
|
||||
when(valueOperations.increment(any())).thenThrow(new DataAccessResourceFailureException("redis down"));
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
|
||||
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
|
||||
|
||||
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EVALUATION_FAILED.getCode());
|
||||
}
|
||||
}
|
||||
@@ -16,11 +16,17 @@ import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.security.SecureRandom;
|
||||
import java.security.cert.X509Certificate;
|
||||
import java.time.Duration;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLParameters;
|
||||
import javax.net.ssl.TrustManager;
|
||||
import javax.net.ssl.X509TrustManager;
|
||||
|
||||
/**
|
||||
* 可直接运行的示例,演示如何使用 appId=test 与对应密钥调用本地 Databus API。
|
||||
@@ -28,13 +34,21 @@ import java.util.UUID;
|
||||
public final class DatabusApiInvocationExample {
|
||||
|
||||
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
|
||||
private static final String APP_ID = "ztmy";
|
||||
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
|
||||
// private static final String APP_ID = "iwork";
|
||||
// private static final String APP_SECRET = "lpGXiNe/GMLk0vsbYGLa8eYxXq8tGhTbuu3/D4MJzIk=";
|
||||
private static final String APP_ID = "ztmy";
|
||||
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
|
||||
private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
|
||||
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.build();
|
||||
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
|
||||
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
private static final String TARGET_API = "https://jygk.chncopper.com:30078/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/callback/v1";
|
||||
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/testcbw/456";
|
||||
// ⚠️ 仅用于联调:信任所有证书 + 关闭主机名校验,生产环境请改为受信 CA 或自定义 truststore。
|
||||
private static final HttpClient HTTP_CLIENT = buildUnsafeHttpClient();
|
||||
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
|
||||
private static final PrintStream OUT = buildConsolePrintStream();
|
||||
public static final String ZT_APP_ID = "ZT-App-Id";
|
||||
@@ -47,6 +61,45 @@ public final class DatabusApiInvocationExample {
|
||||
private DatabusApiInvocationExample() {
|
||||
}
|
||||
|
||||
/**
|
||||
* 仅用于联调:信任所有证书并关闭主机名校验,生产环境请使用受信 CA 或自定义 truststore。
|
||||
*/
|
||||
private static HttpClient buildUnsafeHttpClient() {
|
||||
try {
|
||||
TrustManager[] trustAll = new TrustManager[]{
|
||||
new X509TrustManager() {
|
||||
@Override
|
||||
public void checkClientTrusted(X509Certificate[] chain, String authType) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void checkServerTrusted(X509Certificate[] chain, String authType) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public X509Certificate[] getAcceptedIssuers() {
|
||||
return new X509Certificate[0];
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
SSLContext sslContext = SSLContext.getInstance("TLS");
|
||||
sslContext.init(null, trustAll, new SecureRandom());
|
||||
|
||||
SSLParameters sslParameters = new SSLParameters();
|
||||
// 关闭主机名校验
|
||||
sslParameters.setEndpointIdentificationAlgorithm("");
|
||||
|
||||
return HttpClient.newBuilder()
|
||||
.sslContext(sslContext)
|
||||
.sslParameters(sslParameters)
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.build();
|
||||
} catch (Exception ex) {
|
||||
throw new IllegalStateException("Failed to build unsafe HttpClient", ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
OUT.println("=== GET 请求示例 ===");
|
||||
// executeGetExample();
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.security;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class GatewayJwtResolverTest {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_pick_token_header_case_insensitive() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("zT-AuTh-ToKeN", "abc123");
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("abc123");
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_strip_bearer_prefix_and_array_values() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("Authorization", new String[]{"Bearer XYZ.jwt"});
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("XYZ.jwt");
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_parse_structured_json_string_and_fallback_to_query_params() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("Authorization", "{\"token\":\"json-token\"}");
|
||||
Map<String, Object> query = Map.of("token", List.of("q1", "q2"));
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, query, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("json-token");
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
DELETE FROM "databus_api_transform";
|
||||
DELETE FROM "databus_api_step";
|
||||
DELETE FROM "databus_api_definition";
|
||||
DELETE FROM "databus_api_definition_credential";
|
||||
DELETE FROM "databus_policy_rate_limit";
|
||||
DELETE FROM "databus_policy_audit";
|
||||
DELETE FROM "databus_api_flow_publish";
|
||||
|
||||
@@ -97,3 +97,16 @@ CREATE TABLE IF NOT EXISTS databus_api_flow_publish (
|
||||
updater VARCHAR(64),
|
||||
deleted BOOLEAN
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS databus_api_definition_credential (
|
||||
id BIGINT PRIMARY KEY,
|
||||
api_id BIGINT,
|
||||
credential_id BIGINT,
|
||||
app_id VARCHAR(255),
|
||||
tenant_id BIGINT,
|
||||
create_time TIMESTAMP,
|
||||
update_time TIMESTAMP,
|
||||
creator VARCHAR(64),
|
||||
updater VARCHAR(64),
|
||||
deleted BOOLEAN
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user