1. 启动默认调度,定时请求 databus api

2. 修复 databus 单元测试
3. 调整 iwork 回调业务编号
This commit is contained in:
chenbowen
2025-12-02 17:45:58 +08:00
parent 2e0b0a5e83
commit e11065a596
22 changed files with 662 additions and 407 deletions

View File

@@ -273,10 +273,10 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
String signatureType = resolveSignatureType(credential, security);
try {
// boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
// if (!valid) {
// throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
// }
boolean valid = CryptoSignatureUtils.verifySignature(signaturePayload, signatureType);
if (!valid) {
throw new SecurityValidationException(HttpStatus.UNAUTHORIZED, "签名校验失败");
}
} catch (IllegalArgumentException ex) {
throw new SecurityValidationException(HttpStatus.INTERNAL_SERVER_ERROR, "签名算法配置异常");
}

View File

@@ -393,8 +393,13 @@ public class HttpStepHandler implements ApiStepHandler {
}
private boolean supportsRequestBody(HttpMethod method) {
// 所有请求都要传递请求体
return true;
if (method == null) {
return true;
}
return !(HttpMethod.GET.equals(method)
|| HttpMethod.HEAD.equals(method)
|| HttpMethod.OPTIONS.equals(method)
|| HttpMethod.TRACE.equals(method));
}
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {

View File

@@ -2,8 +2,11 @@ package com.zt.plat.module.databus.controller.admin.gateway;
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayExecutionService;
import com.zt.plat.module.databus.framework.integration.gateway.core.IntegrationFlowManager;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
import org.junit.jupiter.api.Test;
@@ -48,8 +51,17 @@ class ApiGatewayControllerTest {
@MockBean
private StringRedisTemplate stringRedisTemplate;
@MockBean
private ApiClientCredentialService apiClientCredentialService;
@MockBean
private IntegrationFlowManager integrationFlowManager;
@MockBean
private ApiClientCredentialService apiClientCredentialService;
@MockBean
private ApiAnonymousUserService apiAnonymousUserService;
@MockBean
private ApiGatewayAccessLogger apiGatewayAccessLogger;
@Test
void invokeShouldReturnGatewayEnvelope() throws Exception {

View File

@@ -28,16 +28,10 @@ import java.util.UUID;
public final class DatabusApiInvocationExample {
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
// private static final String APP_ID = "ztmy";
// private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
// private static final String APP_ID = "test";
// private static final String APP_SECRET = "RSYtKXrXPLMy3oeh0cOro6QCioRUgqfnKCkDkNq78sI=";
private static final String APP_ID = "testAnnoy";
private static final String APP_SECRET = "jyGCymUjCFL2i3a4Tm3qBIkUrUl4ZgKPYvOU/47ZWcM=";
private static final String APP_ID = "ztmy";
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/lgstOpenApi/v1";
private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/test/1";
// private static final String TARGET_API = "http://127.0.0.1:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();
@@ -88,8 +82,11 @@ public final class DatabusApiInvocationExample {
Map<String, Object> queryParams = new LinkedHashMap<>();
long extraTimestamp = 1761556157185L;
// String bodyJson = String.format("""
// {"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
// """, extraTimestamp);
String bodyJson = String.format("""
{"operateFlag":"I","__interfaceType__":"R_MY_JY_03","data":{"endAddressName":"1","customerCompanyName":"中铜国贸","endAddressDetail":"测试地址","remark":" ","custSuppType":"1","shipperCompanyName":"中铜国贸","consigneeCorpCode":" ","consignerContactPhone":" 11","importFlag":"10","businessSupplierCode":" ","entrustMainCode":"WT3162251027027","endAddressCode":" ","specifyCarrierCorpCode":"10086689","materDetail":[{"detailStatus":"10","batchNo":"ZLTD2510ZTGM0017001","measureCodeMdm":"CU032110001","packType":" ","quantityPlanDetail":1,"deliveryOrderNo":"ZLTD2510ZTGM0017001","measureCode":"CU032110001","goodsSpecification":" ","measureUnitCode":"PAC","entrustDetailCode":"WT3162251027027001","brand":" ","soNumber":"68ecf0055502d565d22b378a"}],"operateFlag":1,"custSuppName":"上海锦生金属有限公司","startAddressCode":" ","planStartTime":1761556166000,"customerCompanyCode":0,"importMethod":"EXW","startAddressType":"10","shipperCompanyCode":"3162","deliverCondition":"20","businessSupplierName":" ","startAddressDetail":" 111","transType":"30","endAddressType":"20","planEndTime":1761556166000,"specifyCarrierCorpName":null,"custSuppFlag":"0101","businessType":"20","consigneeCorpName":" ","custSuppCode":"10086689","startAddressName":" 111","consignerContactName":" 11"},"datetime":"20251027170929","busiBillCode":"WT3162251027027","system":"BRMS","__requestId__":"f918841c-14fb-49eb-9640-c5d1b3d46bd1"}
{}
""", extraTimestamp);
Map<String, Object> bodyParams = parseBodyJson(bodyJson);

View File

@@ -9,6 +9,7 @@ import com.zt.plat.framework.security.core.util.SecurityFrameworkUtils;
import com.zt.plat.framework.web.core.util.WebFrameworkUtils;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiClientCredentialDO;
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
import com.zt.plat.module.databus.framework.integration.gateway.core.ApiGatewayAccessLogger;
import com.zt.plat.module.databus.service.gateway.ApiAnonymousUserService;
import com.zt.plat.module.databus.service.gateway.ApiClientCredentialService;
import jakarta.servlet.http.HttpServletRequest;
@@ -25,7 +26,9 @@ import org.springframework.security.core.context.SecurityContextHolder;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.UUID;
import static com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties.*;
@@ -47,11 +50,12 @@ class GatewaySecurityFilterTest {
ApiGatewayProperties properties = createProperties();
properties.getSecurity().setEnabled(false);
StringRedisTemplate redisTemplate = mock(StringRedisTemplate.class);
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
when(accessLogger.logEntrance(any())).thenReturn(1L);
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
request.setRemoteAddr("127.0.0.1");
@@ -74,7 +78,9 @@ class GatewaySecurityFilterTest {
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
when(accessLogger.logEntrance(any())).thenReturn(1L);
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
request.setRemoteAddr("10.0.0.1");
@@ -97,6 +103,8 @@ class GatewaySecurityFilterTest {
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
when(accessLogger.logEntrance(any())).thenReturn(1L);
when(anonymousUserService.issueAccessToken(any())).thenReturn(Optional.empty());
ApiClientCredentialDO credential = new ApiClientCredentialDO();
credential.setAppId("demo-app");
@@ -108,13 +116,13 @@ class GatewaySecurityFilterTest {
properties.getSecurity().setRequireBodyEncryption(false);
properties.getSecurity().setEncryptResponse(false);
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
request.setRemoteAddr("127.0.0.1");
long timestamp = System.currentTimeMillis();
String nonce = UUID.randomUUID().toString().replaceAll("-", "");
String signature = signatureForApp("demo-app");
String signature = signatureForApp("demo-app", timestamp);
request.addHeader(APP_ID_HEADER, "demo-app");
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
request.addHeader(NONCE_HEADER, nonce);
@@ -142,13 +150,15 @@ class GatewaySecurityFilterTest {
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
when(accessLogger.logEntrance(any())).thenReturn(1L);
ApiClientCredentialDO credential = new ApiClientCredentialDO();
credential.setAppId("demo-app");
credential.setEncryptionKey("demo-secret-key");
credential.setEncryptionType(CryptoSignatureUtils.ENCRYPT_TYPE_AES);
when(credentialService.findActiveCredential("demo-app")).thenReturn(Optional.of(credential));
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
request.setRemoteAddr("127.0.0.1");
@@ -183,6 +193,8 @@ class GatewaySecurityFilterTest {
ApiClientCredentialService credentialService = mock(ApiClientCredentialService.class);
ApiAnonymousUserService anonymousUserService = mock(ApiAnonymousUserService.class);
ApiGatewayAccessLogger accessLogger = mock(ApiGatewayAccessLogger.class);
when(accessLogger.logEntrance(any())).thenReturn(1L);
ApiClientCredentialDO credential = new ApiClientCredentialDO();
credential.setAppId("demo-app");
credential.setSignatureType(null);
@@ -200,7 +212,7 @@ class GatewaySecurityFilterTest {
when(anonymousUserService.find(99L)).thenReturn(Optional.of(details));
when(anonymousUserService.issueAccessToken(details)).thenReturn(Optional.of("mock-token"));
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper());
GatewaySecurityFilter filter = new GatewaySecurityFilter(properties, redisTemplate, credentialService, anonymousUserService, new ObjectMapper(), accessLogger);
MockHttpServletRequest request = new MockHttpServletRequest("GET", "/admin-api/databus/api/portal/demo/v1");
request.setRemoteAddr("127.0.0.1");
@@ -209,7 +221,7 @@ class GatewaySecurityFilterTest {
request.addHeader(APP_ID_HEADER, "demo-app");
request.addHeader(TIMESTAMP_HEADER, String.valueOf(timestamp));
request.addHeader(NONCE_HEADER, nonce);
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app"));
request.addHeader(SIGNATURE_HEADER, signatureForApp("demo-app", timestamp));
MockHttpServletResponse response = new MockHttpServletResponse();
MockFilterChain chain = new MockFilterChain();
@@ -231,7 +243,20 @@ class GatewaySecurityFilterTest {
return properties;
}
private String signatureForApp(String appId) {
return SecureUtil.md5("appId=" + appId);
private String signatureForApp(String appId, long timestamp) {
Map<String, Object> payload = new TreeMap<>();
payload.put(APP_ID_HEADER, appId);
payload.put(TIMESTAMP_HEADER, String.valueOf(timestamp));
StringBuilder sb = new StringBuilder();
payload.forEach((key, value) -> {
if (value == null) {
return;
}
if (sb.length() > 0) {
sb.append('&');
}
sb.append(key).append('=').append(value);
});
return SecureUtil.md5(sb.toString());
}
}

View File

@@ -1,360 +0,0 @@
package com.zt.plat.module.databus.framework.integration.gateway.step.impl;
import com.zt.plat.framework.common.exception.ServiceException;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.PrematureCloseException;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.retry.Retry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Demonstrates the stale-connection scenario using the legacy vs the deferred retry pipeline.
*/
class HttpStepHandlerConnectionResetScenarioTest {
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
private static final int RETRY_ATTEMPTS = 3;
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
private static final Duration RESET_WAIT = Duration.ofMillis(300);
@Test
void legacyPipelineLosesSuccessfulRetry() throws Exception {
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
WebClient webClient = createWebClient();
URI uri = server.uri("/demo");
warmUp(server, webClient, uri);
server.awaitWarmupConnectionReset(RESET_WAIT);
legacyInvoke(webClient, uri, Map.of("mode", "legacy"));
server.awaitFreshResponses(1, Duration.ofSeconds(2));
assertThat(server.getFreshResponseCount()).isEqualTo(1);
assertThat(server.getServedBodies()).contains("reset", "fresh");
}
}
@Test
void deferredPipelinePropagatesSuccessfulRetry() throws Exception {
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
WebClient webClient = createWebClient();
URI uri = server.uri("/demo");
warmUp(server, webClient, uri);
server.awaitWarmupConnectionReset(RESET_WAIT);
Object result = deferredInvoke(webClient, uri, Map.of("mode", "defer"));
assertThat(result).isInstanceOf(Map.class);
Map<?, ?> resultMap = (Map<?, ?>) result;
assertThat(resultMap.get("stage")).isEqualTo("fresh");
server.awaitFreshResponses(1, Duration.ofSeconds(2));
assertThat(server.getFreshResponseCount()).isEqualTo(1);
assertThat(server.getServedBodies()).contains("reset", "fresh");
}
}
private WebClient createWebClient() {
ConnectionProvider provider = ConnectionProvider.builder("http-step-handler-demo")
.maxConnections(1)
.pendingAcquireMaxCount(-1)
.maxIdleTime(Duration.ofSeconds(5))
.build();
HttpClient httpClient = HttpClient.create(provider).compress(true);
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
private void warmUp(ResetOnceHttpServer server, WebClient webClient, URI uri) {
webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(Map.of("warm", true))
.retrieve()
.bodyToMono(Object.class)
.block(BLOCK_TIMEOUT);
server.awaitWarmupResponse(Duration.ofSeconds(2));
}
private Object legacyInvoke(WebClient webClient, URI uri, Object body) {
WebClient.RequestHeadersSpec<?> spec = webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(body);
Mono<Object> responseMono = spec.retrieve()
.bodyToMono(Object.class)
// 模拟业务中首次订阅后缓存失败结果的场景
.cache();
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
.filter(this::isRetryableException)
.onRetryExhaustedThrow((specification, signal) -> signal.failure()))
.block(BLOCK_TIMEOUT);
}
private Object deferredInvoke(WebClient webClient, URI uri, Object body) {
Mono<Object> responseMono = Mono.defer(() -> webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(body)
.retrieve()
.bodyToMono(Object.class)
// 通过 defer每次重试都会重新创建带缓存的响应 Mono
.cache());
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
.filter(this::isRetryableException))
.block(BLOCK_TIMEOUT);
}
private boolean isRetryableException(Throwable throwable) {
if (throwable == null) {
return false;
}
Throwable cursor = throwable;
while (cursor != null) {
if (cursor instanceof ServiceException) {
return false;
}
if (cursor instanceof PrematureCloseException) {
return true;
}
if (cursor instanceof IOException) {
return true;
}
cursor = cursor.getCause();
}
return false;
}
private static final class ResetOnceHttpServer implements AutoCloseable {
private static final Duration RESET_DELAY = Duration.ofMillis(250);
private final ServerSocket serverSocket;
private final ExecutorService acceptExecutor;
private final ScheduledExecutorService scheduler;
private final AtomicInteger connectionCount = new AtomicInteger();
private final AtomicInteger freshResponses = new AtomicInteger();
private final CountDownLatch warmupResponseSent = new CountDownLatch(1);
private final CountDownLatch warmupReset = new CountDownLatch(1);
private final List<String> servedBodies = new CopyOnWriteArrayList<>();
private volatile boolean running = true;
private volatile Socket warmupSocket;
ResetOnceHttpServer() throws IOException {
this.serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
this.serverSocket.setReuseAddress(true);
this.acceptExecutor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "reset-once-http-accept");
t.setDaemon(true);
return t;
});
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "reset-once-http-scheduler");
t.setDaemon(true);
return t;
});
acceptExecutor.submit(this::acceptLoop);
}
URI uri(String path) {
Objects.requireNonNull(path, "path");
if (!path.startsWith("/")) {
path = "/" + path;
}
return URI.create("http://127.0.0.1:" + serverSocket.getLocalPort() + path);
}
List<String> getServedBodies() {
return new ArrayList<>(servedBodies);
}
int getFreshResponseCount() {
return freshResponses.get();
}
void awaitWarmupResponse(Duration timeout) {
awaitLatch(warmupResponseSent, timeout);
}
void awaitWarmupConnectionReset(Duration timeout) {
awaitLatch(warmupReset, timeout);
}
void awaitFreshResponses(int expected, Duration timeout) {
long deadline = System.nanoTime() + timeout.toNanos();
while (freshResponses.get() < expected && System.nanoTime() < deadline) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return;
}
}
}
private void awaitLatch(CountDownLatch latch, Duration timeout) {
try {
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Timed out waiting for latch");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
private void acceptLoop() {
try {
while (running) {
Socket socket = serverSocket.accept();
int index = connectionCount.incrementAndGet();
handle(socket, index);
}
} catch (SocketException ex) {
if (running) {
throw new IllegalStateException("Unexpected server socket error", ex);
}
} catch (IOException ex) {
if (running) {
throw new IllegalStateException("I/O error in server", ex);
}
}
}
private void handle(Socket socket, int index) {
try {
socket.setTcpNoDelay(true);
RequestMetadata metadata = readRequest(socket);
if (index == 1) {
warmupSocket = socket;
String body = "{\"stage\":\"warmup\",\"path\":\"" + metadata.path + "\"}";
writeResponse(socket, body, true);
servedBodies.add("warmup");
warmupResponseSent.countDown();
scheduler.schedule(() -> forceReset(socket), RESET_DELAY.toMillis(), TimeUnit.MILLISECONDS);
} else if (index == 2) {
// 模拟客户端复用到仍在连接池中的旧连接,但服务端已在请求到达后立即复位。
servedBodies.add("reset");
scheduler.schedule(() -> closeWithReset(socket), 10, TimeUnit.MILLISECONDS);
} else {
String body = "{\"stage\":\"fresh\",\"attempt\":" + index + "}";
writeResponse(socket, body, false);
servedBodies.add("fresh");
freshResponses.incrementAndGet();
socket.close();
}
} catch (IOException ex) {
// ignore for the purpose of the test
}
}
private void forceReset(Socket socket) {
try {
if (!socket.isClosed()) {
servedBodies.add("reset");
closeWithReset(socket);
}
} finally {
warmupReset.countDown();
}
}
private void closeWithReset(Socket socket) {
try {
if (!socket.isClosed()) {
socket.setSoLinger(true, 0);
socket.close();
}
} catch (IOException ignored) {
// ignore
}
}
private RequestMetadata readRequest(Socket socket) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
String requestLine = reader.readLine();
if (requestLine == null) {
return new RequestMetadata("unknown", 0);
}
String path = requestLine.split(" ", 3)[1];
int contentLength = 0;
String line;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
if (line.toLowerCase(Locale.ROOT).startsWith("content-length:")) {
contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
}
}
if (contentLength > 0) {
char[] buffer = new char[contentLength];
int read = 0;
while (read < contentLength) {
int r = reader.read(buffer, read, contentLength - read);
if (r < 0) {
break;
}
read += r;
}
}
return new RequestMetadata(path, contentLength);
}
private void writeResponse(Socket socket, String body, boolean keepAlive) throws IOException {
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
StringBuilder builder = new StringBuilder()
.append("HTTP/1.1 200 OK\r\n")
.append("Content-Type: application/json\r\n")
.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
if (keepAlive) {
builder.append("Connection: keep-alive\r\n");
} else {
builder.append("Connection: close\r\n");
}
builder.append("\r\n");
OutputStream outputStream = socket.getOutputStream();
outputStream.write(builder.toString().getBytes(StandardCharsets.US_ASCII));
outputStream.write(bodyBytes);
outputStream.flush();
}
@Override
public void close() throws Exception {
running = false;
try {
serverSocket.close();
} catch (IOException ignored) {
}
if (warmupSocket != null && !warmupSocket.isClosed()) {
try {
warmupSocket.close();
} catch (IOException ignored) {
}
}
scheduler.shutdownNow();
acceptExecutor.shutdownNow();
}
private record RequestMetadata(String path, int contentLength) {
}
}
}

View File

@@ -16,6 +16,7 @@ import com.zt.plat.module.databus.dal.mysql.gateway.ApiPolicyRateLimitMapper;
import com.zt.plat.module.databus.dal.mysql.gateway.ApiStepMapper;
import com.zt.plat.module.databus.dal.mysql.gateway.ApiTransformMapper;
import com.zt.plat.module.databus.enums.gateway.ApiStatusEnum;
import com.zt.plat.module.databus.service.gateway.ApiVersionService;
import com.zt.plat.module.databus.service.gateway.impl.ApiDefinitionServiceImpl;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.AfterEach;
@@ -63,6 +64,9 @@ class ApiDefinitionServiceImplTest extends BaseDbUnitTest {
@MockBean
private StringRedisTemplate stringRedisTemplate;
@MockBean
private ApiVersionService apiVersionService;
@TestConfiguration
static class JacksonTestConfiguration {