1. 新增 api 绑定客户凭证进行权限校验

2. 去除 api 定义的缓存策略
3. 新增短信渠道
4. 新增用户信息模糊查询
5. 修复全局的单元测试
This commit is contained in:
chenbowen
2025-12-12 10:03:10 +08:00
parent 99645c5ac8
commit cae0b9e4af
66 changed files with 1323 additions and 211 deletions

View File

@@ -0,0 +1,53 @@
package com.zt.plat.module.databus.framework.integration.gateway.core;
import com.zt.plat.framework.common.exception.ServiceException;
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
import org.junit.jupiter.api.Test;
import org.springframework.http.HttpStatus;
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
import static org.assertj.core.api.Assertions.assertThat;
class ApiGatewayErrorProcessorTest {
private final ApiGatewayErrorProcessor processor = new ApiGatewayErrorProcessor();
@Test
void applyServiceException_should_fill_status_message_and_body() {
ApiInvocationContext context = ApiInvocationContext.create();
ServiceException exception = ServiceExceptionUtil.exception(API_RATE_LIMIT_EXCEEDED);
processor.applyServiceException(context, exception);
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.TOO_MANY_REQUESTS.value());
assertThat(context.getResponseMessage()).isNotBlank();
assertThat(context.getResponseBody()).isInstanceOfSatisfying(java.util.Map.class, body -> {
assertThat(body).containsEntry("errorCode", API_RATE_LIMIT_EXCEEDED.getCode());
assertThat(body).containsKey("errorMessage");
});
}
@Test
void applyUnexpectedException_should_use_first_non_empty_message() {
ApiInvocationContext context = ApiInvocationContext.create();
RuntimeException exception = new RuntimeException("outer", new IllegalStateException("inner cause"));
processor.applyUnexpectedException(context, exception);
assertThat(context.getResponseStatus()).isEqualTo(HttpStatus.INTERNAL_SERVER_ERROR.value());
assertThat(context.getResponseMessage()).isEqualTo("outer");
assertThat(context.getResponseBody()).isInstanceOf(java.util.Map.class);
}
@Test
void resolveServiceException_should_traverse_causes() {
ServiceException serviceException = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
RuntimeException wrapped = new RuntimeException(new IllegalStateException(serviceException));
ServiceException resolved = processor.resolveServiceException(wrapped);
assertThat(resolved).isSameAs(serviceException);
}
}

View File

@@ -0,0 +1,140 @@
package com.zt.plat.module.databus.framework.integration.gateway.core;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.common.exception.ServiceException;
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiCredentialBinding;
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiGatewayResponse;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.http.ResponseEntity;
import org.springframework.integration.http.HttpHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_CREDENTIAL_UNAUTHORIZED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class ApiGatewayExecutionServiceTest {
private ApiGatewayExecutionService service;
@Mock
private ApiFlowDispatcher apiFlowDispatcher;
@Mock
private ApiGatewayErrorProcessor errorProcessor;
@Mock
private ApiGatewayAccessLogger accessLogger;
@Mock
private ApiDefinitionService apiDefinitionService;
private ApiGatewayProperties properties;
private ApiGatewayRequestMapper requestMapper;
private final ObjectMapper objectMapper = new ObjectMapper();
@BeforeEach
void setUp() {
properties = new ApiGatewayProperties();
properties.setBasePath("/databus/api");
properties.setEnableTenantHeader(true);
properties.setTenantHeader("ZT-Tenant-Id");
requestMapper = new ApiGatewayRequestMapper(objectMapper, properties);
service = new ApiGatewayExecutionService(requestMapper, apiFlowDispatcher, errorProcessor, properties, objectMapper, accessLogger, apiDefinitionService);
}
@Test
void invokeForDebug_should_dispatch_with_inactive_definition() {
ApiGatewayInvokeReqVO reqVO = new ApiGatewayInvokeReqVO();
reqVO.setApiCode("demo.api");
reqVO.setVersion("v1");
reqVO.getHeaders().put("ZT-Auth-Token", "debug-token");
reqVO.getHeaders().put(ApiGatewayProperties.APP_ID_HEADER, "app-1");
reqVO.getQueryParams().put("trace", "1");
reqVO.setPayload(Map.of("name", "alice"));
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
.definition(definition("demo.api", "v1"))
.build();
when(apiDefinitionService.findByCodeAndVersion(eq("demo.api"), eq("v1"))).thenReturn(Optional.empty());
when(apiDefinitionService.findByCodeAndVersionIncludingInactive(eq("demo.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
when(apiFlowDispatcher.dispatchWithAggregate(eq(aggregate), any(ApiInvocationContext.class)))
.thenAnswer(invocation -> {
ApiInvocationContext ctx = invocation.getArgument(1);
ctx.setResponseStatus(201);
ctx.setResponseMessage("created");
ctx.setResponseBody(Map.of("ok", true));
return ctx;
});
ResponseEntity<ApiGatewayResponse> responseEntity = service.invokeForDebug(reqVO);
assertThat(responseEntity.getStatusCodeValue()).isEqualTo(201);
assertThat(responseEntity.getBody()).isNotNull();
assertThat(responseEntity.getBody().getMessage()).isEqualTo("created");
assertThat(responseEntity.getBody().getResponse()).isEqualTo(Map.of("ok", true));
}
@Test
void dispatch_should_apply_credential_authorization_for_non_debug() {
ApiInvocationContext context = ApiInvocationContext.create();
context.setApiCode("secure.api");
context.setApiVersion("v1");
ApiDefinitionAggregate aggregate = ApiDefinitionAggregate.builder()
.definition(definition("secure.api", "v1"))
.credentialBindings(List.of(ApiCredentialBinding.builder().appId("app-x").build()))
.build();
when(apiDefinitionService.findByCodeAndVersion(eq("secure.api"), eq("v1"))).thenReturn(Optional.of(aggregate));
doAnswer(invocation -> {
ApiInvocationContext ctx = invocation.getArgument(0);
ServiceException ex = ServiceExceptionUtil.exception(API_CREDENTIAL_UNAUTHORIZED);
ctx.setResponseStatus(403);
ctx.setResponseMessage(ex.getMessage());
return null;
}).when(errorProcessor).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
Message<ApiInvocationContext> message = MessageBuilder.withPayload(context)
.setHeader("apiCode", "secure.api")
.setHeader("version", "v1")
.setHeader(HttpHeaders.REQUEST_METHOD, "POST")
.build();
ApiInvocationContext result = service.dispatch(message);
assertThat(result.getResponseStatus()).isEqualTo(403);
verify(apiFlowDispatcher, never()).dispatch(anyString(), anyString(), any());
verify(errorProcessor, times(1)).applyServiceException(any(ApiInvocationContext.class), any(ServiceException.class));
}
private ApiDefinitionDO definition(String apiCode, String version) {
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
definitionDO.setApiCode(apiCode);
definitionDO.setVersion(version);
return definitionDO;
}
}

View File

@@ -73,8 +73,8 @@ class ApiGatewayRequestMapperTest {
ApiInvocationContext context = mapper.map("", headers);
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo("token-123");
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo("token-123");
assertThat(context.getRequestHeaders().get("ZT-Auth-Token")).isEqualTo(List.of("token-123"));
assertThat(context.getRequestHeaders().get("zt-auth-token")).isEqualTo(List.of("token-123"));
}
@Test

View File

@@ -0,0 +1,103 @@
package com.zt.plat.module.databus.framework.integration.gateway.core;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.messaging.MessageChannel;
import java.util.List;
import java.util.Optional;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class IntegrationFlowManagerTest {
@Mock
private IntegrationFlowContext integrationFlowContext;
@Mock
private ApiDefinitionService apiDefinitionService;
@Mock
private ApiFlowAssembler apiFlowAssembler;
@Mock
private IntegrationFlowContext.IntegrationFlowRegistration registration;
private IntegrationFlowManager manager;
@BeforeEach
void setUp() {
manager = new IntegrationFlowManager(integrationFlowContext, apiDefinitionService, apiFlowAssembler);
when(registration.getId()).thenReturn("flow-id");
}
@Test
void refreshAll_should_register_active_flows_and_remove_stale() {
ApiDefinitionAggregate agg1 = aggregate("order.create", "v1");
ApiDefinitionAggregate agg2 = aggregate("order.pay", "v1");
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1, agg2));
IntegrationFlow flow = mock(IntegrationFlow.class);
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
.flowId("flow-id")
.inputChannelName("ch")
.flow(flow)
.build());
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
when(builder.id(anyString()).register()).thenReturn(registration);
when(registration.getInputChannel()).thenReturn(mock(MessageChannel.class));
manager.refreshAll();
Optional<MessageChannel> channel = manager.locateInputChannel("order.create", "v1");
assertThat(channel).isPresent();
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg1));
manager.refreshAll();
verify(integrationFlowContext, atLeastOnce()).remove(anyString());
}
@Test
void obtainDebugHandle_should_reuse_existing_registration_when_present() {
ApiDefinitionAggregate agg = aggregate("demo", "v1");
IntegrationFlow flow = mock(IntegrationFlow.class);
when(apiFlowAssembler.assemble(any())).thenReturn(ApiFlowRegistration.builder()
.flowId("flow-debug")
.inputChannelName("ch")
.flow(flow)
.build());
IntegrationFlowContext.IntegrationFlowRegistrationBuilder builder = mock(IntegrationFlowContext.IntegrationFlowRegistrationBuilder.class, RETURNS_DEEP_STUBS);
when(integrationFlowContext.registration(any(IntegrationFlow.class))).thenReturn(builder);
when(builder.id(anyString()).register()).thenReturn(registration);
MessageChannel input = mock(MessageChannel.class);
when(registration.getInputChannel()).thenReturn(input);
when(apiDefinitionService.loadActiveDefinitions()).thenReturn(List.of(agg));
manager.refreshAll();
IntegrationFlowManager.DebugFlowHandle handle = manager.obtainDebugHandle(agg);
assertThat(handle.channel()).isNotNull();
assertThat(handle.temporary()).isFalse();
}
private ApiDefinitionAggregate aggregate(String apiCode, String version) {
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
definitionDO.setApiCode(apiCode);
definitionDO.setVersion(version);
return ApiDefinitionAggregate.builder().definition(definitionDO).build();
}
}

View File

@@ -0,0 +1,94 @@
package com.zt.plat.module.databus.framework.integration.gateway.policy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.common.exception.ServiceException;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiDefinitionDO;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiPolicyRateLimitDO;
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.dao.DataAccessResourceFailureException;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EVALUATION_FAILED;
import static com.zt.plat.module.databus.service.gateway.impl.GatewayServiceErrorCodeConstants.API_RATE_LIMIT_EXCEEDED;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class DefaultRateLimitPolicyEvaluatorTest {
@Mock
private StringRedisTemplate stringRedisTemplate;
@Mock
private ValueOperations<String, String> valueOperations;
private DefaultRateLimitPolicyEvaluator evaluator;
private ApiDefinitionAggregate aggregate;
@BeforeEach
void setUp() {
ObjectMapper objectMapper = new ObjectMapper();
evaluator = new DefaultRateLimitPolicyEvaluator(objectMapper, stringRedisTemplate);
ApiDefinitionDO definitionDO = new ApiDefinitionDO();
definitionDO.setApiCode("order.create");
definitionDO.setVersion("v1");
ApiPolicyRateLimitDO rateLimitDO = new ApiPolicyRateLimitDO();
rateLimitDO.setConfig("{\"limit\":2,\"windowSeconds\":60}");
aggregate = ApiDefinitionAggregate.builder()
.definition(definitionDO)
.rateLimitPolicy(rateLimitDO)
.build();
when(stringRedisTemplate.opsForValue()).thenReturn(valueOperations);
}
@Test
void evaluate_should_set_expire_on_first_hit_and_pass_under_limit() {
when(valueOperations.increment(any())).thenReturn(1L, 2L);
ApiInvocationContext context = ApiInvocationContext.create();
context.getRequestHeaders().put("X-Client-Id", "c1");
evaluator.evaluate(aggregate, context);
evaluator.evaluate(aggregate, context);
verify(stringRedisTemplate, times(2)).opsForValue();
verify(valueOperations, times(2)).increment(any());
verify(stringRedisTemplate).expire(any(), any());
}
@Test
void evaluate_should_throw_when_exceed_limit() {
when(valueOperations.increment(any())).thenReturn(3L);
ApiInvocationContext context = ApiInvocationContext.create();
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EXCEEDED.getCode());
}
@Test
void evaluate_should_wrap_redis_failure() {
when(valueOperations.increment(any())).thenThrow(new DataAccessResourceFailureException("redis down"));
ApiInvocationContext context = ApiInvocationContext.create();
ServiceException ex = assertThrows(ServiceException.class, () -> evaluator.evaluate(aggregate, context));
assertThat(ex.getCode()).isEqualTo(API_RATE_LIMIT_EVALUATION_FAILED.getCode());
}
}

View File

@@ -29,15 +29,17 @@ public final class DatabusApiInvocationExample {
public static final String TIMESTAMP = Long.toString(System.currentTimeMillis());
private static final String APP_ID = "iwork";
private static final String APP_SECRET = "lpGXiNe/GMLk0vsbYGLa8eYxXq8tGhTbuu3/D4MJzIk=";
// private static final String APP_ID = "ztmy";
// private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
// private static final String APP_ID = "iwork";
// private static final String APP_SECRET = "lpGXiNe/GMLk0vsbYGLa8eYxXq8tGhTbuu3/D4MJzIk=";
private static final String APP_ID = "ztmy";
private static final String APP_SECRET = "zFre/nTRGi7LpoFjN7oQkKeOT09x1fWTyIswrc702QQ=";
private static final String ENCRYPTION_TYPE = CryptoSignatureUtils.ENCRYPT_TYPE_AES;
// private static final String TARGET_API = "http://172.16.46.63:30081/admin-api/databus/api/portal/callback/v1";
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/callback/v1";
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
// private static final String TARGET_API = "http://172.16.46.195:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/callback/v1";
private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/testcbw/456";
private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/lgstOpenApi/v1";
// private static final String TARGET_API = "http://localhost:48080/admin-api/databus/api/portal/testcbw/456";
private static final HttpClient HTTP_CLIENT = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(5))
.build();

View File

@@ -0,0 +1,46 @@
package com.zt.plat.module.databus.framework.integration.gateway.security;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
class GatewayJwtResolverTest {
private final ObjectMapper objectMapper = new ObjectMapper();
@Test
void resolveJwtToken_should_pick_token_header_case_insensitive() {
Map<String, Object> headers = new HashMap<>();
headers.put("zT-AuTh-ToKeN", "abc123");
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
assertThat(token).isEqualTo("abc123");
}
@Test
void resolveJwtToken_should_strip_bearer_prefix_and_array_values() {
Map<String, Object> headers = new HashMap<>();
headers.put("Authorization", new String[]{"Bearer XYZ.jwt"});
String token = GatewayJwtResolver.resolveJwtToken(headers, null, objectMapper);
assertThat(token).isEqualTo("XYZ.jwt");
}
@Test
void resolveJwtToken_should_parse_structured_json_string_and_fallback_to_query_params() {
Map<String, Object> headers = new HashMap<>();
headers.put("Authorization", "{\"token\":\"json-token\"}");
Map<String, Object> query = Map.of("token", List.of("q1", "q2"));
String token = GatewayJwtResolver.resolveJwtToken(headers, query, objectMapper);
assertThat(token).isEqualTo("json-token");
}
}

View File

@@ -1,6 +1,7 @@
DELETE FROM "databus_api_transform";
DELETE FROM "databus_api_step";
DELETE FROM "databus_api_definition";
DELETE FROM "databus_api_definition_credential";
DELETE FROM "databus_policy_rate_limit";
DELETE FROM "databus_policy_audit";
DELETE FROM "databus_api_flow_publish";

View File

@@ -97,3 +97,16 @@ CREATE TABLE IF NOT EXISTS databus_api_flow_publish (
updater VARCHAR(64),
deleted BOOLEAN
);
CREATE TABLE IF NOT EXISTS databus_api_definition_credential (
id BIGINT PRIMARY KEY,
api_id BIGINT,
credential_id BIGINT,
app_id VARCHAR(255),
tenant_id BIGINT,
create_time TIMESTAMP,
update_time TIMESTAMP,
creator VARCHAR(64),
updater VARCHAR(64),
deleted BOOLEAN
);