Merge branch 'dev' into test
This commit is contained in:
@@ -11,6 +11,7 @@ import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCreden
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.security.GatewayJwtResolver;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.security.GatewaySecurityFilter;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
@@ -171,6 +172,7 @@ public class ApiGatewayExecutionService {
|
||||
if (reqVO.getHeaders() != null) {
|
||||
requestHeaders.putAll(reqVO.getHeaders());
|
||||
}
|
||||
normalizeJwtHeaders(requestHeaders, reqVO.getQueryParams());
|
||||
requestHeaders.putIfAbsent(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE);
|
||||
builder.setHeader(HEADER_REQUEST_HEADERS, requestHeaders);
|
||||
requestHeaders.forEach((key, value) -> {
|
||||
@@ -304,4 +306,37 @@ public class ApiGatewayExecutionService {
|
||||
builder.queryParam(key, value);
|
||||
}
|
||||
|
||||
private void normalizeJwtHeaders(Map<String, Object> headers, Map<String, Object> queryParams) {
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, queryParams, objectMapper);
|
||||
if (!StringUtils.hasText(token)) {
|
||||
return;
|
||||
}
|
||||
ensureHeaderValue(headers, GatewayJwtResolver.HEADER_ZT_AUTH_TOKEN, token);
|
||||
ensureHeaderValue(headers, HttpHeaders.AUTHORIZATION, "Bearer " + token);
|
||||
}
|
||||
|
||||
private void ensureHeaderValue(Map<String, Object> headers, String headerName, String value) {
|
||||
if (!StringUtils.hasText(headerName) || value == null) {
|
||||
return;
|
||||
}
|
||||
String existingKey = findHeaderKey(headers, headerName);
|
||||
if (existingKey != null) {
|
||||
headers.put(existingKey, value);
|
||||
} else {
|
||||
headers.put(headerName, value);
|
||||
}
|
||||
}
|
||||
|
||||
private String findHeaderKey(Map<String, Object> headers, String headerName) {
|
||||
if (headers == null || !StringUtils.hasText(headerName)) {
|
||||
return null;
|
||||
}
|
||||
for (String key : headers.keySet()) {
|
||||
if (headerName.equalsIgnoreCase(key)) {
|
||||
return key;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -4,8 +4,6 @@ import cn.hutool.core.collection.CollUtil;
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.github.benmanes.caffeine.cache.LoadingCache;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.framework.common.pojo.PageResult;
|
||||
import com.zt.plat.framework.common.util.object.BeanUtils;
|
||||
@@ -27,20 +25,16 @@ import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiTransf
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionService;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiVersionSnapshotContextHolder;
|
||||
import jakarta.annotation.PostConstruct;
|
||||
import lombok.RequiredArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.ObjectProvider;
|
||||
import org.springframework.dao.DataAccessException;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
import org.springframework.util.CollectionUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.*;
|
||||
@@ -50,8 +44,6 @@ import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErro
|
||||
@RequiredArgsConstructor
|
||||
public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
private static final String REDIS_CACHE_PREFIX = "databus:api:def:";
|
||||
|
||||
private final ApiDefinitionMapper apiDefinitionMapper;
|
||||
private final ApiStepMapper apiStepMapper;
|
||||
private final ApiTransformMapper apiTransformMapper;
|
||||
@@ -60,19 +52,8 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
private final ApiDefinitionCredentialMapper apiDefinitionCredentialMapper;
|
||||
private final ApiClientCredentialMapper apiClientCredentialMapper;
|
||||
private final ObjectMapper objectMapper;
|
||||
private final StringRedisTemplate stringRedisTemplate;
|
||||
private final ObjectProvider<ApiVersionService> apiVersionServiceProvider;
|
||||
|
||||
private LoadingCache<String, Optional<ApiDefinitionAggregate>> definitionCache;
|
||||
|
||||
@PostConstruct
|
||||
public void initCache() {
|
||||
definitionCache = Caffeine.newBuilder()
|
||||
.maximumSize(512)
|
||||
.expireAfterWrite(Duration.ofMinutes(5))
|
||||
.build(this::loadAggregateSync);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ApiDefinitionAggregate> loadActiveDefinitions() {
|
||||
List<ApiDefinitionDO> definitions = apiDefinitionMapper.selectActiveDefinitions(Collections.singletonList(ApiStatusEnum.ONLINE.getStatus()));
|
||||
@@ -88,12 +69,9 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> findByCodeAndVersion(String apiCode, String version) {
|
||||
String cacheKey = buildCacheKey(apiCode, version);
|
||||
try {
|
||||
return definitionCache.get(cacheKey);
|
||||
} catch (RuntimeException ex) {
|
||||
throw ServiceExceptionUtil.exception(API_DEFINITION_NOT_FOUND);
|
||||
}
|
||||
return apiDefinitionMapper.selectByCodeAndVersion(apiCode, version)
|
||||
.filter(definition -> ApiStatusEnum.isOnline(definition.getStatus()))
|
||||
.map(this::buildAggregate);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -104,16 +82,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
|
||||
@Override
|
||||
public Optional<ApiDefinitionAggregate> refresh(String apiCode, String version) {
|
||||
String cacheKey = buildCacheKey(apiCode, version);
|
||||
definitionCache.invalidate(cacheKey);
|
||||
deleteRedis(cacheKey);
|
||||
return findByCodeAndVersion(apiCode, version);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void refreshAllCache() {
|
||||
definitionCache.invalidateAll();
|
||||
clearRedisCacheForTenant(TenantContextHolder.getTenantId());
|
||||
// 缓存已移除,此处留空以保持接口兼容
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -165,14 +139,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
ApiDefinitionDO updateObj = buildDefinitionDO(reqVO, existing);
|
||||
apiDefinitionMapper.updateById(updateObj);
|
||||
|
||||
invalidateCache(existing.getTenantId(), existing.getApiCode(), existing.getVersion());
|
||||
apiTransformMapper.deleteByApiId(existing.getId());
|
||||
apiStepMapper.deleteByApiId(existing.getId());
|
||||
apiDefinitionCredentialMapper.deleteByApiId(existing.getId());
|
||||
persistApiLevelTransforms(existing.getId(), reqVO.getApiLevelTransforms());
|
||||
persistSteps(existing.getId(), reqVO.getSteps());
|
||||
persistCredentialBindings(existing.getId(), reqVO.getCredentialIds());
|
||||
invalidateCache(updateObj.getTenantId(), updateObj.getApiCode(), updateObj.getVersion());
|
||||
} finally {
|
||||
if (skipSnapshot) {
|
||||
ApiVersionSnapshotContextHolder.clear();
|
||||
@@ -192,78 +164,12 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
@Transactional(rollbackFor = Exception.class)
|
||||
public void delete(Long id) {
|
||||
ApiDefinitionDO existing = ensureExists(id);
|
||||
invalidateCache(existing.getTenantId(), existing.getApiCode(), existing.getVersion());
|
||||
apiTransformMapper.deleteByApiId(id);
|
||||
apiStepMapper.deleteByApiId(id);
|
||||
apiDefinitionCredentialMapper.deleteByApiId(id);
|
||||
apiDefinitionMapper.deleteById(id);
|
||||
}
|
||||
|
||||
private Optional<ApiDefinitionAggregate> loadAggregateSync(String cacheKey) {
|
||||
Optional<ApiDefinitionAggregate> cached = loadFromRedis(cacheKey);
|
||||
if (cached.isPresent()) {
|
||||
return cached;
|
||||
}
|
||||
String[] parts = cacheKey.split(":");
|
||||
String apiCode = parts[1];
|
||||
String version = parts[2];
|
||||
Optional<ApiDefinitionAggregate> aggregate = apiDefinitionMapper.selectByCodeAndVersion(apiCode, version)
|
||||
.filter(definition -> ApiStatusEnum.isOnline(definition.getStatus()))
|
||||
.map(this::buildAggregate);
|
||||
aggregate.ifPresent(value -> persistToRedis(cacheKey, value));
|
||||
return aggregate;
|
||||
}
|
||||
|
||||
private Optional<ApiDefinitionAggregate> loadFromRedis(String cacheKey) {
|
||||
try {
|
||||
String json = stringRedisTemplate.opsForValue().get(REDIS_CACHE_PREFIX + cacheKey);
|
||||
if (!StringUtils.hasText(json)) {
|
||||
return Optional.empty();
|
||||
}
|
||||
ApiDefinitionAggregate aggregate = objectMapper.readValue(json, ApiDefinitionAggregate.class);
|
||||
return Optional.of(aggregate);
|
||||
} catch (JsonProcessingException | DataAccessException ex) {
|
||||
log.warn("反序列化 Redis 中 key {} 的 API 定义聚合失败", cacheKey, ex);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private void persistToRedis(String cacheKey, ApiDefinitionAggregate aggregate) {
|
||||
try {
|
||||
String json = objectMapper.writeValueAsString(aggregate);
|
||||
stringRedisTemplate.opsForValue().set(REDIS_CACHE_PREFIX + cacheKey, json, 5, TimeUnit.MINUTES);
|
||||
} catch (JsonProcessingException | DataAccessException ex) {
|
||||
log.warn("将 API 定义聚合写入 Redis key {} 失败", cacheKey, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteRedis(String cacheKey) {
|
||||
try {
|
||||
stringRedisTemplate.delete(REDIS_CACHE_PREFIX + cacheKey);
|
||||
} catch (DataAccessException ex) {
|
||||
log.warn("删除 Redis 中 key {} 的 API 定义聚合失败", cacheKey, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private void clearRedisCacheForTenant(Long tenantId) {
|
||||
String tenantPart = tenantId == null ? "global" : tenantId.toString();
|
||||
String pattern = REDIS_CACHE_PREFIX + tenantPart + ":*";
|
||||
try {
|
||||
Set<String> keys = stringRedisTemplate.keys(pattern);
|
||||
if (CollectionUtils.isEmpty(keys)) {
|
||||
return;
|
||||
}
|
||||
stringRedisTemplate.delete(keys);
|
||||
} catch (DataAccessException ex) {
|
||||
log.warn("批量删除 Redis 中匹配 {} 的 API 定义聚合失败", pattern, ex);
|
||||
}
|
||||
}
|
||||
|
||||
private String buildCacheKey(String apiCode, String version) {
|
||||
Long tenantId = TenantContextHolder.getTenantId();
|
||||
return buildCacheKeyForTenant(tenantId, apiCode, version);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建包含步骤、变换、策略等元数据的聚合对象,供缓存与运行时直接使用。
|
||||
*/
|
||||
@@ -526,20 +432,6 @@ public class ApiDefinitionServiceImpl implements ApiDefinitionService {
|
||||
return TenantContextHolder.getTenantId();
|
||||
}
|
||||
|
||||
private void invalidateCache(Long tenantId, String apiCode, String version) {
|
||||
if (!StringUtils.hasText(apiCode) || !StringUtils.hasText(version)) {
|
||||
return;
|
||||
}
|
||||
String cacheKey = buildCacheKeyForTenant(tenantId, apiCode, version);
|
||||
definitionCache.invalidate(cacheKey);
|
||||
deleteRedis(cacheKey);
|
||||
}
|
||||
|
||||
private String buildCacheKeyForTenant(Long tenantId, String apiCode, String version) {
|
||||
String tenantPart = tenantId == null ? "global" : tenantId.toString();
|
||||
return tenantPart + ":" + apiCode.toLowerCase(Locale.ROOT) + ":" + version;
|
||||
}
|
||||
|
||||
/**
|
||||
* 先删除旧绑定,再对去重后的 credentialIds 批量插入,避免唯一约束冲突。
|
||||
*/
|
||||
|
||||
@@ -0,0 +1,53 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.springframework.http.HttpStatus;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class ApiGatewayErrorProcessorTest {
|
||||
|
||||
private final ApiGatewayErrorProcessor processor = new ApiGatewayErrorProcessor();
|
||||
|
||||
@Test
|
||||
void applyServiceException_should_fill_status_message_and_body() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
ServiceException exception = ServiceExceptionUtil.exception(API_RATE_LIMIT_EXCEEDED);
|
||||
|
||||
processor.applyServiceException(context, exception);
|
||||
|
||||
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS.value());
|
||||
assertThat(context.getResponseMessage()).isNotBlank();
|
||||
assertThat(context.getResponseBody()).isInstanceOfSatisfying(java.util.Map.class, body -> {
|
||||
assertThat(body).containsEntry("errorCode", API_RATE_LIMIT_EXCEEDED.getCode());
|
||||
assertThat(body).containsKey("errorMessage");
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void applyUnexpectedException_should_use_first_non_empty_message() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
RuntimeException exception = new RuntimeException("outer", new IllegalStateException("inner cause"));
|
||||
|
||||
processor.applyUnexpectedException(context, exception);
|
||||
|
||||
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR.value());
|
||||
assertThat(context.getResponseMessage()).isEqualTo("outer");
|
||||
assertThat(context.getResponseBody()).isInstanceOf(java.util.Map.class);
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveServiceException_should_traverse_causes() {
|
||||
ServiceException serviceException = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
RuntimeException wrapped = new RuntimeException(new IllegalStateException(serviceException));
|
||||
|
||||
ServiceException resolved = processor.resolveServiceException(wrapped);
|
||||
|
||||
assertThat(resolved).isSameAs(serviceException);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
|
||||
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.http.ResponseEntity;
|
||||
import org.springframework.integration.http.HttpHeaders;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ApiGatewayExecutionServiceTest {
|
||||
|
||||
private ApiGatewayExecutionService service;
|
||||
|
||||
@Mock
|
||||
private ApiFlowDispatcher apiFlowDispatcher;
|
||||
|
||||
@Mock
|
||||
private ApiGatewayErrorProcessor errorProcessor;
|
||||
|
||||
@Mock
|
||||
private ApiGatewayAccessLogger accessLogger;
|
||||
|
||||
@Mock
|
||||
private ApiDefinitionService apiDefinitionService;
|
||||
|
||||
private ApiGatewayProperties properties;
|
||||
|
||||
private ApiGatewayRequestMapper requestMapper;
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
properties = new ApiGatewayProperties();
|
||||
properties.setBasePath("/databus/api");
|
||||
properties.setEnableTenantHeader(true);
|
||||
properties.setTenantHeader("ZT-Tenant-Id");
|
||||
requestMapper = new ApiGatewayRequestMapper(objectMapper, properties);
|
||||
service = new ApiGatewayExecutionService(requestMapper, apiFlowDispatcher, errorProcessor, properties, objectMapper, accessLogger, apiDefinitionService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void invokeForDebug_should_dispatch_with_inactive_definition() {
|
||||
ApiGatewayInvokeReqVO reqVO = new ApiGatewayInvokeReqVO();
|
||||
reqVO.setApiCode("demo.api");
|
||||
reqVO.setVersion("v1");
|
||||
reqVO.getHeaders().put("ZT-Auth-Token", "debug-token");
|
||||
reqVO.getHeaders().put(ApiGatewayProperties.APP_ID_HEADER, "app-1");
|
||||
reqVO.getQueryParams().put("trace", "1");
|
||||
reqVO.setPayload(Map.of("name", "alice"));
|
||||
|
||||
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definition("demo.api", "v1"))
|
||||
.build();
|
||||
|
||||
when(apiDefinitionService.findByCodeAndVersion(eq("demo.api"), eq("v1"))).thenReturn(Optional.empty());
|
||||
when(apiDefinitionService.findByCodeAndVersionIncludingInactive(eq("demo.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
|
||||
when(apiFlowDispatcher.dispatchWithAggregate(eq(aggregate), any(ApiInvocationContext.class)))
|
||||
.thenAnswer(invocation -> {
|
||||
ApiInvocationContext ctx = invocation.getArgument(1);
|
||||
ctx.setResponseStatus(201);
|
||||
ctx.setResponseMessage("created");
|
||||
ctx.setResponseBody(Map.of("ok", true));
|
||||
return ctx;
|
||||
});
|
||||
|
||||
ResponseEntity<ApiGatewayResponse> responseEntity = service.invokeForDebug(reqVO);
|
||||
|
||||
assertThat(responseEntity.getStatusCodeValue()).isEqualTo(201);
|
||||
assertThat(responseEntity.getBody()).isNotNull();
|
||||
assertThat(responseEntity.getBody().getMessage()).isEqualTo("created");
|
||||
assertThat(responseEntity.getBody().getResponse()).isEqualTo(Map.of("ok", true));
|
||||
}
|
||||
|
||||
@Test
|
||||
void dispatch_should_apply_credential_authorization_for_non_debug() {
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
context.setApiCode("secure.api");
|
||||
context.setApiVersion("v1");
|
||||
|
||||
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definition("secure.api", "v1"))
|
||||
.credentialBindings(List.of(ApiCredentialBinding.builder().appId("app-x").build()))
|
||||
.build();
|
||||
|
||||
when(apiDefinitionService.findByCodeAndVersion(eq("secure.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
|
||||
doAnswer(invocation -> {
|
||||
ApiInvocationContext ctx = invocation.getArgument(0);
|
||||
ServiceException ex = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
|
||||
ctx.setResponseStatus(403);
|
||||
ctx.setResponseMessage(ex.getMessage());
|
||||
return null;
|
||||
}).when(errorProcessor).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
|
||||
|
||||
Message<ApiInvocationContext> message = MessageBuilder.withPayload(context)
|
||||
.setHeader("apiCode", "secure.api")
|
||||
.setHeader("version", "v1")
|
||||
.setHeader(HttpHeaders.REQUEST_METHOD, "POST")
|
||||
.build();
|
||||
|
||||
ApiInvocationContext result = service.dispatch(message);
|
||||
|
||||
assertThat(result.getResponseStatus()).isEqualTo(403);
|
||||
verify(apiFlowDispatcher, never()).dispatch(anyString(), anyString(), any());
|
||||
verify(errorProcessor, times(1)).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
|
||||
}
|
||||
|
||||
private ApiDefinitionDO definition(String apiCode, String version) {
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode(apiCode);
|
||||
definitionDO.setVersion(version);
|
||||
return definitionDO;
|
||||
}
|
||||
}
|
||||
@@ -73,8 +73,8 @@ class ApiGatewayRequestMapperTest {
|
||||
|
||||
ApiInvocationContext context = mapper.map("", headers);
|
||||
|
||||
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo("token-123");
|
||||
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo("token-123");
|
||||
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo(List.of("token-123"));
|
||||
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo(List.of("token-123"));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
||||
@@ -0,0 +1,103 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.core;
|
||||
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.integration.dsl.IntegrationFlow;
|
||||
import org.springframework.integration.dsl.context.IntegrationFlowContext;
|
||||
import org.springframework.messaging.MessageChannel;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class IntegrationFlowManagerTest {
|
||||
|
||||
@Mock
|
||||
private IntegrationFlowContext integrationFlowContext;
|
||||
|
||||
@Mock
|
||||
private ApiDefinitionService apiDefinitionService;
|
||||
|
||||
@Mock
|
||||
private ApiFlowAssembler apiFlowAssembler;
|
||||
|
||||
@Mock
|
||||
private IntegrationFlowContext.IntegrationFlowRegistration registration;
|
||||
|
||||
private IntegrationFlowManager manager;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
manager = new IntegrationFlowManager(integrationFlowContext, apiDefinitionService, apiFlowAssembler);
|
||||
when(registration.getId()).thenReturn("flow-id");
|
||||
}
|
||||
|
||||
@Test
|
||||
void refreshAll_should_register_active_flows_and_remove_stale() {
|
||||
ApiDefinitionAggregate agg1 = aggregate("order.create", "v1");
|
||||
ApiDefinitionAggregate agg2 = aggregate("order.pay", "v1");
|
||||
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1, agg2));
|
||||
IntegrationFlow flow = mock(IntegrationFlow.class);
|
||||
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
|
||||
.flowId("flow-id")
|
||||
.inputChannelName("ch")
|
||||
.flow(flow)
|
||||
.build());
|
||||
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
|
||||
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
|
||||
when(builder.id(anyString()).register()).thenReturn(registration);
|
||||
when(registration.getInputChannel()).thenReturn(mock(MessageChannel.class));
|
||||
|
||||
manager.refreshAll();
|
||||
|
||||
Optional<MessageChannel> channel = manager.locateInputChannel("order.create", "v1");
|
||||
assertThat(channel).isPresent();
|
||||
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1));
|
||||
manager.refreshAll();
|
||||
|
||||
verify(integrationFlowContext, atLeastOnce()).remove(anyString());
|
||||
}
|
||||
|
||||
@Test
|
||||
void obtainDebugHandle_should_reuse_existing_registration_when_present() {
|
||||
ApiDefinitionAggregate agg = aggregate("demo", "v1");
|
||||
IntegrationFlow flow = mock(IntegrationFlow.class);
|
||||
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
|
||||
.flowId("flow-debug")
|
||||
.inputChannelName("ch")
|
||||
.flow(flow)
|
||||
.build());
|
||||
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
|
||||
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
|
||||
when(builder.id(anyString()).register()).thenReturn(registration);
|
||||
MessageChannel input = mock(MessageChannel.class);
|
||||
when(registration.getInputChannel()).thenReturn(input);
|
||||
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg));
|
||||
|
||||
manager.refreshAll();
|
||||
IntegrationFlowManager.DebugFlowHandle handle = manager.obtainDebugHandle(agg);
|
||||
|
||||
assertThat(handle.channel()).isNotNull();
|
||||
assertThat(handle.temporary()).isFalse();
|
||||
}
|
||||
|
||||
private ApiDefinitionAggregate aggregate(String apiCode, String version) {
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode(apiCode);
|
||||
definitionDO.setVersion(version);
|
||||
return ApiDefinitionAggregate.builder().definition(definitionDO).build();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.policy;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.zt.plat.framework.common.exception.ServiceException;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
|
||||
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiPolicyRateLimitDO;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
|
||||
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.springframework.dao.DataAccessResourceFailureException;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.core.ValueOperations;
|
||||
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EVALUATION_FAILED;
|
||||
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class DefaultRateLimitPolicyEvaluatorTest {
|
||||
|
||||
@Mock
|
||||
private StringRedisTemplate stringRedisTemplate;
|
||||
|
||||
@Mock
|
||||
private ValueOperations<String, String> valueOperations;
|
||||
|
||||
private DefaultRateLimitPolicyEvaluator evaluator;
|
||||
|
||||
private ApiDefinitionAggregate aggregate;
|
||||
|
||||
@BeforeEach
|
||||
void setUp() {
|
||||
ObjectMapper objectMapper = new ObjectMapper();
|
||||
evaluator = new DefaultRateLimitPolicyEvaluator(objectMapper, stringRedisTemplate);
|
||||
|
||||
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
|
||||
definitionDO.setApiCode("order.create");
|
||||
definitionDO.setVersion("v1");
|
||||
|
||||
ApiPolicyRateLimitDO rateLimitDO = new ApiPolicyRateLimitDO();
|
||||
rateLimitDO.setConfig("{\"limit\":2,\"windowSeconds\":60}");
|
||||
|
||||
aggregate = ApiDefinitionAggregate.builder()
|
||||
.definition(definitionDO)
|
||||
.rateLimitPolicy(rateLimitDO)
|
||||
.build();
|
||||
|
||||
when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_set_expire_on_first_hit_and_pass_under_limit() {
|
||||
when(valueOperations.increment(any())).thenReturn(1L, 2L);
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
context.getRequestHeaders().put("X-Client-Id", "c1");
|
||||
|
||||
evaluator.evaluate(aggregate, context);
|
||||
evaluator.evaluate(aggregate, context);
|
||||
|
||||
verify(stringRedisTemplate, times(2)).opsForValue();
|
||||
verify(valueOperations, times(2)).increment(any());
|
||||
verify(stringRedisTemplate).expire(any(), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_throw_when_exceed_limit() {
|
||||
when(valueOperations.increment(any())).thenReturn(3L);
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
|
||||
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
|
||||
|
||||
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EXCEEDED.getCode());
|
||||
}
|
||||
|
||||
@Test
|
||||
void evaluate_should_wrap_redis_failure() {
|
||||
when(valueOperations.increment(any())).thenThrow(new DataAccessResourceFailureException("redis down"));
|
||||
|
||||
ApiInvocationContext context = ApiInvocationContext.create();
|
||||
|
||||
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
|
||||
|
||||
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EVALUATION_FAILED.getCode());
|
||||
}
|
||||
}
|
||||
@@ -29,15 +29,17 @@ public final class DatabusApiInvocationExample {
|
||||
|
||||
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
|
||||
|
||||
private static final String APP_ID = "iwork";
|
||||
private static final String APP_SECRET = "lpGXiNe/GMLk0vsbYGLa8eYxXq8tGhTbuu3/D4MJzIk=";
|
||||
// private static final String APP_ID = "ztmy";
|
||||
// private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
// private static final String APP_ID = "iwork";
|
||||
// private static final String APP_SECRET = "lpGXiNe/GMLk0vsbYGLa8eYxXq8tGhTbuu3/D4MJzIk=";
|
||||
private static final String APP_ID = "ztmy";
|
||||
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
|
||||
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
|
||||
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
|
||||
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/callback/v1";
|
||||
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/callback/v1";
|
||||
private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/testcbw/456";
|
||||
private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
|
||||
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/testcbw/456";
|
||||
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
|
||||
.connectTimeout(Duration.ofSeconds(5))
|
||||
.build();
|
||||
|
||||
@@ -0,0 +1,46 @@
|
||||
package com.zt.plat.module.databus.framework.integration.gateway.security;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
class GatewayJwtResolverTest {
|
||||
|
||||
private final ObjectMapper objectMapper = new ObjectMapper();
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_pick_token_header_case_insensitive() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("zT-AuTh-ToKeN", "abc123");
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("abc123");
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_strip_bearer_prefix_and_array_values() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("Authorization", new String[]{"Bearer XYZ.jwt"});
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("XYZ.jwt");
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveJwtToken_should_parse_structured_json_string_and_fallback_to_query_params() {
|
||||
Map<String, Object> headers = new HashMap<>();
|
||||
headers.put("Authorization", "{\"token\":\"json-token\"}");
|
||||
Map<String, Object> query = Map.of("token", List.of("q1", "q2"));
|
||||
|
||||
String token = GatewayJwtResolver.resolveJwtToken(headers, query, objectMapper);
|
||||
|
||||
assertThat(token).isEqualTo("json-token");
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
DELETE FROM "databus_api_transform";
|
||||
DELETE FROM "databus_api_step";
|
||||
DELETE FROM "databus_api_definition";
|
||||
DELETE FROM "databus_api_definition_credential";
|
||||
DELETE FROM "databus_policy_rate_limit";
|
||||
DELETE FROM "databus_policy_audit";
|
||||
DELETE FROM "databus_api_flow_publish";
|
||||
|
||||
@@ -97,3 +97,16 @@ CREATE TABLE IF NOT EXISTS databus_api_flow_publish (
|
||||
updater VARCHAR(64),
|
||||
deleted BOOLEAN
|
||||
);
|
||||
|
||||
CREATE TABLE IF NOT EXISTS databus_api_definition_credential (
|
||||
id BIGINT PRIMARY KEY,
|
||||
api_id BIGINT,
|
||||
credential_id BIGINT,
|
||||
app_id VARCHAR(255),
|
||||
tenant_id BIGINT,
|
||||
create_time TIMESTAMP,
|
||||
update_time TIMESTAMP,
|
||||
creator VARCHAR(64),
|
||||
updater VARCHAR(64),
|
||||
deleted BOOLEAN
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user