Merge branch 'dev' into test
This commit is contained in:
@@ -7,17 +7,23 @@ import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAcces
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAccessLogRespVO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAccessLogService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import io.swagger.v3.oas.annotations.Operation;
|
||||
import io.swagger.v3.oas.annotations.Parameter;
|
||||
import io.swagger.v3.oas.annotations.tags.Tag;
|
||||
import jakarta.annotation.Resource;
|
||||
import jakarta.validation.Valid;
|
||||
import org.springframework.security.access.prepost.PreAuthorize;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
import org.springframework.web.bind.annotation.GetMapping;
|
||||
import org.springframework.web.bind.annotation.RequestMapping;
|
||||
import org.springframework.web.bind.annotation.RequestParam;
|
||||
import org.springframework.web.bind.annotation.RestController;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static com.zt.plat.framework.common.pojo.CommonResult.success;
|
||||
|
||||
@@ -33,13 +39,18 @@ public class ApiAccessLogController {
|
||||
@Resource
|
||||
private ApiAccessLogService apiAccessLogService;
|
||||
|
||||
@Resource
|
||||
private ApiDefinitionService apiDefinitionService;
|
||||
|
||||
@GetMapping("/get")
|
||||
@Operation(summary = "获取访问日志详情")
|
||||
@Parameter(name = "id", description = "日志编号", required = true, example = "1024")
|
||||
@PreAuthorize("@ss.hasPermission('databus:gateway:access-log:query')")
|
||||
public CommonResult<ApiAccessLogRespVO> get(@RequestParam("id") Long id) {
|
||||
ApiAccessLogDO logDO = apiAccessLogService.get(id);
|
||||
return success(ApiAccessLogConvert.INSTANCE.convert(logDO));
|
||||
ApiAccessLogRespVO respVO = ApiAccessLogConvert.INSTANCE.convert(logDO);
|
||||
enrichDefinitionInfo(respVO);
|
||||
return success(respVO);
|
||||
}
|
||||
|
||||
@GetMapping("/page")
|
||||
@@ -47,6 +58,51 @@ public class ApiAccessLogController {
|
||||
@PreAuthorize("@ss.hasPermission('databus:gateway:access-log:query')")
|
||||
public CommonResult<PageResult<ApiAccessLogRespVO>> page(@Valid ApiAccessLogPageReqVO pageReqVO) {
|
||||
PageResult<ApiAccessLogDO> pageResult = apiAccessLogService.getPage(pageReqVO);
|
||||
return success(ApiAccessLogConvert.INSTANCE.convertPage(pageResult));
|
||||
PageResult<ApiAccessLogRespVO> result = ApiAccessLogConvert.INSTANCE.convertPage(pageResult);
|
||||
enrichDefinitionInfo(result.getList());
|
||||
return success(result);
|
||||
}
|
||||
|
||||
private void enrichDefinitionInfo(List<ApiAccessLogRespVO> list) {
|
||||
// 对分页结果批量补充 API 描述,使用本地缓存减少重复查询
|
||||
if (CollectionUtils.isEmpty(list)) {
|
||||
return;
|
||||
}
|
||||
Map<String, String> cache = new HashMap<>(list.size());
|
||||
list.forEach(item -> {
|
||||
if (item == null) {
|
||||
return;
|
||||
}
|
||||
String cacheKey = buildCacheKey(item.getApiCode(), item.getApiVersion());
|
||||
if (!cache.containsKey(cacheKey)) {
|
||||
cache.put(cacheKey, resolveApiDescription(item.getApiCode(), item.getApiVersion()));
|
||||
}
|
||||
item.setApiDescription(cache.get(cacheKey));
|
||||
});
|
||||
}
|
||||
|
||||
private void enrichDefinitionInfo(ApiAccessLogRespVO item) {
|
||||
// 单条数据同样需要补全描述信息
|
||||
if (item == null) {
|
||||
return;
|
||||
}
|
||||
item.setApiDescription(resolveApiDescription(item.getApiCode(), item.getApiVersion()));
|
||||
}
|
||||
|
||||
private String resolveApiDescription(String apiCode, String apiVersion) {
|
||||
if (!StringUtils.hasText(apiCode)) {
|
||||
return null;
|
||||
}
|
||||
String normalizedVersion = StringUtils.hasText(apiVersion) ? apiVersion.trim() : apiVersion;
|
||||
// 通过网关定义服务补全 API 描述,提升页面可读性
|
||||
return apiDefinitionService.findByCodeAndVersionIncludingInactive(apiCode, normalizedVersion)
|
||||
.map(aggregate -> aggregate.getDefinition() != null ? aggregate.getDefinition().getDescription() : null)
|
||||
.filter(StringUtils::hasText)
|
||||
.orElse(null);
|
||||
}
|
||||
|
||||
private String buildCacheKey(String apiCode, String apiVersion) {
|
||||
// 组合唯一键,避免重复查询相同的 API 描述
|
||||
return (apiCode == null ? "" : apiCode) + "#" + (apiVersion == null ? "" : apiVersion);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,9 @@ import com.zt.plat.framework.common.pojo.PageResult;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAccessLogRespVO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
|
||||
import org.mapstruct.Mapper;
|
||||
import org.mapstruct.Mapping;
|
||||
import org.mapstruct.factory.Mappers;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -13,6 +15,8 @@ public interface ApiAccessLogConvert {
|
||||
|
||||
ApiAccessLogConvert INSTANCE = Mappers.getMapper(ApiAccessLogConvert.class);
|
||||
|
||||
@Mapping(target = "statusDesc", expression = "java(statusDesc(bean.getStatus()))")
|
||||
@Mapping(target = "responseStatusText", expression = "java(resolveHttpStatusText(bean.getResponseStatus()))")
|
||||
ApiAccessLogRespVO convert(ApiAccessLogDO bean);
|
||||
|
||||
List<ApiAccessLogRespVO> convertList(List<ApiAccessLogDO> list);
|
||||
@@ -26,4 +30,26 @@ public interface ApiAccessLogConvert {
|
||||
result.setTotal(page.getTotal());
|
||||
return result;
|
||||
}
|
||||
|
||||
default String statusDesc(Integer status) {
|
||||
// 将数字状态码转换为中文描述,方便前端直接展示
|
||||
if (status == null) {
|
||||
return "未知";
|
||||
}
|
||||
return switch (status) {
|
||||
case 0 -> "成功";
|
||||
case 1 -> "客户端错误";
|
||||
case 2 -> "服务端错误";
|
||||
default -> "未知";
|
||||
};
|
||||
}
|
||||
|
||||
default String resolveHttpStatusText(Integer status) {
|
||||
// 统一使用 Spring 的 HttpStatus 解析出标准文案
|
||||
if (status == null) {
|
||||
return null;
|
||||
}
|
||||
HttpStatus resolved = HttpStatus.resolve(status);
|
||||
return resolved != null ? resolved.getReasonPhrase() : null;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,9 @@ public class ApiAccessLogRespVO {
|
||||
@Schema(description = "API 编码", example = "user.query")
|
||||
private String apiCode;
|
||||
|
||||
@Schema(description = "API 描述", example = "用户查询服务")
|
||||
private String apiDescription;
|
||||
|
||||
@Schema(description = "API 版本", example = "v1")
|
||||
private String apiVersion;
|
||||
|
||||
@@ -42,6 +45,9 @@ public class ApiAccessLogRespVO {
|
||||
@Schema(description = "响应 HTTP 状态", example = "200")
|
||||
private Integer responseStatus;
|
||||
|
||||
@Schema(description = "响应 HTTP 状态说明", example = "OK")
|
||||
private String responseStatusText;
|
||||
|
||||
@Schema(description = "响应提示", example = "OK")
|
||||
private String responseMessage;
|
||||
|
||||
@@ -51,6 +57,9 @@ public class ApiAccessLogRespVO {
|
||||
@Schema(description = "访问状态", example = "0")
|
||||
private Integer status;
|
||||
|
||||
@Schema(description = "访问状态展示文案", example = "成功")
|
||||
private String statusDesc;
|
||||
|
||||
@Schema(description = "错误码", example = "DAT-001")
|
||||
private String errorCode;
|
||||
|
||||
|
||||
@@ -28,7 +28,7 @@ public class ApiAccessLogDO extends TenantBaseDO {
|
||||
private Long id;
|
||||
|
||||
/**
|
||||
* 请求追踪标识,对应 {@link com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext#getRequestId()}
|
||||
* 请求追踪标识,对应 {@link com.zt.plat.framework.common.util.monitor.TracerUtils#getTraceId()}
|
||||
*/
|
||||
private String traceId;
|
||||
|
||||
|
||||
@@ -2,12 +2,14 @@ package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.util.monitor.TracerUtils;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAccessLogService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.stereotype.Component;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
@@ -42,8 +44,9 @@ public class ApiGatewayAccessLogger {
|
||||
*/
|
||||
public void onRequest(ApiInvocationContext context) {
|
||||
try {
|
||||
String traceId = TracerUtils.getTraceId();
|
||||
ApiAccessLogDO logDO = new ApiAccessLogDO();
|
||||
logDO.setTraceId(context.getRequestId());
|
||||
logDO.setTraceId(traceId);
|
||||
logDO.setApiCode(context.getApiCode());
|
||||
logDO.setApiVersion(context.getApiVersion());
|
||||
logDO.setRequestMethod(context.getHttpMethod());
|
||||
@@ -60,7 +63,7 @@ public class ApiGatewayAccessLogger {
|
||||
Long logId = apiAccessLogService.create(logDO);
|
||||
context.getAttributes().put(ATTR_LOG_ID, logId);
|
||||
} catch (Exception ex) {
|
||||
log.warn("记录 API 访问日志开始阶段失败, traceId={}", context.getRequestId(), ex);
|
||||
log.warn("记录 API 访问日志开始阶段失败, traceId={}", TracerUtils.getTraceId(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -85,12 +88,18 @@ public class ApiGatewayAccessLogger {
|
||||
try {
|
||||
ApiAccessLogDO update = new ApiAccessLogDO();
|
||||
update.setId(logId);
|
||||
update.setResponseStatus(context.getResponseStatus());
|
||||
update.setResponseMessage(context.getResponseMessage());
|
||||
int responseStatus = resolveHttpStatus(context);
|
||||
context.setResponseStatus(responseStatus);
|
||||
update.setResponseStatus(responseStatus);
|
||||
String responseMessage = resolveResponseMessage(context, responseStatus);
|
||||
update.setResponseMessage(responseMessage);
|
||||
if (!StringUtils.hasText(context.getResponseMessage()) && StringUtils.hasText(responseMessage)) {
|
||||
context.setResponseMessage(responseMessage);
|
||||
}
|
||||
update.setResponseBody(toJson(context.getResponseBody()));
|
||||
update.setStatus(resolveStatus(context.getResponseStatus()));
|
||||
update.setErrorCode(extractErrorCode(context.getResponseBody()));
|
||||
update.setErrorMessage(resolveErrorMessage(context));
|
||||
update.setStatus(resolveStatus(responseStatus));
|
||||
update.setErrorCode(extractErrorCode(context.getResponseBody(), responseStatus));
|
||||
update.setErrorMessage(resolveErrorMessage(context, responseStatus));
|
||||
update.setExceptionStack((String) context.getAttributes().get(ATTR_EXCEPTION_STACK));
|
||||
update.setStepResults(toJson(context.getStepResults()));
|
||||
update.setExtra(toJson(buildExtra(context)));
|
||||
@@ -98,7 +107,7 @@ public class ApiGatewayAccessLogger {
|
||||
update.setDuration(calculateDuration(context));
|
||||
apiAccessLogService.update(update);
|
||||
} catch (Exception ex) {
|
||||
log.warn("记录 API 访问日志结束阶段失败, traceId={}, logId={}", context.getRequestId(), logId, ex);
|
||||
log.warn("记录 API 访问日志结束阶段失败, traceId={}, logId={}", TracerUtils.getTraceId(), logId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -137,7 +146,10 @@ public class ApiGatewayAccessLogger {
|
||||
return 3;
|
||||
}
|
||||
|
||||
private String resolveErrorMessage(ApiInvocationContext context) {
|
||||
private String resolveErrorMessage(ApiInvocationContext context, int responseStatus) {
|
||||
if (!isErrorStatus(responseStatus)) {
|
||||
return null;
|
||||
}
|
||||
if (StringUtils.hasText(context.getResponseMessage())) {
|
||||
return truncate(context.getResponseMessage());
|
||||
}
|
||||
@@ -151,7 +163,10 @@ public class ApiGatewayAccessLogger {
|
||||
return null;
|
||||
}
|
||||
|
||||
private String extractErrorCode(Object responseBody) {
|
||||
private String extractErrorCode(Object responseBody, int responseStatus) {
|
||||
if (!isErrorStatus(responseStatus)) {
|
||||
return null;
|
||||
}
|
||||
if (responseBody instanceof Map<?, ?> map) {
|
||||
Object errorCode = firstNonNull(map.get("errorCode"), map.get("code"));
|
||||
return errorCode == null ? null : truncate(String.valueOf(errorCode));
|
||||
@@ -159,6 +174,27 @@ public class ApiGatewayAccessLogger {
|
||||
return null;
|
||||
}
|
||||
|
||||
private int resolveHttpStatus(ApiInvocationContext context) {
|
||||
Integer status = context.getResponseStatus();
|
||||
if (status != null) {
|
||||
return status;
|
||||
}
|
||||
// 默认兜底为 200,避免日志中出现空的 HTTP 状态码
|
||||
return HttpStatus.OK.value();
|
||||
}
|
||||
|
||||
private String resolveResponseMessage(ApiInvocationContext context, int responseStatus) {
|
||||
if (StringUtils.hasText(context.getResponseMessage())) {
|
||||
return truncate(context.getResponseMessage());
|
||||
}
|
||||
HttpStatus resolved = HttpStatus.resolve(responseStatus);
|
||||
return resolved != null ? resolved.getReasonPhrase() : null;
|
||||
}
|
||||
|
||||
private boolean isErrorStatus(int responseStatus) {
|
||||
return responseStatus >= 400;
|
||||
}
|
||||
|
||||
private Map<String, Object> buildExtra(ApiInvocationContext context) {
|
||||
Map<String, Object> extra = new HashMap<>();
|
||||
if (!CollectionUtils.isEmpty(context.getVariables())) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.framework.common.util.monitor.TracerUtils;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
@@ -236,12 +237,12 @@ public class ApiGatewayExecutionService {
|
||||
String message = StringUtils.hasText(context.getResponseMessage())
|
||||
? context.getResponseMessage()
|
||||
: HttpStatus.valueOf(status).getReasonPhrase();
|
||||
return ApiGatewayResponse.builder()
|
||||
return ApiGatewayResponse.builder()
|
||||
.code(status)
|
||||
.message(message)
|
||||
.response(context.getResponseBody())
|
||||
.traceId(context.getRequestId())
|
||||
.build();
|
||||
.message(message)
|
||||
.response(context.getResponseBody())
|
||||
.traceId(TracerUtils.getTraceId())
|
||||
.build();
|
||||
}
|
||||
|
||||
private String normalizeBasePath(String basePath) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.util.json.JsonUtils;
|
||||
import com.zt.plat.framework.common.util.monitor.TracerUtils;
|
||||
import com.zt.plat.framework.common.util.security.CryptoSignatureUtils;
|
||||
import com.zt.plat.framework.common.util.servlet.ServletUtils;
|
||||
import com.zt.plat.framework.security.core.LoginUser;
|
||||
@@ -464,11 +465,12 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
response.resetBuffer();
|
||||
response.setStatus(status.value());
|
||||
String resolvedMessage = StringUtils.hasText(message) ? message : status.getReasonPhrase();
|
||||
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
|
||||
String traceId = TracerUtils.getTraceId();
|
||||
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
|
||||
.code(status.value())
|
||||
.message(resolvedMessage)
|
||||
.response(null)
|
||||
.traceId(null)
|
||||
.traceId(traceId)
|
||||
.build();
|
||||
if (shouldEncryptErrorResponse(security, credential)) {
|
||||
String encryptionKey = credential.getEncryptionKey();
|
||||
|
||||
@@ -0,0 +1,360 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.step.impl;
|
||||
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.PrematureCloseException;
|
||||
import reactor.netty.resources.ConnectionProvider;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Demonstrates the stale-connection scenario using the legacy vs the deferred retry pipeline.
|
||||
*/
|
||||
class HttpStepHandlerConnectionResetScenarioTest {
|
||||
|
||||
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
|
||||
private static final int RETRY_ATTEMPTS = 3;
|
||||
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
|
||||
private static final Duration RESET_WAIT = Duration.ofMillis(300);
|
||||
|
||||
@Test
|
||||
void legacyPipelineLosesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
legacyInvoke(webClient, uri, Map.of("mode", "legacy"));
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void deferredPipelinePropagatesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
Object result = deferredInvoke(webClient, uri, Map.of("mode", "defer"));
|
||||
assertThat(result).isInstanceOf(Map.class);
|
||||
Map<?, ?> resultMap = (Map<?, ?>) result;
|
||||
assertThat(resultMap.get("stage")).isEqualTo("fresh");
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
private WebClient createWebClient() {
|
||||
ConnectionProvider provider = ConnectionProvider.builder("http-step-handler-demo")
|
||||
.maxConnections(1)
|
||||
.pendingAcquireMaxCount(-1)
|
||||
.maxIdleTime(Duration.ofSeconds(5))
|
||||
.build();
|
||||
HttpClient httpClient = HttpClient.create(provider).compress(true);
|
||||
return WebClient.builder()
|
||||
.clientConnector(new ReactorClientHttpConnector(httpClient))
|
||||
.build();
|
||||
}
|
||||
|
||||
private void warmUp(ResetOnceHttpServer server, WebClient webClient, URI uri) {
|
||||
webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(Map.of("warm", true))
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
.block(BLOCK_TIMEOUT);
|
||||
server.awaitWarmupResponse(Duration.ofSeconds(2));
|
||||
}
|
||||
|
||||
private Object legacyInvoke(WebClient webClient, URI uri, Object body) {
|
||||
WebClient.RequestHeadersSpec<?> spec = webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body);
|
||||
Mono<Object> responseMono = spec.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 模拟业务中首次订阅后缓存失败结果的场景
|
||||
.cache();
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException)
|
||||
.onRetryExhaustedThrow((specification, signal) -> signal.failure()))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private Object deferredInvoke(WebClient webClient, URI uri, Object body) {
|
||||
Mono<Object> responseMono = Mono.defer(() -> webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body)
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 通过 defer,每次重试都会重新创建带缓存的响应 Mono
|
||||
.cache());
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private boolean isRetryableException(Throwable throwable) {
|
||||
if (throwable == null) {
|
||||
return false;
|
||||
}
|
||||
Throwable cursor = throwable;
|
||||
while (cursor != null) {
|
||||
if (cursor instanceof ServiceException) {
|
||||
return false;
|
||||
}
|
||||
if (cursor instanceof PrematureCloseException) {
|
||||
return true;
|
||||
}
|
||||
if (cursor instanceof IOException) {
|
||||
return true;
|
||||
}
|
||||
cursor = cursor.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static final class ResetOnceHttpServer implements AutoCloseable {
|
||||
|
||||
private static final Duration RESET_DELAY = Duration.ofMillis(250);
|
||||
|
||||
private final ServerSocket serverSocket;
|
||||
private final ExecutorService acceptExecutor;
|
||||
private final ScheduledExecutorService scheduler;
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final AtomicInteger freshResponses = new AtomicInteger();
|
||||
private final CountDownLatch warmupResponseSent = new CountDownLatch(1);
|
||||
private final CountDownLatch warmupReset = new CountDownLatch(1);
|
||||
private final List<String> servedBodies = new CopyOnWriteArrayList<>();
|
||||
private volatile boolean running = true;
|
||||
private volatile Socket warmupSocket;
|
||||
|
||||
ResetOnceHttpServer() throws IOException {
|
||||
this.serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
|
||||
this.serverSocket.setReuseAddress(true);
|
||||
this.acceptExecutor = Executors.newSingleThreadExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-accept");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-scheduler");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
acceptExecutor.submit(this::acceptLoop);
|
||||
}
|
||||
|
||||
URI uri(String path) {
|
||||
Objects.requireNonNull(path, "path");
|
||||
if (!path.startsWith("/")) {
|
||||
path = "/" + path;
|
||||
}
|
||||
return URI.create("http://127.0.0.1:" + serverSocket.getLocalPort() + path);
|
||||
}
|
||||
|
||||
List<String> getServedBodies() {
|
||||
return new ArrayList<>(servedBodies);
|
||||
}
|
||||
|
||||
int getFreshResponseCount() {
|
||||
return freshResponses.get();
|
||||
}
|
||||
|
||||
void awaitWarmupResponse(Duration timeout) {
|
||||
awaitLatch(warmupResponseSent, timeout);
|
||||
}
|
||||
|
||||
void awaitWarmupConnectionReset(Duration timeout) {
|
||||
awaitLatch(warmupReset, timeout);
|
||||
}
|
||||
|
||||
void awaitFreshResponses(int expected, Duration timeout) {
|
||||
long deadline = System.nanoTime() + timeout.toNanos();
|
||||
while (freshResponses.get() < expected && System.nanoTime() < deadline) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void awaitLatch(CountDownLatch latch, Duration timeout) {
|
||||
try {
|
||||
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
throw new IllegalStateException("Timed out waiting for latch");
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void acceptLoop() {
|
||||
try {
|
||||
while (running) {
|
||||
Socket socket = serverSocket.accept();
|
||||
int index = connectionCount.incrementAndGet();
|
||||
handle(socket, index);
|
||||
}
|
||||
} catch (SocketException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("Unexpected server socket error", ex);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("I/O error in server", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handle(Socket socket, int index) {
|
||||
try {
|
||||
socket.setTcpNoDelay(true);
|
||||
RequestMetadata metadata = readRequest(socket);
|
||||
if (index == 1) {
|
||||
warmupSocket = socket;
|
||||
String body = "{\"stage\":\"warmup\",\"path\":\"" + metadata.path + "\"}";
|
||||
writeResponse(socket, body, true);
|
||||
servedBodies.add("warmup");
|
||||
warmupResponseSent.countDown();
|
||||
scheduler.schedule(() -> forceReset(socket), RESET_DELAY.toMillis(), TimeUnit.MILLISECONDS);
|
||||
} else if (index == 2) {
|
||||
// 模拟客户端复用到仍在连接池中的旧连接,但服务端已在请求到达后立即复位。
|
||||
servedBodies.add("reset");
|
||||
scheduler.schedule(() -> closeWithReset(socket), 10, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
String body = "{\"stage\":\"fresh\",\"attempt\":" + index + "}";
|
||||
writeResponse(socket, body, false);
|
||||
servedBodies.add("fresh");
|
||||
freshResponses.incrementAndGet();
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
// ignore for the purpose of the test
|
||||
}
|
||||
}
|
||||
|
||||
private void forceReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
servedBodies.add("reset");
|
||||
closeWithReset(socket);
|
||||
}
|
||||
} finally {
|
||||
warmupReset.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeWithReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
socket.setSoLinger(true, 0);
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private RequestMetadata readRequest(Socket socket) throws IOException {
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
|
||||
String requestLine = reader.readLine();
|
||||
if (requestLine == null) {
|
||||
return new RequestMetadata("unknown", 0);
|
||||
}
|
||||
String path = requestLine.split(" ", 3)[1];
|
||||
int contentLength = 0;
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null && !line.isEmpty()) {
|
||||
if (line.toLowerCase(Locale.ROOT).startsWith("content-length:")) {
|
||||
contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
|
||||
}
|
||||
}
|
||||
if (contentLength > 0) {
|
||||
char[] buffer = new char[contentLength];
|
||||
int read = 0;
|
||||
while (read < contentLength) {
|
||||
int r = reader.read(buffer, read, contentLength - read);
|
||||
if (r < 0) {
|
||||
break;
|
||||
}
|
||||
read += r;
|
||||
}
|
||||
}
|
||||
return new RequestMetadata(path, contentLength);
|
||||
}
|
||||
|
||||
private void writeResponse(Socket socket, String body, boolean keepAlive) throws IOException {
|
||||
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
|
||||
StringBuilder builder = new StringBuilder()
|
||||
.append("HTTP/1.1 200 OK\r\n")
|
||||
.append("Content-Type: application/json\r\n")
|
||||
.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
|
||||
if (keepAlive) {
|
||||
builder.append("Connection: keep-alive\r\n");
|
||||
} else {
|
||||
builder.append("Connection: close\r\n");
|
||||
}
|
||||
builder.append("\r\n");
|
||||
OutputStream outputStream = socket.getOutputStream();
|
||||
outputStream.write(builder.toString().getBytes(StandardCharsets.US_ASCII));
|
||||
outputStream.write(bodyBytes);
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
running = false;
|
||||
try {
|
||||
serverSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
if (warmupSocket != null && !warmupSocket.isClosed()) {
|
||||
try {
|
||||
warmupSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
scheduler.shutdownNow();
|
||||
acceptExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
private record RequestMetadata(String path, int contentLength) {
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user