1. 提高 databus api 的网络失败重试次数,避免复用旧链接导致的 connection reset 错误
2. 兼容顶级组织同步时的组织编码生成逻辑
This commit is contained in:
@@ -38,7 +38,7 @@ public class GatewayWebClientConfiguration {
|
||||
|
||||
private ReactorClientHttpConnector buildConnector() {
|
||||
ConnectionProvider.Builder providerBuilder = ConnectionProvider.builder("databus-gateway")
|
||||
.maxIdleTime(Duration.ofMillis(maxIdleTimeMillis));
|
||||
.maxIdleTime(Duration.ofMillis(maxIdleTimeMillis));
|
||||
if (evictInBackgroundMillis > 0) {
|
||||
providerBuilder.evictInBackground(Duration.ofMillis(evictInBackgroundMillis));
|
||||
}
|
||||
|
||||
@@ -47,6 +47,7 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
private final ExpressionExecutor expressionExecutor;
|
||||
|
||||
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
|
||||
private static final int RETRY_ATTEMPTS = 3;
|
||||
|
||||
private static final Set<String> DEFAULT_FORWARDED_HEADERS = Set.of(
|
||||
"authorization",
|
||||
@@ -388,7 +389,7 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
}
|
||||
|
||||
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {
|
||||
return responseMono.retryWhen(Retry.fixedDelay(1, RETRY_DELAY)
|
||||
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
|
||||
.filter(this::isRetryableException)
|
||||
.doBeforeRetry(signal -> {
|
||||
if (log.isWarnEnabled()) {
|
||||
|
||||
@@ -11,32 +11,57 @@ import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.springframework.http.HttpHeaders;
|
||||
import org.springframework.http.HttpStatus;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.web.reactive.function.client.ClientResponse;
|
||||
import org.springframework.web.reactive.function.client.ExchangeFunction;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.resources.ConnectionProvider;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketException;
|
||||
import java.time.Duration;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.assertThatCode;
|
||||
|
||||
class HttpStepHandlerTest {
|
||||
|
||||
private MockWebServer server;
|
||||
private ExpressionExecutor expressionExecutor;
|
||||
private HttpStepHandler handler;
|
||||
private ConnectionProvider connectionProvider;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() throws IOException {
|
||||
server = new MockWebServer();
|
||||
server.start();
|
||||
expressionExecutor = Mockito.mock(ExpressionExecutor.class);
|
||||
handler = new HttpStepHandler(WebClient.builder(), expressionExecutor);
|
||||
connectionProvider = ConnectionProvider.builder("http-step-handler-test")
|
||||
.maxConnections(1)
|
||||
.maxIdleTime(Duration.ofMinutes(2))
|
||||
.pendingAcquireMaxCount(-1)
|
||||
.build();
|
||||
HttpClient httpClient = HttpClient.create(connectionProvider);
|
||||
WebClient.Builder builder = WebClient.builder()
|
||||
.clientConnector(new ReactorClientHttpConnector(httpClient));
|
||||
handler = new HttpStepHandler(builder, expressionExecutor);
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
void tearDown() throws IOException {
|
||||
if (connectionProvider != null) {
|
||||
connectionProvider.disposeLater().block(Duration.ofSeconds(5));
|
||||
}
|
||||
server.shutdown();
|
||||
}
|
||||
|
||||
@@ -102,4 +127,44 @@ class HttpStepHandlerTest {
|
||||
assertThat(request.getHeader("Content-Type")).contains("application/json");
|
||||
assertThat(request.getBody().readUtf8()).contains("\"amount\":100");
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldRecoverWhenReusingStaleConnectionAfterIdle() {
|
||||
AtomicInteger attemptCounter = new AtomicInteger();
|
||||
ExchangeFunction exchangeFunction = request -> {
|
||||
int attempt = attemptCounter.incrementAndGet();
|
||||
if (attempt <= 2) {
|
||||
return Mono.error(new IOException("Simulated connection reset", new SocketException("Connection reset")));
|
||||
}
|
||||
return Mono.just(ClientResponse.create(HttpStatus.OK)
|
||||
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
|
||||
.body("{\"ok\":true}")
|
||||
.build());
|
||||
};
|
||||
|
||||
HttpStepHandler simulatedHandler = new HttpStepHandler(WebClient.builder().exchangeFunction(exchangeFunction), expressionExecutor);
|
||||
|
||||
ApiStepDO stepDO = new ApiStepDO();
|
||||
stepDO.setId(3L);
|
||||
stepDO.setType("HTTP");
|
||||
stepDO.setTargetEndpoint("POST http://idle-reset.test/stale-connection");
|
||||
|
||||
ApiStepDefinition stepDefinition = ApiStepDefinition.builder()
|
||||
.step(stepDO)
|
||||
.metadata(Collections.emptyMap())
|
||||
.transforms(Collections.emptyList())
|
||||
.build();
|
||||
|
||||
var stepHandler = simulatedHandler.build(null, stepDefinition);
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
context.setRequestBody(Collections.singletonMap("foo", "bar"));
|
||||
|
||||
assertThatCode(() -> stepHandler.handle(context, new MessageHeaders(Collections.emptyMap())))
|
||||
.doesNotThrowAnyException();
|
||||
|
||||
assertThat(attemptCounter.get()).isEqualTo(3);
|
||||
assertThat(context.getStepResults()).isNotEmpty();
|
||||
assertThat(context.getStepResults().get(0).isSuccess()).isTrue();
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user