Merge remote-tracking branch 'origin/dev' into dev
This commit is contained in:
@@ -27,7 +27,7 @@ public class GatewayWebClientConfiguration {
|
||||
@Value("${databus.gateway.web-client.max-in-memory-size:20971520}") int maxInMemorySize,
|
||||
@Value("${databus.gateway.web-client.max-idle-time:45000}") long maxIdleTimeMillis,
|
||||
@Value("${databus.gateway.web-client.evict-in-background-interval:20000}") long evictInBackgroundMillis,
|
||||
@Value("${databus.gateway.web-client.connection-pool-enabled:true}") boolean connectionPoolEnabled) {
|
||||
@Value("${databus.gateway.web-client.connection-pool-enabled:false}") boolean connectionPoolEnabled) {
|
||||
this.maxInMemorySize = maxInMemorySize;
|
||||
this.maxIdleTimeMillis = maxIdleTimeMillis;
|
||||
this.evictInBackgroundMillis = evictInBackgroundMillis;
|
||||
|
||||
@@ -3,9 +3,12 @@ package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.util.monitor.TracerUtils;
|
||||
import com.zt.plat.framework.common.util.servlet.ServletUtils;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.security.CachedBodyHttpServletRequest;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAccessLogService;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
@@ -20,8 +23,7 @@ import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* 将 API 调用上下文持久化为访问日志。
|
||||
@@ -33,6 +35,8 @@ public class ApiGatewayAccessLogger {
|
||||
|
||||
public static final String ATTR_LOG_ID = "ApiAccessLogId";
|
||||
public static final String ATTR_EXCEPTION_STACK = "ApiAccessLogExceptionStack";
|
||||
public static final String HEADER_ACCESS_LOG_ID = "X-Databus-AccessLog-Id";
|
||||
private static final String ATTR_REQUEST_START = "ApiAccessLogRequestStart";
|
||||
|
||||
private static final int MAX_TEXT_LENGTH = 4000;
|
||||
|
||||
@@ -44,24 +48,15 @@ public class ApiGatewayAccessLogger {
|
||||
*/
|
||||
public void onRequest(ApiInvocationContext context) {
|
||||
try {
|
||||
String traceId = TracerUtils.getTraceId();
|
||||
ApiAccessLogDO logDO = new ApiAccessLogDO();
|
||||
logDO.setTraceId(traceId);
|
||||
logDO.setApiCode(context.getApiCode());
|
||||
logDO.setApiVersion(context.getApiVersion());
|
||||
logDO.setRequestMethod(context.getHttpMethod());
|
||||
logDO.setRequestPath(context.getRequestPath());
|
||||
logDO.setRequestQuery(toJson(context.getRequestQueryParams()));
|
||||
logDO.setRequestHeaders(toJson(context.getRequestHeaders()));
|
||||
logDO.setRequestBody(toJson(context.getRequestBody()));
|
||||
logDO.setClientIp(firstNonBlank(context.getClientIp(),
|
||||
GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), "X-Forwarded-For")));
|
||||
logDO.setUserAgent(GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HttpHeaders.USER_AGENT));
|
||||
logDO.setStatus(3); // 默认未知
|
||||
logDO.setRequestTime(toLocalDateTime(context.getRequestTime()));
|
||||
logDO.setTenantId(parseTenantId(context.getTenantId()));
|
||||
Long logId = apiAccessLogService.create(logDO);
|
||||
context.getAttributes().put(ATTR_LOG_ID, logId);
|
||||
ApiAccessLogDO logDO = buildRequestSnapshot(context);
|
||||
Long existingLogId = getLogId(context);
|
||||
if (existingLogId != null) {
|
||||
logDO.setId(existingLogId);
|
||||
apiAccessLogService.update(logDO);
|
||||
} else {
|
||||
Long logId = apiAccessLogService.create(logDO);
|
||||
context.getAttributes().put(ATTR_LOG_ID, logId);
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
log.warn("记录 API 访问日志开始阶段失败, traceId={}", TracerUtils.getTraceId(), ex);
|
||||
}
|
||||
@@ -111,15 +106,97 @@ public class ApiGatewayAccessLogger {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 安全过滤阶段的第一时间记录请求元数据,保证被快速拒绝的请求也能查询。
|
||||
*/
|
||||
public Long logEntrance(HttpServletRequest request) {
|
||||
if (request == null) {
|
||||
return null;
|
||||
}
|
||||
Object existing = request.getAttribute(ATTR_LOG_ID);
|
||||
if (existing instanceof Long logId) {
|
||||
return logId;
|
||||
}
|
||||
try {
|
||||
ApiAccessLogDO logDO = new ApiAccessLogDO();
|
||||
logDO.setTraceId(TracerUtils.getTraceId());
|
||||
logDO.setRequestMethod(request.getMethod());
|
||||
logDO.setRequestPath(request.getRequestURI());
|
||||
logDO.setRequestQuery(truncate(request.getQueryString()));
|
||||
logDO.setRequestHeaders(toJson(collectHeaders(request)));
|
||||
logDO.setClientIp(ServletUtils.getClientIP(request));
|
||||
logDO.setUserAgent(request.getHeader(HttpHeaders.USER_AGENT));
|
||||
logDO.setStatus(3);
|
||||
logDO.setRequestTime(LocalDateTime.now());
|
||||
Long logId = apiAccessLogService.create(logDO);
|
||||
request.setAttribute(ATTR_LOG_ID, logId);
|
||||
request.setAttribute(ATTR_REQUEST_START, Instant.now());
|
||||
return logId;
|
||||
} catch (Exception ex) {
|
||||
log.warn("记录入口 API 访问日志失败", ex);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 编排前即结束的请求在此补写状态码、耗时等关键信息。
|
||||
*/
|
||||
public void finalizeEarly(HttpServletRequest request, int status, String message) {
|
||||
if (request == null) {
|
||||
return;
|
||||
}
|
||||
Object existing = request.getAttribute(ATTR_LOG_ID);
|
||||
if (!(existing instanceof Long logId)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
ApiAccessLogDO update = new ApiAccessLogDO();
|
||||
update.setId(logId);
|
||||
update.setResponseStatus(status);
|
||||
update.setResponseMessage(truncate(message));
|
||||
update.setStatus(resolveStatus(status));
|
||||
update.setResponseTime(LocalDateTime.now());
|
||||
update.setDuration(calculateDuration(request));
|
||||
apiAccessLogService.update(update);
|
||||
} catch (Exception ex) {
|
||||
log.warn("更新入口 API 访问日志失败, logId={}", logId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 将入口阶段生成的 logId 通过请求头继续传递,供后续流程关联合并。
|
||||
*/
|
||||
public static void propagateLogIdHeader(CachedBodyHttpServletRequest requestWrapper, Long logId) {
|
||||
if (requestWrapper == null || logId == null) {
|
||||
return;
|
||||
}
|
||||
requestWrapper.setHeader(HEADER_ACCESS_LOG_ID, String.valueOf(logId));
|
||||
}
|
||||
|
||||
private Long getLogId(ApiInvocationContext context) {
|
||||
Object value = context.getAttributes().get(ATTR_LOG_ID);
|
||||
if (value instanceof Long) {
|
||||
return (Long) value;
|
||||
}
|
||||
if (value instanceof Number number) {
|
||||
return number.longValue();
|
||||
}
|
||||
return null;
|
||||
return value instanceof Long ? (Long) value : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* 根据编排上下文构建请求侧快照,用于访问日志首段信息。
|
||||
*/
|
||||
private ApiAccessLogDO buildRequestSnapshot(ApiInvocationContext context) {
|
||||
ApiAccessLogDO logDO = new ApiAccessLogDO();
|
||||
logDO.setTraceId(TracerUtils.getTraceId());
|
||||
logDO.setApiCode(context.getApiCode());
|
||||
logDO.setApiVersion(context.getApiVersion());
|
||||
logDO.setRequestMethod(context.getHttpMethod());
|
||||
logDO.setRequestPath(context.getRequestPath());
|
||||
logDO.setRequestQuery(toJson(context.getRequestQueryParams()));
|
||||
logDO.setRequestHeaders(toJson(context.getRequestHeaders()));
|
||||
logDO.setRequestBody(toJson(context.getRequestBody()));
|
||||
logDO.setClientIp(context.getClientIp());
|
||||
logDO.setUserAgent(GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HttpHeaders.USER_AGENT));
|
||||
logDO.setStatus(3);
|
||||
logDO.setRequestTime(toLocalDateTime(context.getRequestTime()));
|
||||
logDO.setTenantId(parseTenantId(context.getTenantId()));
|
||||
return logDO;
|
||||
}
|
||||
|
||||
private Long calculateDuration(ApiInvocationContext context) {
|
||||
@@ -130,6 +207,14 @@ public class ApiGatewayAccessLogger {
|
||||
return Duration.between(start, Instant.now()).toMillis();
|
||||
}
|
||||
|
||||
private Long calculateDuration(HttpServletRequest request) {
|
||||
Object startAttr = request.getAttribute(ATTR_REQUEST_START);
|
||||
if (startAttr instanceof Instant start) {
|
||||
return Duration.between(start, Instant.now()).toMillis();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private Integer resolveStatus(Integer httpStatus) {
|
||||
if (httpStatus == null) {
|
||||
return 3;
|
||||
@@ -214,6 +299,19 @@ public class ApiGatewayAccessLogger {
|
||||
return extra;
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
private <T> T firstNonNull(T... candidates) {
|
||||
if (candidates == null) {
|
||||
return null;
|
||||
}
|
||||
for (T candidate : candidates) {
|
||||
if (candidate != null) {
|
||||
return candidate;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private String toJson(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
@@ -265,27 +363,20 @@ public class ApiGatewayAccessLogger {
|
||||
}
|
||||
}
|
||||
|
||||
private String firstNonBlank(String... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
private Map<String, String> collectHeaders(HttpServletRequest request) {
|
||||
if (request == null) {
|
||||
return Collections.emptyMap();
|
||||
}
|
||||
for (String value : values) {
|
||||
if (StringUtils.hasText(value)) {
|
||||
return value;
|
||||
Map<String, String> headers = new LinkedHashMap<>();
|
||||
Enumeration<String> names = request.getHeaderNames();
|
||||
while (names != null && names.hasMoreElements()) {
|
||||
String name = names.nextElement();
|
||||
Enumeration<String> values = request.getHeaders(name);
|
||||
if (values == null || !values.hasMoreElements()) {
|
||||
continue;
|
||||
}
|
||||
headers.put(name, values.nextElement());
|
||||
}
|
||||
return null;
|
||||
return headers;
|
||||
}
|
||||
|
||||
private Object firstNonNull(Object... values) {
|
||||
if (values == null) {
|
||||
return null;
|
||||
}
|
||||
for (Object value : values) {
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
@@ -90,6 +91,7 @@ public class ApiGatewayRequestMapper {
|
||||
});
|
||||
context.setUserAgent(GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), HttpHeaders.USER_AGENT));
|
||||
context.setClientIp(resolveClientIp(headers, context.getRequestHeaders()));
|
||||
captureAccessLogId(context);
|
||||
populateQueryParams(headers, context, originalRequestUri);
|
||||
if (properties.isEnableTenantHeader()) {
|
||||
Object tenantHeaderValue = context.getRequestHeaders().get(properties.getTenantHeader());
|
||||
@@ -114,6 +116,21 @@ public class ApiGatewayRequestMapper {
|
||||
return context;
|
||||
}
|
||||
|
||||
private void captureAccessLogId(ApiInvocationContext context) {
|
||||
String headerValue = GatewayHeaderUtils.findFirstHeaderValue(context.getRequestHeaders(), ApiGatewayAccessLogger.HEADER_ACCESS_LOG_ID);
|
||||
if (!StringUtils.hasText(headerValue)) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
Long logId = Long.valueOf(headerValue);
|
||||
context.getAttributes().put(ApiGatewayAccessLogger.ATTR_LOG_ID, logId);
|
||||
} catch (NumberFormatException ex) {
|
||||
// 忽略格式问题,仅在属性中保留原文以便排查
|
||||
context.getAttributes().put(ApiGatewayAccessLogger.ATTR_LOG_ID, headerValue);
|
||||
}
|
||||
context.getRequestHeaders().remove(ApiGatewayAccessLogger.HEADER_ACCESS_LOG_ID);
|
||||
}
|
||||
|
||||
private boolean isInternalHeader(String headerName) {
|
||||
if (!StringUtils.hasText(headerName)) {
|
||||
return true;
|
||||
|
||||
@@ -13,6 +13,7 @@ import com.zt.plat.framework.tenant.core.context.TenantContextHolder;
|
||||
import com.zt.plat.framework.web.core.util.WebFrameworkUtils;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiClientCredentialDO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
|
||||
@@ -56,8 +57,10 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
private final ApiClientCredentialService credentialService;
|
||||
private final ApiAnonymousUserService anonymousUserService;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ApiGatewayAccessLogger accessLogger;
|
||||
private final AntPathMatcher pathMatcher = new AntPathMatcher();
|
||||
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {};
|
||||
private static final TypeReference<Map<String, Object>> MAP_TYPE = new TypeReference<>() {
|
||||
};
|
||||
|
||||
@Override
|
||||
protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain)
|
||||
@@ -66,26 +69,38 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
// 仅处理配置的 API 门户路径,不符合的请求直接放行
|
||||
boolean matchesPortalPath = properties.getAllBasePaths()
|
||||
.stream()
|
||||
.map(this::normalizeBasePath)
|
||||
.anyMatch(basePath -> pathMatcher.match(basePath + "/**", pathWithinApplication));
|
||||
if (!matchesPortalPath) {
|
||||
filterChain.doFilter(request, response);
|
||||
return;
|
||||
}
|
||||
// 校验访问 IP 是否落在允许范围内
|
||||
if (!isIpAllowed(request)) {
|
||||
log.warn("[API-PORTAL] 拦截来自 IP {} 访问 {} 的请求", request.getRemoteAddr(), pathWithinApplication);
|
||||
response.sendError(HttpStatus.FORBIDDEN.value(), "IP 禁止访问");
|
||||
return;
|
||||
}
|
||||
Long accessLogId = null;
|
||||
ApiGatewayProperties.Security security = properties.getSecurity();
|
||||
ApiClientCredentialDO credential = null;
|
||||
if (!security.isEnabled()) {
|
||||
filterChain.doFilter(request, response);
|
||||
return;
|
||||
}
|
||||
Long tenantId = null;
|
||||
boolean dispatchedToGateway = false;
|
||||
try {
|
||||
Long tenantId = resolveTenantId(request);
|
||||
tenantId = resolveTenantId(request);
|
||||
if (tenantId != null) {
|
||||
// 绑定租户上下文,保证访问日志与后续数据库操作可写入 tenantId
|
||||
TenantContextHolder.setTenantId(tenantId);
|
||||
}
|
||||
if (!isIpAllowed(request)) {
|
||||
log.warn("[API-PORTAL] 拦截来自 IP {} 访问 {} 的请求", request.getRemoteAddr(), pathWithinApplication);
|
||||
accessLogId = accessLogger.logEntrance(request);
|
||||
response.sendError(HttpStatus.FORBIDDEN.value(), "IP 禁止访问");
|
||||
accessLogger.finalizeEarly(request, HttpStatus.FORBIDDEN.value(), "IP 禁止访问");
|
||||
return;
|
||||
}
|
||||
// IP 校验通过后再补录入口日志,避免无租户信息写库
|
||||
accessLogId = accessLogger.logEntrance(request);
|
||||
if (!security.isEnabled()) {
|
||||
byte[] originalBody = StreamUtils.copyToByteArray(request.getInputStream());
|
||||
CachedBodyHttpServletRequest passthroughRequest = new CachedBodyHttpServletRequest(request, originalBody);
|
||||
ApiGatewayAccessLogger.propagateLogIdHeader(passthroughRequest, accessLogId);
|
||||
filterChain.doFilter(passthroughRequest, response);
|
||||
return;
|
||||
}
|
||||
// 从请求头解析 appId 并加载客户端凭证,包含匿名访问配置
|
||||
String appId = requireHeader(request, APP_ID_HEADER, "缺少应用标识");
|
||||
credential = credentialService.findActiveCredential(appId)
|
||||
@@ -118,6 +133,7 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
|
||||
// 使用可重复读取的请求包装,供后续过滤器继续消费
|
||||
CachedBodyHttpServletRequest securedRequest = new CachedBodyHttpServletRequest(request, decryptedBody);
|
||||
ApiGatewayAccessLogger.propagateLogIdHeader(securedRequest, accessLogId);
|
||||
if (StringUtils.hasText(request.getCharacterEncoding())) {
|
||||
securedRequest.setCharacterEncoding(request.getCharacterEncoding());
|
||||
}
|
||||
@@ -129,16 +145,33 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
ContentCachingResponseWrapper responseWrapper = new ContentCachingResponseWrapper(response);
|
||||
try {
|
||||
filterChain.doFilter(securedRequest, responseWrapper);
|
||||
dispatchedToGateway = true;
|
||||
encryptResponse(responseWrapper, credential, security);
|
||||
} finally {
|
||||
responseWrapper.copyBodyToResponse();
|
||||
}
|
||||
} catch (SecurityValidationException ex) {
|
||||
if (accessLogId == null) {
|
||||
accessLogId = accessLogger.logEntrance(request);
|
||||
}
|
||||
log.warn("[API-PORTAL] 安全校验失败: {}", ex.getMessage());
|
||||
writeErrorResponse(response, security, credential, ex.status(), ex.getMessage());
|
||||
if (!dispatchedToGateway) {
|
||||
accessLogger.finalizeEarly(request, ex.status().value(), ex.getMessage());
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
if (accessLogId == null) {
|
||||
accessLogId = accessLogger.logEntrance(request);
|
||||
}
|
||||
log.error("[API-PORTAL] 处理安全校验时出现异常", ex);
|
||||
writeErrorResponse(response, security, credential, HttpStatus.INTERNAL_SERVER_ERROR, "网关安全校验失败");
|
||||
if (!dispatchedToGateway) {
|
||||
accessLogger.finalizeEarly(request, HttpStatus.INTERNAL_SERVER_ERROR.value(), "网关安全校验失败");
|
||||
}
|
||||
} finally {
|
||||
if (tenantId != null) {
|
||||
TenantContextHolder.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,15 +194,6 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
return requestUri;
|
||||
}
|
||||
|
||||
private String normalizeBasePath(String basePath) {
|
||||
String candidate = StringUtils.hasText(basePath) ? basePath : ApiGatewayProperties.DEFAULT_BASE_PATH;
|
||||
candidate = candidate.startsWith("/") ? candidate : "/" + candidate;
|
||||
if (candidate.endsWith("/")) {
|
||||
candidate = candidate.substring(0, candidate.length() - 1);
|
||||
}
|
||||
return candidate;
|
||||
}
|
||||
|
||||
private Long resolveTenantId(HttpServletRequest request) {
|
||||
if (!properties.isEnableTenantHeader()) {
|
||||
return null;
|
||||
@@ -257,10 +281,10 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
|
||||
String signatureType = resolveSignatureType(credential, security);
|
||||
try {
|
||||
// boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
|
||||
// if (!valid) {
|
||||
// throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
|
||||
// }
|
||||
boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
|
||||
if (!valid) {
|
||||
throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
|
||||
}
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new SecurityValidationException(HttpStatus.INTERNAL_SERVER_ERROR, "签名算法配置异常");
|
||||
}
|
||||
@@ -290,7 +314,6 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
});
|
||||
} catch (IllegalArgumentException ex) {
|
||||
log.debug("[API-PORTAL] 解析查询串 {} 失败", queryString, ex);
|
||||
target.put("query", queryString);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -305,7 +328,7 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
if (bodyText.startsWith("{")) {
|
||||
try {
|
||||
Map<String, Object> bodyMap = objectMapper.readValue(bodyText, MAP_TYPE);
|
||||
bodyMap.forEach((key, value) -> target.put(key, normalizeValue(value)));
|
||||
target.putAll(bodyMap);
|
||||
return;
|
||||
} catch (JsonProcessingException ex) {
|
||||
log.debug("[API-PORTAL] 解析请求体 JSON 失败", ex);
|
||||
@@ -314,20 +337,6 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
target.put("body", bodyText);
|
||||
}
|
||||
|
||||
private Object normalizeValue(Object value) {
|
||||
if (value == null) {
|
||||
return null;
|
||||
}
|
||||
if (value instanceof Map || value instanceof List) {
|
||||
try {
|
||||
return objectMapper.writeValueAsString(value);
|
||||
} catch (JsonProcessingException ex) {
|
||||
return value.toString();
|
||||
}
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
private String resolveEncryptionType(ApiClientCredentialDO credential, ApiGatewayProperties.Security security) {
|
||||
if (credential != null && StringUtils.hasText(credential.getEncryptionType())) {
|
||||
return credential.getEncryptionType();
|
||||
@@ -465,12 +474,12 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
response.resetBuffer();
|
||||
response.setStatus(status.value());
|
||||
String resolvedMessage = StringUtils.hasText(message) ? message : status.getReasonPhrase();
|
||||
String traceId = TracerUtils.getTraceId();
|
||||
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
|
||||
String traceId = TracerUtils.getTraceId();
|
||||
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
|
||||
.code(status.value())
|
||||
.message(resolvedMessage)
|
||||
.response(null)
|
||||
.traceId(traceId)
|
||||
.traceId(traceId)
|
||||
.build();
|
||||
if (shouldEncryptErrorResponse(security, credential)) {
|
||||
String encryptionKey = credential.getEncryptionKey();
|
||||
|
||||
@@ -393,8 +393,13 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
}
|
||||
|
||||
private boolean supportsRequestBody(HttpMethod method) {
|
||||
// 所有请求都要传递请求体
|
||||
return true;
|
||||
if (method == null) {
|
||||
return true;
|
||||
}
|
||||
return !(HttpMethod.GET.equals(method)
|
||||
|| HttpMethod.HEAD.equals(method)
|
||||
|| HttpMethod.OPTIONS.equals(method)
|
||||
|| HttpMethod.TRACE.equals(method));
|
||||
}
|
||||
|
||||
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {
|
||||
|
||||
@@ -2,8 +2,11 @@ package com.zt.plat.module.databus.controller.admin.gateway;
|
||||
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayExecutionService;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.IntegrationFlowManager;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -48,8 +51,17 @@ class ApiGatewayControllerTest {
|
||||
@MockBean
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@MockBean
|
||||
private ApiClientCredentialService apiClientCredentialService;
|
||||
@MockBean
|
||||
private IntegrationFlowManager integrationFlowManager;
|
||||
|
||||
@MockBean
|
||||
private ApiClientCredentialService apiClientCredentialService;
|
||||
|
||||
@MockBean
|
||||
private ApiAnonymousUserService apiAnonymousUserService;
|
||||
|
||||
@MockBean
|
||||
private ApiGatewayAccessLogger apiGatewayAccessLogger;
|
||||
|
||||
@Test
|
||||
void invokeShouldReturnGatewayEnvelope() throws Exception {
|
||||
|
||||
@@ -28,16 +28,10 @@ import java.util.UUID;
|
||||
public final class DatabusApiInvocationExample {
|
||||
|
||||
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
|
||||
// private static final String APP_ID = "ztmy";
|
||||
// private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
// private static final String APP_ID = "test";
|
||||
// private static final String APP_SECRET = "RSYtKXrXPLMy3oeh0cOro6QCioRUgqfnKCkDkNq78sI=";
|
||||
private static final String APP_ID = "testAnnoy";
|
||||
private static final String APP_SECRET = "jyGCymUjCFL2i3a4Tm3qBIkUrUl4ZgKPYvOU/47ZWcM=";
|
||||
private static final String APP_ID = "ztmy";
|
||||
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
|
||||
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/test/1";
|
||||
// private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
|
||||
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.build();
|
||||
@@ -88,8 +82,11 @@ public final class DatabusApiInvocationExample {
|
||||
Map<String, Object> queryParams = new LinkedHashMap<>();
|
||||
|
||||
long extraTimestamp = 1761556157185L;
|
||||
// String bodyJson = String.format("""
|
||||
// {"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
|
||||
// """, extraTimestamp);
|
||||
String bodyJson = String.format("""
|
||||
{"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
|
||||
{}
|
||||
""", extraTimestamp);
|
||||
|
||||
Map<String, Object> bodyParams = parseBodyJson(bodyJson);
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.zt.plat.framework.security.core.util.SecurityFrameworkUtils;
|
||||
import com.zt.plat.framework.web.core.util.WebFrameworkUtils;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiClientCredentialDO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
@@ -25,7 +26,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties.*;
|
||||
@@ -47,11 +50,12 @@ class GatewaySecurityFilterTest {
|
||||
ApiGatewayProperties properties = createProperties();
|
||||
properties.getSecurity().setEnabled(false);
|
||||
StringRedisTemplate redisTemplate = mock(StringRedisTemplate.class);
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -74,7 +78,9 @@ class GatewaySecurityFilterTest {
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("10.0.0.1");
|
||||
@@ -97,6 +103,8 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
@@ -108,13 +116,13 @@ class GatewaySecurityFilterTest {
|
||||
properties.getSecurity().setRequireBodyEncryption(false);
|
||||
properties.getSecurity().setEncryptResponse(false);
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
long timestamp = System.currentTimeMillis();
|
||||
String nonce = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
String signature = signatureForApp("demo-app");
|
||||
String signature = signatureForApp("demo-app", timestamp);
|
||||
request.addHeader(APP_ID_HEADER, "demo-app");
|
||||
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
request.addHeader(NONCE_HEADER, nonce);
|
||||
@@ -142,13 +150,15 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
credential.setEncryptionKey("demo-secret-key");
|
||||
credential.setEncryptionType(CryptoSignatureUtils.ENCRYPT_TYPE_AES);
|
||||
when(credentialService.findActiveCredential("demo-app")).thenReturn(Optional.of(credential));
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -183,6 +193,8 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
credential.setSignatureType(null);
|
||||
@@ -200,7 +212,7 @@ class GatewaySecurityFilterTest {
|
||||
when(anonymousUserService.find(99L)).thenReturn(Optional.of(details));
|
||||
when(anonymousUserService.issueAccessToken(details)).thenReturn(Optional.of("mock-token"));
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -209,7 +221,7 @@ class GatewaySecurityFilterTest {
|
||||
request.addHeader(APP_ID_HEADER, "demo-app");
|
||||
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
request.addHeader(NONCE_HEADER, nonce);
|
||||
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app"));
|
||||
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app", timestamp));
|
||||
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
MockFilterChain chain = new MockFilterChain();
|
||||
@@ -231,7 +243,20 @@ class GatewaySecurityFilterTest {
|
||||
return properties;
|
||||
}
|
||||
|
||||
private String signatureForApp(String appId) {
|
||||
return SecureUtil.md5("appId=" + appId);
|
||||
private String signatureForApp(String appId, long timestamp) {
|
||||
Map<String, Object> payload = new TreeMap<>();
|
||||
payload.put(APP_ID_HEADER, appId);
|
||||
payload.put(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
payload.forEach((key, value) -> {
|
||||
if (value == null) {
|
||||
return;
|
||||
}
|
||||
if (sb.length() > 0) {
|
||||
sb.append('&');
|
||||
}
|
||||
sb.append(key).append('=').append(value);
|
||||
});
|
||||
return SecureUtil.md5(sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,360 +0,0 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.step.impl;
|
||||
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.PrematureCloseException;
|
||||
import reactor.netty.resources.ConnectionProvider;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Demonstrates the stale-connection scenario using the legacy vs the deferred retry pipeline.
|
||||
*/
|
||||
class HttpStepHandlerConnectionResetScenarioTest {
|
||||
|
||||
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
|
||||
private static final int RETRY_ATTEMPTS = 3;
|
||||
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
|
||||
private static final Duration RESET_WAIT = Duration.ofMillis(300);
|
||||
|
||||
@Test
|
||||
void legacyPipelineLosesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
legacyInvoke(webClient, uri, Map.of("mode", "legacy"));
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void deferredPipelinePropagatesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
Object result = deferredInvoke(webClient, uri, Map.of("mode", "defer"));
|
||||
assertThat(result).isInstanceOf(Map.class);
|
||||
Map<?, ?> resultMap = (Map<?, ?>) result;
|
||||
assertThat(resultMap.get("stage")).isEqualTo("fresh");
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
private WebClient createWebClient() {
|
||||
ConnectionProvider provider = ConnectionProvider.builder("http-step-handler-demo")
|
||||
.maxConnections(1)
|
||||
.pendingAcquireMaxCount(-1)
|
||||
.maxIdleTime(Duration.ofSeconds(5))
|
||||
.build();
|
||||
HttpClient httpClient = HttpClient.create(provider).compress(true);
|
||||
return WebClient.builder()
|
||||
.clientConnector(new ReactorClientHttpConnector(httpClient))
|
||||
.build();
|
||||
}
|
||||
|
||||
private void warmUp(ResetOnceHttpServer server, WebClient webClient, URI uri) {
|
||||
webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(Map.of("warm", true))
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
.block(BLOCK_TIMEOUT);
|
||||
server.awaitWarmupResponse(Duration.ofSeconds(2));
|
||||
}
|
||||
|
||||
private Object legacyInvoke(WebClient webClient, URI uri, Object body) {
|
||||
WebClient.RequestHeadersSpec<?> spec = webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body);
|
||||
Mono<Object> responseMono = spec.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 模拟业务中首次订阅后缓存失败结果的场景
|
||||
.cache();
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException)
|
||||
.onRetryExhaustedThrow((specification, signal) -> signal.failure()))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private Object deferredInvoke(WebClient webClient, URI uri, Object body) {
|
||||
Mono<Object> responseMono = Mono.defer(() -> webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body)
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 通过 defer,每次重试都会重新创建带缓存的响应 Mono
|
||||
.cache());
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private boolean isRetryableException(Throwable throwable) {
|
||||
if (throwable == null) {
|
||||
return false;
|
||||
}
|
||||
Throwable cursor = throwable;
|
||||
while (cursor != null) {
|
||||
if (cursor instanceof ServiceException) {
|
||||
return false;
|
||||
}
|
||||
if (cursor instanceof PrematureCloseException) {
|
||||
return true;
|
||||
}
|
||||
if (cursor instanceof IOException) {
|
||||
return true;
|
||||
}
|
||||
cursor = cursor.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static final class ResetOnceHttpServer implements AutoCloseable {
|
||||
|
||||
private static final Duration RESET_DELAY = Duration.ofMillis(250);
|
||||
|
||||
private final ServerSocket serverSocket;
|
||||
private final ExecutorService acceptExecutor;
|
||||
private final ScheduledExecutorService scheduler;
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final AtomicInteger freshResponses = new AtomicInteger();
|
||||
private final CountDownLatch warmupResponseSent = new CountDownLatch(1);
|
||||
private final CountDownLatch warmupReset = new CountDownLatch(1);
|
||||
private final List<String> servedBodies = new CopyOnWriteArrayList<>();
|
||||
private volatile boolean running = true;
|
||||
private volatile Socket warmupSocket;
|
||||
|
||||
ResetOnceHttpServer() throws IOException {
|
||||
this.serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
|
||||
this.serverSocket.setReuseAddress(true);
|
||||
this.acceptExecutor = Executors.newSingleThreadExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-accept");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-scheduler");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
acceptExecutor.submit(this::acceptLoop);
|
||||
}
|
||||
|
||||
URI uri(String path) {
|
||||
Objects.requireNonNull(path, "path");
|
||||
if (!path.startsWith("/")) {
|
||||
path = "/" + path;
|
||||
}
|
||||
return URI.create("http://127.0.0.1:" + serverSocket.getLocalPort() + path);
|
||||
}
|
||||
|
||||
List<String> getServedBodies() {
|
||||
return new ArrayList<>(servedBodies);
|
||||
}
|
||||
|
||||
int getFreshResponseCount() {
|
||||
return freshResponses.get();
|
||||
}
|
||||
|
||||
void awaitWarmupResponse(Duration timeout) {
|
||||
awaitLatch(warmupResponseSent, timeout);
|
||||
}
|
||||
|
||||
void awaitWarmupConnectionReset(Duration timeout) {
|
||||
awaitLatch(warmupReset, timeout);
|
||||
}
|
||||
|
||||
void awaitFreshResponses(int expected, Duration timeout) {
|
||||
long deadline = System.nanoTime() + timeout.toNanos();
|
||||
while (freshResponses.get() < expected && System.nanoTime() < deadline) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void awaitLatch(CountDownLatch latch, Duration timeout) {
|
||||
try {
|
||||
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
throw new IllegalStateException("Timed out waiting for latch");
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void acceptLoop() {
|
||||
try {
|
||||
while (running) {
|
||||
Socket socket = serverSocket.accept();
|
||||
int index = connectionCount.incrementAndGet();
|
||||
handle(socket, index);
|
||||
}
|
||||
} catch (SocketException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("Unexpected server socket error", ex);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("I/O error in server", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handle(Socket socket, int index) {
|
||||
try {
|
||||
socket.setTcpNoDelay(true);
|
||||
RequestMetadata metadata = readRequest(socket);
|
||||
if (index == 1) {
|
||||
warmupSocket = socket;
|
||||
String body = "{\"stage\":\"warmup\",\"path\":\"" + metadata.path + "\"}";
|
||||
writeResponse(socket, body, true);
|
||||
servedBodies.add("warmup");
|
||||
warmupResponseSent.countDown();
|
||||
scheduler.schedule(() -> forceReset(socket), RESET_DELAY.toMillis(), TimeUnit.MILLISECONDS);
|
||||
} else if (index == 2) {
|
||||
// 模拟客户端复用到仍在连接池中的旧连接,但服务端已在请求到达后立即复位。
|
||||
servedBodies.add("reset");
|
||||
scheduler.schedule(() -> closeWithReset(socket), 10, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
String body = "{\"stage\":\"fresh\",\"attempt\":" + index + "}";
|
||||
writeResponse(socket, body, false);
|
||||
servedBodies.add("fresh");
|
||||
freshResponses.incrementAndGet();
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
// ignore for the purpose of the test
|
||||
}
|
||||
}
|
||||
|
||||
private void forceReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
servedBodies.add("reset");
|
||||
closeWithReset(socket);
|
||||
}
|
||||
} finally {
|
||||
warmupReset.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeWithReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
socket.setSoLinger(true, 0);
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private RequestMetadata readRequest(Socket socket) throws IOException {
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
|
||||
String requestLine = reader.readLine();
|
||||
if (requestLine == null) {
|
||||
return new RequestMetadata("unknown", 0);
|
||||
}
|
||||
String path = requestLine.split(" ", 3)[1];
|
||||
int contentLength = 0;
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null && !line.isEmpty()) {
|
||||
if (line.toLowerCase(Locale.ROOT).startsWith("content-length:")) {
|
||||
contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
|
||||
}
|
||||
}
|
||||
if (contentLength > 0) {
|
||||
char[] buffer = new char[contentLength];
|
||||
int read = 0;
|
||||
while (read < contentLength) {
|
||||
int r = reader.read(buffer, read, contentLength - read);
|
||||
if (r < 0) {
|
||||
break;
|
||||
}
|
||||
read += r;
|
||||
}
|
||||
}
|
||||
return new RequestMetadata(path, contentLength);
|
||||
}
|
||||
|
||||
private void writeResponse(Socket socket, String body, boolean keepAlive) throws IOException {
|
||||
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
|
||||
StringBuilder builder = new StringBuilder()
|
||||
.append("HTTP/1.1 200 OK\r\n")
|
||||
.append("Content-Type: application/json\r\n")
|
||||
.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
|
||||
if (keepAlive) {
|
||||
builder.append("Connection: keep-alive\r\n");
|
||||
} else {
|
||||
builder.append("Connection: close\r\n");
|
||||
}
|
||||
builder.append("\r\n");
|
||||
OutputStream outputStream = socket.getOutputStream();
|
||||
outputStream.write(builder.toString().getBytes(StandardCharsets.US_ASCII));
|
||||
outputStream.write(bodyBytes);
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
running = false;
|
||||
try {
|
||||
serverSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
if (warmupSocket != null && !warmupSocket.isClosed()) {
|
||||
try {
|
||||
warmupSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
scheduler.shutdownNow();
|
||||
acceptExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
private record RequestMetadata(String path, int contentLength) {
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import com.zt.plat.module.databus.dal.mysql.gateway.ApiPolicyRateLimitMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiStepMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiTransformMapper;
|
||||
import com.zt.plat.module.databus.enums.gateway.ApiStatusEnum;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionService;
|
||||
import com.zt.plat.module.databus.service.gateway.impl.ApiDefinitionServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -63,6 +64,9 @@ class ApiDefinitionServiceImplTest extends BaseDbUnitTest {
|
||||
@MockBean
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@MockBean
|
||||
private ApiVersionService apiVersionService;
|
||||
|
||||
@TestConfiguration
|
||||
static class JacksonTestConfiguration {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user