Merge branch 'dev' into test

This commit is contained in:
chenbowen
2025-11-04 18:50:01 +08:00
6 changed files with 209 additions and 18 deletions

View File

@@ -38,7 +38,7 @@ public class GatewayWebClientConfiguration {
private ReactorClientHttpConnector buildConnector() {
ConnectionProvider.Builder providerBuilder = ConnectionProvider.builder("databus-gateway")
.maxIdleTime(Duration.ofMillis(maxIdleTimeMillis));
.maxIdleTime(Duration.ofMillis(maxIdleTimeMillis));
if (evictInBackgroundMillis > 0) {
providerBuilder.evictInBackground(Duration.ofMillis(evictInBackgroundMillis));
}

View File

@@ -47,6 +47,7 @@ public class HttpStepHandler implements ApiStepHandler {
private final ExpressionExecutor expressionExecutor;
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
private static final int RETRY_ATTEMPTS = 3;
private static final Set<String> DEFAULT_FORWARDED_HEADERS = Set.of(
"authorization",
@@ -388,7 +389,7 @@ public class HttpStepHandler implements ApiStepHandler {
}
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {
return responseMono.retryWhen(Retry.fixedDelay(1, RETRY_DELAY)
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
.filter(this::isRetryableException)
.doBeforeRetry(signal -> {
if (log.isWarnEnabled()) {

View File

@@ -11,32 +11,57 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.messaging.MessageHeaders;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.resources.ConnectionProvider;
import java.io.IOException;
import java.net.SocketException;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
class HttpStepHandlerTest {
private MockWebServer server;
private ExpressionExecutor expressionExecutor;
private HttpStepHandler handler;
private ConnectionProvider connectionProvider;
@BeforeEach
void setUp() throws IOException {
server = new MockWebServer();
server.start();
expressionExecutor = Mockito.mock(ExpressionExecutor.class);
handler = new HttpStepHandler(WebClient.builder(), expressionExecutor);
connectionProvider = ConnectionProvider.builder("http-step-handler-test")
.maxConnections(1)
.maxIdleTime(Duration.ofMinutes(2))
.pendingAcquireMaxCount(-1)
.build();
HttpClient httpClient = HttpClient.create(connectionProvider);
WebClient.Builder builder = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient));
handler = new HttpStepHandler(builder, expressionExecutor);
}
@AfterEach
void tearDown() throws IOException {
if (connectionProvider != null) {
connectionProvider.disposeLater().block(Duration.ofSeconds(5));
}
server.shutdown();
}
@@ -102,4 +127,44 @@ class HttpStepHandlerTest {
assertThat(request.getHeader("Content-Type")).contains("application/json");
assertThat(request.getBody().readUtf8()).contains("\"amount\":100");
}
@Test
void shouldRecoverWhenReusingStaleConnectionAfterIdle() {
AtomicInteger attemptCounter = new AtomicInteger();
ExchangeFunction exchangeFunction = request -> {
int attempt = attemptCounter.incrementAndGet();
if (attempt <= 2) {
return Mono.error(new IOException("Simulated connection reset", new SocketException("Connection reset")));
}
return Mono.just(ClientResponse.create(HttpStatus.OK)
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.body("{\"ok\":true}")
.build());
};
HttpStepHandler simulatedHandler = new HttpStepHandler(WebClient.builder().exchangeFunction(exchangeFunction), expressionExecutor);
ApiStepDO stepDO = new ApiStepDO();
stepDO.setId(3L);
stepDO.setType("HTTP");
stepDO.setTargetEndpoint("POST http://idle-reset.test/stale-connection");
ApiStepDefinition stepDefinition = ApiStepDefinition.builder()
.step(stepDO)
.metadata(Collections.emptyMap())
.transforms(Collections.emptyList())
.build();
var stepHandler = simulatedHandler.build(null, stepDefinition);
ApiInvocationContext context = ApiInvocationContext.create();
context.setRequestBody(Collections.singletonMap("foo", "bar"));
assertThatCode(() -> stepHandler.handle(context, new MessageHeaders(Collections.emptyMap())))
.doesNotThrowAnyException();
assertThat(attemptCounter.get()).isEqualTo(3);
assertThat(context.getStepResults()).isNotEmpty();
assertThat(context.getStepResults().get(0).isSuccess()).isTrue();
}
}