Merge branch 'dev' into test
This commit is contained in:
@@ -10,12 +10,13 @@ import lombok.Getter;
|
||||
@Getter
|
||||
public enum ApiStatusEnum {
|
||||
|
||||
DRAFT(0),
|
||||
ONLINE(1),
|
||||
OFFLINE(2),
|
||||
DEPRECATED(3);
|
||||
DRAFT(0, "草稿"),
|
||||
ONLINE(1, "已上线"),
|
||||
OFFLINE(2, "已下线"),
|
||||
DEPRECATED(3, "已废弃");
|
||||
|
||||
private final int status;
|
||||
private final String label;
|
||||
|
||||
public static boolean isOnline(Integer status) {
|
||||
return status != null && status == ONLINE.status;
|
||||
@@ -25,4 +26,21 @@ public enum ApiStatusEnum {
|
||||
return status != null && status == DEPRECATED.status;
|
||||
}
|
||||
|
||||
public static ApiStatusEnum fromStatus(Integer status) {
|
||||
if (status == null) {
|
||||
return null;
|
||||
}
|
||||
for (ApiStatusEnum value : values()) {
|
||||
if (value.status == status) {
|
||||
return value;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public static String labelOf(Integer status) {
|
||||
ApiStatusEnum value = fromStatus(status);
|
||||
return value != null ? value.label : "未知";
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,21 +4,46 @@ import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.web.reactive.function.client.WebClientCustomizer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
|
||||
import reactor.netty.http.client.HttpClient;
|
||||
import reactor.netty.resources.ConnectionProvider;
|
||||
|
||||
import java.time.Duration;
|
||||
|
||||
|
||||
@Configuration
|
||||
public class GatewayWebClientConfiguration {
|
||||
|
||||
private final int maxInMemorySize;
|
||||
private final long maxIdleTimeMillis;
|
||||
private final long evictInBackgroundMillis;
|
||||
private final ReactorClientHttpConnector httpConnector;
|
||||
|
||||
public GatewayWebClientConfiguration(
|
||||
@Value("${databus.gateway.web-client.max-in-memory-size:20971520}") int maxInMemorySize) {
|
||||
@Value("${databus.gateway.web-client.max-in-memory-size:20971520}") int maxInMemorySize,
|
||||
@Value("${databus.gateway.web-client.max-idle-time:45000}") long maxIdleTimeMillis,
|
||||
@Value("${databus.gateway.web-client.evict-in-background-interval:20000}") long evictInBackgroundMillis) {
|
||||
this.maxInMemorySize = maxInMemorySize;
|
||||
this.maxIdleTimeMillis = maxIdleTimeMillis > 0 ? maxIdleTimeMillis : 45000L;
|
||||
this.evictInBackgroundMillis = Math.max(evictInBackgroundMillis, 0L);
|
||||
this.httpConnector = buildConnector();
|
||||
}
|
||||
|
||||
@Bean
|
||||
public WebClientCustomizer gatewayWebClientCustomizer() {
|
||||
return builder -> builder.codecs(configurer ->
|
||||
configurer.defaultCodecs().maxInMemorySize(maxInMemorySize));
|
||||
return builder -> builder
|
||||
.clientConnector(httpConnector)
|
||||
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(maxInMemorySize));
|
||||
}
|
||||
|
||||
private ReactorClientHttpConnector buildConnector() {
|
||||
ConnectionProvider.Builder providerBuilder = ConnectionProvider.builder("databus-gateway")
|
||||
.maxIdleTime(Duration.ofMillis(maxIdleTimeMillis));
|
||||
if (evictInBackgroundMillis > 0) {
|
||||
providerBuilder.evictInBackground(Duration.ofMillis(evictInBackgroundMillis));
|
||||
}
|
||||
ConnectionProvider provider = providerBuilder.build();
|
||||
HttpClient httpClient = HttpClient.create(provider).compress(true);
|
||||
return new ReactorClientHttpConnector(httpClient);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import org.springframework.integration.core.MessagingTemplate;
|
||||
@@ -36,6 +37,24 @@ public class ApiFlowDispatcher {
|
||||
return (ApiInvocationContext) reply.getPayload();
|
||||
}
|
||||
|
||||
public ApiInvocationContext dispatchWithAggregate(ApiDefinitionAggregate aggregate, ApiInvocationContext context) {
|
||||
IntegrationFlowManager.DebugFlowHandle handle = integrationFlowManager.obtainDebugHandle(aggregate);
|
||||
MessageChannel channel = handle.channel();
|
||||
Message<ApiInvocationContext> message = MessageBuilder.withPayload(context)
|
||||
.setHeader("apiCode", aggregate.getDefinition().getApiCode())
|
||||
.setHeader("version", aggregate.getDefinition().getVersion())
|
||||
.build();
|
||||
try {
|
||||
Message<?> reply = messagingTemplate.sendAndReceive(channel, message);
|
||||
if (reply == null) {
|
||||
throw ServiceExceptionUtil.exception(API_FLOW_NO_REPLY, aggregate.getDefinition().getApiCode(), aggregate.getDefinition().getVersion());
|
||||
}
|
||||
return (ApiInvocationContext) reply.getPayload();
|
||||
} finally {
|
||||
integrationFlowManager.releaseDebugHandle(handle);
|
||||
}
|
||||
}
|
||||
|
||||
private MessageChannel requireInputChannel(String apiCode, String version) {
|
||||
// 未命中时,进行一次兜底补偿查询
|
||||
return integrationFlowManager.locateInputChannel(apiCode, version)
|
||||
|
||||
@@ -3,12 +3,15 @@ package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.security.GatewayJwtResolver;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.security.GatewaySecurityFilter;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.http.*;
|
||||
@@ -23,6 +26,9 @@ import java.lang.reflect.Array;
|
||||
import java.net.URI;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_DEFINITION_NOT_FOUND;
|
||||
|
||||
/**
|
||||
* Orchestrates API portal request mapping, dispatch and response building so that
|
||||
@@ -40,6 +46,7 @@ public class ApiGatewayExecutionService {
|
||||
private static final String HEADER_QUERY_STRING = org.springframework.integration.http.HttpHeaders.PREFIX + "queryString";
|
||||
private static final String HEADER_REMOTE_ADDRESS = org.springframework.integration.http.HttpHeaders.PREFIX + "remoteAddress";
|
||||
private static final String LOCAL_DEBUG_REMOTE_ADDRESS = "127.0.0.1";
|
||||
private static final String ATTR_DEBUG_INVOKE = "gatewayDebugInvoke";
|
||||
|
||||
private final ApiGatewayRequestMapper requestMapper;
|
||||
private final ApiFlowDispatcher apiFlowDispatcher;
|
||||
@@ -47,6 +54,7 @@ public class ApiGatewayExecutionService {
|
||||
private final ApiGatewayProperties properties;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final ApiGatewayAccessLogger accessLogger;
|
||||
private final ApiDefinitionService apiDefinitionService;
|
||||
|
||||
/**
|
||||
* Maps a raw HTTP message (as provided by Spring Integration) into a context message.
|
||||
@@ -67,8 +75,16 @@ public class ApiGatewayExecutionService {
|
||||
ApiInvocationContext context = message.getPayload();
|
||||
accessLogger.onRequest(context);
|
||||
ApiInvocationContext responseContext;
|
||||
ApiDefinitionAggregate debugAggregate = null;
|
||||
try {
|
||||
responseContext = apiFlowDispatcher.dispatch(context.getApiCode(), context.getApiVersion(), context);
|
||||
if (Boolean.TRUE.equals(context.getAttributes().get(ATTR_DEBUG_INVOKE))) {
|
||||
debugAggregate = resolveDebugAggregate(context);
|
||||
}
|
||||
if (debugAggregate != null) {
|
||||
responseContext = apiFlowDispatcher.dispatchWithAggregate(debugAggregate, context);
|
||||
} else {
|
||||
responseContext = apiFlowDispatcher.dispatch(context.getApiCode(), context.getApiVersion(), context);
|
||||
}
|
||||
} catch (ServiceException ex) {
|
||||
errorProcessor.applyServiceException(context, ex);
|
||||
accessLogger.onException(context, ex);
|
||||
@@ -113,10 +129,20 @@ public class ApiGatewayExecutionService {
|
||||
ApiInvocationContext context = mappedMessage.getPayload();
|
||||
// Ensure query parameters & headers from debug payload are reflected after mapping.
|
||||
mergeDebugMetadata(context, reqVO);
|
||||
context.getAttributes().put(ATTR_DEBUG_INVOKE, Boolean.TRUE);
|
||||
ApiInvocationContext responseContext = dispatch(mappedMessage);
|
||||
return buildResponseEntity(responseContext);
|
||||
}
|
||||
|
||||
private ApiDefinitionAggregate resolveDebugAggregate(ApiInvocationContext context) {
|
||||
Optional<ApiDefinitionAggregate> activeDefinition = apiDefinitionService.findByCodeAndVersion(context.getApiCode(), context.getApiVersion());
|
||||
if (activeDefinition.isPresent()) {
|
||||
return activeDefinition.get();
|
||||
}
|
||||
return apiDefinitionService.findByCodeAndVersionIncludingInactive(context.getApiCode(), context.getApiVersion())
|
||||
.orElseThrow(() -> ServiceExceptionUtil.exception(API_DEFINITION_NOT_FOUND));
|
||||
}
|
||||
|
||||
private Message<?> buildDebugMessage(ApiGatewayInvokeReqVO reqVO) {
|
||||
Object payload = preparePayload(reqVO.getPayload());
|
||||
MessageBuilder<Object> builder = MessageBuilder.withPayload(payload);
|
||||
|
||||
@@ -12,6 +12,7 @@ import org.springframework.stereotype.Component;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
@@ -63,6 +64,33 @@ public class IntegrationFlowManager {
|
||||
return Optional.ofNullable(registration.getInputChannel());
|
||||
}
|
||||
|
||||
public DebugFlowHandle obtainDebugHandle(ApiDefinitionAggregate aggregate) {
|
||||
String key = key(aggregate.getDefinition().getApiCode(), aggregate.getDefinition().getVersion());
|
||||
IntegrationFlowContext.IntegrationFlowRegistration existing = activeRegistrations.get(key);
|
||||
if (existing != null) {
|
||||
return new DebugFlowHandle(existing.getInputChannel(), existing.getId(), false);
|
||||
}
|
||||
ApiFlowRegistration registration = apiFlowAssembler.assemble(aggregate);
|
||||
String debugId = registration.getFlowId() + "#debug#" + UUID.randomUUID();
|
||||
IntegrationFlowContext.IntegrationFlowRegistration debugRegistration = integrationFlowContext.registration(registration.getFlow())
|
||||
.id(debugId)
|
||||
.register();
|
||||
log.debug("[API-PORTAL] 临时注册调试流程 {} 对应 apiCode={} version={}", debugId, aggregate.getDefinition().getApiCode(), aggregate.getDefinition().getVersion());
|
||||
return new DebugFlowHandle(debugRegistration.getInputChannel(), debugRegistration.getId(), true);
|
||||
}
|
||||
|
||||
public void releaseDebugHandle(DebugFlowHandle handle) {
|
||||
if (handle == null || !handle.temporary) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
integrationFlowContext.remove(handle.registrationId);
|
||||
log.debug("[API-PORTAL] 已移除调试流程 {}", handle.registrationId);
|
||||
} catch (Exception ex) {
|
||||
log.warn("移除调试流程 {} 失败", handle.registrationId, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void registerFlow(ApiDefinitionAggregate aggregate) {
|
||||
String key = key(aggregate.getDefinition().getApiCode(), aggregate.getDefinition().getVersion());
|
||||
deregisterByKey(key);
|
||||
@@ -93,4 +121,24 @@ public class IntegrationFlowManager {
|
||||
private String key(String apiCode, String version) {
|
||||
return (apiCode + ":" + version).toLowerCase();
|
||||
}
|
||||
|
||||
public static final class DebugFlowHandle {
|
||||
private final MessageChannel channel;
|
||||
private final String registrationId;
|
||||
private final boolean temporary;
|
||||
|
||||
private DebugFlowHandle(MessageChannel channel, String registrationId, boolean temporary) {
|
||||
this.channel = channel;
|
||||
this.registrationId = registrationId;
|
||||
this.temporary = temporary;
|
||||
}
|
||||
|
||||
public MessageChannel channel() {
|
||||
return channel;
|
||||
}
|
||||
|
||||
public boolean temporary() {
|
||||
return temporary;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -23,7 +23,10 @@ import org.springframework.web.reactive.function.BodyInserters;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.util.UriComponentsBuilder;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.netty.http.client.PrematureCloseException;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.time.Instant;
|
||||
@@ -43,6 +46,8 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
private final WebClient.Builder webClientBuilder;
|
||||
private final ExpressionExecutor expressionExecutor;
|
||||
|
||||
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
|
||||
|
||||
private static final Set<String> DEFAULT_FORWARDED_HEADERS = Set.of(
|
||||
"authorization",
|
||||
"zt-auth-token",
|
||||
@@ -108,6 +113,7 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
WebClient client = webClientBuilder.build();
|
||||
WebClient.RequestHeadersSpec<?> requestSpec = buildRequest(client, callSpec, requestPayload, headerMap, supportsBody);
|
||||
Mono<Object> responseMono = requestSpec.retrieve().bodyToMono(Object.class);
|
||||
responseMono = applyResilientRetry(responseMono, stepDefinition);
|
||||
Object response = timeout == null ? responseMono.block() : responseMono.block(timeout);
|
||||
payload.addStepResult(ApiStepResult.builder()
|
||||
.stepId(stepDefinition.getStep().getId())
|
||||
@@ -380,4 +386,37 @@ public class HttpStepHandler implements ApiStepHandler {
|
||||
// 所有请求都要传递请求体
|
||||
return true;
|
||||
}
|
||||
|
||||
private Mono<Object> applyResilientRetry(Mono<Object> responseMono, ApiStepDefinition stepDefinition) {
|
||||
return responseMono.retryWhen(Retry.fixedDelay(1, RETRY_DELAY)
|
||||
.filter(this::isRetryableException)
|
||||
.doBeforeRetry(signal -> {
|
||||
if (log.isWarnEnabled()) {
|
||||
log.warn("HTTP 步骤 stepId={} 第{}次重试,原因:{}",
|
||||
stepDefinition.getStep().getId(),
|
||||
signal.totalRetriesInARow(),
|
||||
signal.failure() == null ? "未知" : signal.failure().getMessage());
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
private boolean isRetryableException(Throwable throwable) {
|
||||
if (throwable == null) {
|
||||
return false;
|
||||
}
|
||||
Throwable cursor = throwable;
|
||||
while (cursor != null) {
|
||||
if (cursor instanceof ServiceException) {
|
||||
return false;
|
||||
}
|
||||
if (cursor instanceof PrematureCloseException) {
|
||||
return true;
|
||||
}
|
||||
if (cursor instanceof IOException) {
|
||||
return true;
|
||||
}
|
||||
cursor = cursor.getCause();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -24,6 +24,11 @@ public interface ApiDefinitionService {
|
||||
*/
|
||||
Optional<ApiDefinitionAggregate> findByCodeAndVersion(String apiCode, String version);
|
||||
|
||||
/**
|
||||
* Lookup API definition regardless of publish status.
|
||||
*/
|
||||
Optional<ApiDefinitionAggregate> findByCodeAndVersionIncludingInactive(String apiCode, String version);
|
||||
|
||||
/**
|
||||
* Refresh a specific definition by evicting cache and reloading from DB.
|
||||
*/
|
||||
|
||||
@@ -92,6 +92,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> findByCodeAndVersionIncludingInactive(String apiCode, String version) {
|
||||
return apiDefinitionMapper.selectByCodeAndVersion(apiCode, version)
|
||||
.map(this::buildAggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> refresh(String apiCode, String version) {
|
||||
String cacheKey = buildCacheKey(apiCode, version);
|
||||
|
||||
Reference in New Issue
Block a user