Merge branch 'dev' into test
This commit is contained in:
@@ -273,10 +273,10 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
|
||||
|
||||
String signatureType = resolveSignatureType(credential, security);
|
||||
try {
|
||||
// boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
|
||||
// if (!valid) {
|
||||
// throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
|
||||
// }
|
||||
boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
|
||||
if (!valid) {
|
||||
throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
|
||||
}
|
||||
} catch (IllegalArgumentException ex) {
|
||||
throw new SecurityValidationException(HttpStatus.INTERNAL_SERVER_ERROR, "签名算法配置异常");
|
||||
}
|
||||
|
||||
@@ -393,8 +393,13 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
}
|
||||
|
||||
private boolean supportsRequestBody(HttpMethod method) {
|
||||
// 所有请求都要传递请求体
|
||||
return true;
|
||||
if (method == null) {
|
||||
return true;
|
||||
}
|
||||
return !(HttpMethod.GET.equals(method)
|
||||
|| HttpMethod.HEAD.equals(method)
|
||||
|| HttpMethod.OPTIONS.equals(method)
|
||||
|| HttpMethod.TRACE.equals(method));
|
||||
}
|
||||
|
||||
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {
|
||||
|
||||
@@ -2,8 +2,11 @@ package com.zt.plat.module.databus.controller.admin.gateway;
|
||||
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayExecutionService;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.IntegrationFlowManager;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.Test;
|
||||
@@ -48,8 +51,17 @@ class ApiGatewayControllerTest {
|
||||
@MockBean
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@MockBean
|
||||
private ApiClientCredentialService apiClientCredentialService;
|
||||
@MockBean
|
||||
private IntegrationFlowManager integrationFlowManager;
|
||||
|
||||
@MockBean
|
||||
private ApiClientCredentialService apiClientCredentialService;
|
||||
|
||||
@MockBean
|
||||
private ApiAnonymousUserService apiAnonymousUserService;
|
||||
|
||||
@MockBean
|
||||
private ApiGatewayAccessLogger apiGatewayAccessLogger;
|
||||
|
||||
@Test
|
||||
void invokeShouldReturnGatewayEnvelope() throws Exception {
|
||||
|
||||
@@ -28,16 +28,10 @@ import java.util.UUID;
|
||||
public final class DatabusApiInvocationExample {
|
||||
|
||||
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
|
||||
// private static final String APP_ID = "ztmy";
|
||||
// private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
// private static final String APP_ID = "test";
|
||||
// private static final String APP_SECRET = "RSYtKXrXPLMy3oeh0cOro6QCioRUgqfnKCkDkNq78sI=";
|
||||
private static final String APP_ID = "testAnnoy";
|
||||
private static final String APP_SECRET = "jyGCymUjCFL2i3a4Tm3qBIkUrUl4ZgKPYvOU/47ZWcM=";
|
||||
private static final String APP_ID = "ztmy";
|
||||
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
|
||||
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/test/1";
|
||||
// private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
|
||||
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.build();
|
||||
@@ -88,8 +82,11 @@ public final class DatabusApiInvocationExample {
|
||||
Map<String, Object> queryParams = new LinkedHashMap<>();
|
||||
|
||||
long extraTimestamp = 1761556157185L;
|
||||
// String bodyJson = String.format("""
|
||||
// {"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
|
||||
// """, extraTimestamp);
|
||||
String bodyJson = String.format("""
|
||||
{"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
|
||||
{}
|
||||
""", extraTimestamp);
|
||||
|
||||
Map<String, Object> bodyParams = parseBodyJson(bodyJson);
|
||||
|
||||
@@ -9,6 +9,7 @@ import com.zt.plat.framework.security.core.util.SecurityFrameworkUtils;
|
||||
import com.zt.plat.framework.web.core.util.WebFrameworkUtils;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiClientCredentialDO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
|
||||
import jakarta.servlet.http.HttpServletRequest;
|
||||
@@ -25,7 +26,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.TreeMap;
|
||||
import java.util.UUID;
|
||||
|
||||
import static com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties.*;
|
||||
@@ -47,11 +50,12 @@ class GatewaySecurityFilterTest {
|
||||
ApiGatewayProperties properties = createProperties();
|
||||
properties.getSecurity().setEnabled(false);
|
||||
StringRedisTemplate redisTemplate = mock(StringRedisTemplate.class);
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -74,7 +78,9 @@ class GatewaySecurityFilterTest {
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("10.0.0.1");
|
||||
@@ -97,6 +103,8 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
@@ -108,13 +116,13 @@ class GatewaySecurityFilterTest {
|
||||
properties.getSecurity().setRequireBodyEncryption(false);
|
||||
properties.getSecurity().setEncryptResponse(false);
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
long timestamp = System.currentTimeMillis();
|
||||
String nonce = UUID.randomUUID().toString().replaceAll("-", "");
|
||||
String signature = signatureForApp("demo-app");
|
||||
String signature = signatureForApp("demo-app", timestamp);
|
||||
request.addHeader(APP_ID_HEADER, "demo-app");
|
||||
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
request.addHeader(NONCE_HEADER, nonce);
|
||||
@@ -142,13 +150,15 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
credential.setEncryptionKey("demo-secret-key");
|
||||
credential.setEncryptionType(CryptoSignatureUtils.ENCRYPT_TYPE_AES);
|
||||
when(credentialService.findActiveCredential("demo-app")).thenReturn(Optional.of(credential));
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -183,6 +193,8 @@ class GatewaySecurityFilterTest {
|
||||
|
||||
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
|
||||
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
|
||||
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
|
||||
when(accessLogger.logEntrance(any())).thenReturn(1L);
|
||||
ApiClientCredentialDO credential = new ApiClientCredentialDO();
|
||||
credential.setAppId("demo-app");
|
||||
credential.setSignatureType(null);
|
||||
@@ -200,7 +212,7 @@ class GatewaySecurityFilterTest {
|
||||
when(anonymousUserService.find(99L)).thenReturn(Optional.of(details));
|
||||
when(anonymousUserService.issueAccessToken(details)).thenReturn(Optional.of("mock-token"));
|
||||
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
|
||||
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
|
||||
|
||||
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
|
||||
request.setRemoteAddr("127.0.0.1");
|
||||
@@ -209,7 +221,7 @@ class GatewaySecurityFilterTest {
|
||||
request.addHeader(APP_ID_HEADER, "demo-app");
|
||||
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
request.addHeader(NONCE_HEADER, nonce);
|
||||
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app"));
|
||||
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app", timestamp));
|
||||
|
||||
MockHttpServletResponse response = new MockHttpServletResponse();
|
||||
MockFilterChain chain = new MockFilterChain();
|
||||
@@ -231,7 +243,20 @@ class GatewaySecurityFilterTest {
|
||||
return properties;
|
||||
}
|
||||
|
||||
private String signatureForApp(String appId) {
|
||||
return SecureUtil.md5("appId=" + appId);
|
||||
private String signatureForApp(String appId, long timestamp) {
|
||||
Map<String, Object> payload = new TreeMap<>();
|
||||
payload.put(APP_ID_HEADER, appId);
|
||||
payload.put(TIMESTAMP_HEADER, String.valueOf(timestamp));
|
||||
StringBuilder sb = new StringBuilder();
|
||||
payload.forEach((key, value) -> {
|
||||
if (value == null) {
|
||||
return;
|
||||
}
|
||||
if (sb.length() > 0) {
|
||||
sb.append('&');
|
||||
}
|
||||
sb.append(key).append('=').append(value);
|
||||
});
|
||||
return SecureUtil.md5(sb.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,360 +0,0 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.step.impl;
|
||||
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.http.client.PrematureCloseException;
|
||||
import reactor.netty.resources.ConnectionProvider;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.*;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* Demonstrates the stale-connection scenario using the legacy vs the deferred retry pipeline.
|
||||
*/
|
||||
class HttpStepHandlerConnectionResetScenarioTest {
|
||||
|
||||
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
|
||||
private static final int RETRY_ATTEMPTS = 3;
|
||||
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
|
||||
private static final Duration RESET_WAIT = Duration.ofMillis(300);
|
||||
|
||||
@Test
|
||||
void legacyPipelineLosesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
legacyInvoke(webClient, uri, Map.of("mode", "legacy"));
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
void deferredPipelinePropagatesSuccessfulRetry() throws Exception {
|
||||
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
|
||||
WebClient webClient = createWebClient();
|
||||
URI uri = server.uri("/demo");
|
||||
|
||||
warmUp(server, webClient, uri);
|
||||
server.awaitWarmupConnectionReset(RESET_WAIT);
|
||||
|
||||
Object result = deferredInvoke(webClient, uri, Map.of("mode", "defer"));
|
||||
assertThat(result).isInstanceOf(Map.class);
|
||||
Map<?, ?> resultMap = (Map<?, ?>) result;
|
||||
assertThat(resultMap.get("stage")).isEqualTo("fresh");
|
||||
|
||||
server.awaitFreshResponses(1, Duration.ofSeconds(2));
|
||||
assertThat(server.getFreshResponseCount()).isEqualTo(1);
|
||||
assertThat(server.getServedBodies()).contains("reset", "fresh");
|
||||
}
|
||||
}
|
||||
|
||||
private WebClient createWebClient() {
|
||||
ConnectionProvider provider = ConnectionProvider.builder("http-step-handler-demo")
|
||||
.maxConnections(1)
|
||||
.pendingAcquireMaxCount(-1)
|
||||
.maxIdleTime(Duration.ofSeconds(5))
|
||||
.build();
|
||||
HttpClient httpClient = HttpClient.create(provider).compress(true);
|
||||
return WebClient.builder()
|
||||
.clientConnector(new ReactorClientHttpConnector(httpClient))
|
||||
.build();
|
||||
}
|
||||
|
||||
private void warmUp(ResetOnceHttpServer server, WebClient webClient, URI uri) {
|
||||
webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(Map.of("warm", true))
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
.block(BLOCK_TIMEOUT);
|
||||
server.awaitWarmupResponse(Duration.ofSeconds(2));
|
||||
}
|
||||
|
||||
private Object legacyInvoke(WebClient webClient, URI uri, Object body) {
|
||||
WebClient.RequestHeadersSpec<?> spec = webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body);
|
||||
Mono<Object> responseMono = spec.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 模拟业务中首次订阅后缓存失败结果的场景
|
||||
.cache();
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException)
|
||||
.onRetryExhaustedThrow((specification, signal) -> signal.failure()))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private Object deferredInvoke(WebClient webClient, URI uri, Object body) {
|
||||
Mono<Object> responseMono = Mono.defer(() -> webClient.post()
|
||||
.uri(uri)
|
||||
.contentType(MediaType.APPLICATION_JSON)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.bodyValue(body)
|
||||
.retrieve()
|
||||
.bodyToMono(Object.class)
|
||||
// 通过 defer,每次重试都会重新创建带缓存的响应 Mono
|
||||
.cache());
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException))
|
||||
.block(BLOCK_TIMEOUT);
|
||||
}
|
||||
|
||||
private boolean isRetryableException(Throwable throwable) {
|
||||
if (throwable == null) {
|
||||
return false;
|
||||
}
|
||||
Throwable cursor = throwable;
|
||||
while (cursor != null) {
|
||||
if (cursor instanceof ServiceException) {
|
||||
return false;
|
||||
}
|
||||
if (cursor instanceof PrematureCloseException) {
|
||||
return true;
|
||||
}
|
||||
if (cursor instanceof IOException) {
|
||||
return true;
|
||||
}
|
||||
cursor = cursor.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static final class ResetOnceHttpServer implements AutoCloseable {
|
||||
|
||||
private static final Duration RESET_DELAY = Duration.ofMillis(250);
|
||||
|
||||
private final ServerSocket serverSocket;
|
||||
private final ExecutorService acceptExecutor;
|
||||
private final ScheduledExecutorService scheduler;
|
||||
private final AtomicInteger connectionCount = new AtomicInteger();
|
||||
private final AtomicInteger freshResponses = new AtomicInteger();
|
||||
private final CountDownLatch warmupResponseSent = new CountDownLatch(1);
|
||||
private final CountDownLatch warmupReset = new CountDownLatch(1);
|
||||
private final List<String> servedBodies = new CopyOnWriteArrayList<>();
|
||||
private volatile boolean running = true;
|
||||
private volatile Socket warmupSocket;
|
||||
|
||||
ResetOnceHttpServer() throws IOException {
|
||||
this.serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
|
||||
this.serverSocket.setReuseAddress(true);
|
||||
this.acceptExecutor = Executors.newSingleThreadExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-accept");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
|
||||
Thread t = new Thread(r, "reset-once-http-scheduler");
|
||||
t.setDaemon(true);
|
||||
return t;
|
||||
});
|
||||
acceptExecutor.submit(this::acceptLoop);
|
||||
}
|
||||
|
||||
URI uri(String path) {
|
||||
Objects.requireNonNull(path, "path");
|
||||
if (!path.startsWith("/")) {
|
||||
path = "/" + path;
|
||||
}
|
||||
return URI.create("http://127.0.0.1:" + serverSocket.getLocalPort() + path);
|
||||
}
|
||||
|
||||
List<String> getServedBodies() {
|
||||
return new ArrayList<>(servedBodies);
|
||||
}
|
||||
|
||||
int getFreshResponseCount() {
|
||||
return freshResponses.get();
|
||||
}
|
||||
|
||||
void awaitWarmupResponse(Duration timeout) {
|
||||
awaitLatch(warmupResponseSent, timeout);
|
||||
}
|
||||
|
||||
void awaitWarmupConnectionReset(Duration timeout) {
|
||||
awaitLatch(warmupReset, timeout);
|
||||
}
|
||||
|
||||
void awaitFreshResponses(int expected, Duration timeout) {
|
||||
long deadline = System.nanoTime() + timeout.toNanos();
|
||||
while (freshResponses.get() < expected && System.nanoTime() < deadline) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException ignored) {
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void awaitLatch(CountDownLatch latch, Duration timeout) {
|
||||
try {
|
||||
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
|
||||
throw new IllegalStateException("Timed out waiting for latch");
|
||||
}
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IllegalStateException(ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void acceptLoop() {
|
||||
try {
|
||||
while (running) {
|
||||
Socket socket = serverSocket.accept();
|
||||
int index = connectionCount.incrementAndGet();
|
||||
handle(socket, index);
|
||||
}
|
||||
} catch (SocketException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("Unexpected server socket error", ex);
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
if (running) {
|
||||
throw new IllegalStateException("I/O error in server", ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void handle(Socket socket, int index) {
|
||||
try {
|
||||
socket.setTcpNoDelay(true);
|
||||
RequestMetadata metadata = readRequest(socket);
|
||||
if (index == 1) {
|
||||
warmupSocket = socket;
|
||||
String body = "{\"stage\":\"warmup\",\"path\":\"" + metadata.path + "\"}";
|
||||
writeResponse(socket, body, true);
|
||||
servedBodies.add("warmup");
|
||||
warmupResponseSent.countDown();
|
||||
scheduler.schedule(() -> forceReset(socket), RESET_DELAY.toMillis(), TimeUnit.MILLISECONDS);
|
||||
} else if (index == 2) {
|
||||
// 模拟客户端复用到仍在连接池中的旧连接,但服务端已在请求到达后立即复位。
|
||||
servedBodies.add("reset");
|
||||
scheduler.schedule(() -> closeWithReset(socket), 10, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
String body = "{\"stage\":\"fresh\",\"attempt\":" + index + "}";
|
||||
writeResponse(socket, body, false);
|
||||
servedBodies.add("fresh");
|
||||
freshResponses.incrementAndGet();
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ex) {
|
||||
// ignore for the purpose of the test
|
||||
}
|
||||
}
|
||||
|
||||
private void forceReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
servedBodies.add("reset");
|
||||
closeWithReset(socket);
|
||||
}
|
||||
} finally {
|
||||
warmupReset.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
private void closeWithReset(Socket socket) {
|
||||
try {
|
||||
if (!socket.isClosed()) {
|
||||
socket.setSoLinger(true, 0);
|
||||
socket.close();
|
||||
}
|
||||
} catch (IOException ignored) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
private RequestMetadata readRequest(Socket socket) throws IOException {
|
||||
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
|
||||
String requestLine = reader.readLine();
|
||||
if (requestLine == null) {
|
||||
return new RequestMetadata("unknown", 0);
|
||||
}
|
||||
String path = requestLine.split(" ", 3)[1];
|
||||
int contentLength = 0;
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null && !line.isEmpty()) {
|
||||
if (line.toLowerCase(Locale.ROOT).startsWith("content-length:")) {
|
||||
contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
|
||||
}
|
||||
}
|
||||
if (contentLength > 0) {
|
||||
char[] buffer = new char[contentLength];
|
||||
int read = 0;
|
||||
while (read < contentLength) {
|
||||
int r = reader.read(buffer, read, contentLength - read);
|
||||
if (r < 0) {
|
||||
break;
|
||||
}
|
||||
read += r;
|
||||
}
|
||||
}
|
||||
return new RequestMetadata(path, contentLength);
|
||||
}
|
||||
|
||||
private void writeResponse(Socket socket, String body, boolean keepAlive) throws IOException {
|
||||
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
|
||||
StringBuilder builder = new StringBuilder()
|
||||
.append("HTTP/1.1 200 OK\r\n")
|
||||
.append("Content-Type: application/json\r\n")
|
||||
.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
|
||||
if (keepAlive) {
|
||||
builder.append("Connection: keep-alive\r\n");
|
||||
} else {
|
||||
builder.append("Connection: close\r\n");
|
||||
}
|
||||
builder.append("\r\n");
|
||||
OutputStream outputStream = socket.getOutputStream();
|
||||
outputStream.write(builder.toString().getBytes(StandardCharsets.US_ASCII));
|
||||
outputStream.write(bodyBytes);
|
||||
outputStream.flush();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws Exception {
|
||||
running = false;
|
||||
try {
|
||||
serverSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
if (warmupSocket != null && !warmupSocket.isClosed()) {
|
||||
try {
|
||||
warmupSocket.close();
|
||||
} catch (IOException ignored) {
|
||||
}
|
||||
}
|
||||
scheduler.shutdownNow();
|
||||
acceptExecutor.shutdownNow();
|
||||
}
|
||||
|
||||
private record RequestMetadata(String path, int contentLength) {
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -16,6 +16,7 @@ import com.zt.plat.module.databus.dal.mysql.gateway.ApiPolicyRateLimitMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiStepMapper;
|
||||
import com.zt.plat.module.databus.dal.mysql.gateway.ApiTransformMapper;
|
||||
import com.zt.plat.module.databus.enums.gateway.ApiStatusEnum;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionService;
|
||||
import com.zt.plat.module.databus.service.gateway.impl.ApiDefinitionServiceImpl;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
@@ -63,6 +64,9 @@ class ApiDefinitionServiceImplTest extends BaseDbUnitTest {
|
||||
@MockBean
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@MockBean
|
||||
private ApiVersionService apiVersionService;
|
||||
|
||||
@TestConfiguration
|
||||
static class JacksonTestConfiguration {
|
||||
|
||||
|
||||
Reference in New Issue
Block a user