# Databus 模块 API 功能与第三方调用说明 > 适用范围:`zt-module-databus`(Server 侧)+ `zt-module-databus-api`(接口定义)。本文基于 2025-11-20 主干分支代码。 ## 1. 模块定位与整体能力 - **目标**:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略,并即时发布到运行态。 - **核心特性**: 1. API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。 2. 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。 3. 多重安全防护:**精确 IP 白/黑名单(仅支持单 IP,不支持 CIDR 段)**、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。 4. QoS 能力:可插拔限流策略(Redis 固定窗口计数)、访问日志、追踪 ID & Step 级结果入库。 5. Debug 支持:管理端 `POST /databus/gateway/invoke` 可注入任意参数模拟真实调用。 ## 2. 运行时架构概览 | 组件 | 位置 | 作用 | | --- | --- | --- | | `GatewaySecurityFilter` | `framework.integration.gateway.security` | 过滤并校验所有落在 `databus.api-portal.base-path` 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。 | | `ApiGatewayExecutionService` | `framework.integration.gateway.core` | 将 HTTP 请求映射为 `ApiInvocationContext`,调度 Integration Flow,构造统一响应。 | | `IntegrationFlowManager` | `framework.integration.gateway.core` | 按 `apiCode + version` 动态注册 Spring Integration Flow,支持热刷新与调试临时 Flow。 | | `ApiFlowDispatcher` | 同上 | 依据 apiCode/version 找到输入通道,发送请求并等待 `ApiInvocationContext` 回传。 | | `PolicyAdvisorFactory` + `DefaultRateLimitPolicyEvaluator` | `framework.integration.gateway.core/policy` | 在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。 | | `ApiGatewayAccessLogger` | `framework.integration.gateway.core` | 生成访问日志 `databus_api_access_log`,记录 Trace、请求/响应、耗时、步骤结果等。 | | 管控 REST 控制器 | `controller.admin.gateway.*` | 管理 API 定义、版本、凭证、策略、访问日志等。 | ## 4. 管控端 REST 接口速查 | 模块 | 方法 | 路径 | 说明 | | --- | --- | --- | --- | | API 定义 | GET | `/databus/gateway/definition/page` | 分页查询(支持 code/描述筛选)。 | | | GET | `/databus/gateway/definition/{id}` | 详情(含步骤、变换、限流绑定)。 | | | POST | `/databus/gateway/definition` | 新建定义,必填步骤(至少 Start+End)。 | | | PUT | `/databus/gateway/definition` | 更新并自动刷新对应 Flow。 | | | DELETE | `/databus/gateway/definition/{id}` | 删除并注销 Flow。 | | API 网关 | POST | `/databus/gateway/invoke` | 管理端调试调用。 | | | GET | `/databus/gateway/definitions` | 拉取当前已上线定义(供灰度/网关缓存)。 | | | POST | `/databus/gateway/cache/refresh` | 强制刷新所有 Flow 缓存。 | | API 版本 | GET | `/databus/gateway/version/get?id=` | 查询版本详情(自动还原 snapshotData)。 | | | GET | `/databus/gateway/version/page` | 分页。 | | | GET | `/databus/gateway/version/list?apiId=` | 列出某 API 的全部版本。 | | | PUT | `/databus/gateway/version/rollback` | 根据 `id + remark` 回滚。 | | | GET | `/databus/gateway/version/compare` | 差异对比(sourceId/targetId)。 | | 客户端凭证 | GET | `/databus/gateway/credential/page` | 分页。 | | | GET | `/databus/gateway/credential/get?id=` | 详情(含匿名配置)。 | | | POST | `/databus/gateway/credential/create` | 新增凭证。 | | | PUT | `/databus/gateway/credential/update` | 更新。 | | | DELETE | `/databus/gateway/credential/delete?id=` | 删除。 | | | GET | `/databus/gateway/credential/list-simple` | 下拉使用。 | | 限流策略 | GET | `/databus/gateway/policy/rate-limit/page` | 分页检索。 | | | GET | `/databus/gateway/policy/rate-limit/{id}` | 详情。 | | | GET | `/databus/gateway/policy/rate-limit/simple-list` | 精简列表。 | | | POST/PUT/DELETE | `/databus/gateway/policy/rate-limit` | 新增/更新/删除。 | | 访问日志 | GET | `/databus/gateway/access-log/page` | 分页(需 `databus:gateway:access-log:query` 权限)。 | | | GET | `/databus/gateway/access-log/get?id=` | 单条详情(自动补充 API 描述)。 | > 所有接口默认返回 `CommonResult` 包装,字段 `code/message/data`。必要时参考对应 VO(位置 `controller.admin.gateway.vo`)。 ## 5. API 生命周期管理要点 1. **状态机**:`ApiStatusEnum`(草稿/已上线/已下线/已废弃)。Integration Flow 只加载 `ONLINE` 状态定义。 2. **版本快照**:每次保存时写入 `databus_api_version`,可通过 `snapshotData` 一键恢复(`rollback` 接口)。 3. **变换校验**:保存时会校验同一级 `TransformPhaseEnum` 不可重复,并确保 Start/End 唯一且位于首尾。 4. **缓存刷新**: - 单 API:创建/更新/删除后自动调用 `IntegrationFlowManager.refresh(apiCode, version)`。 - 全量:管理员可调用 `/databus/gateway/cache/refresh` 做兜底。 ## 6. 网关请求路径与响应格式 - **默认 Base Path**:`/admin-api/databus/api/portal`(可通过 `databus.api-portal.base-path` 覆盖;兼容旧版 `/databus/api/portal`)。 - **最终路径**:`{basePath}/{apiCode}/{version}`,示例 `/admin-api/databus/api/portal/order.create/v1`。 - **支持方法**:GET/POST/PUT/DELETE/PATCH,均被映射为 `ApiInvocationContext.httpMethod`。 - **响应包装**: ```json { "code": 200, "message": "OK", "response": { "bizField": "value" }, "traceId": "c8a3d52f-..." } ``` > `code` 与 HTTP 状态保持一致;`response` 为 API 变换后的业务体;所有错误也沿用该 Envelope(若启用响应加密则返回 Base64 字串)。 ## 7. 配置项(`application.yml`)重点 > 说明:下表均有默认值,除非特别标注“必填”,其余可按需覆盖或直接使用默认。未配置 `allowed-ips` 表示放行所有来源;如开启多租户透传才需设置 `tenant-header`。 ```yaml databus: api-portal: base-path: /admin-api/databus/api/portal # 仅支持精确 IP;留空表示全放行。示例:允许两个固定出口 IP allowed-ips: [10.0.0.12, 10.0.0.13] denied-ips: [] enable-tenant-header: true tenant-header: ZT-Tenant-Id # 仅当 enable-tenant-header=true 时才需要配置 security: enabled: true signature-type: MD5 # 或 SHA256 encryption-type: AES # 或 DES allowed-clock-skew-seconds: 300 nonce-ttl-seconds: 600 require-body-encryption: true encrypt-response: true gateway: web-client: connection-pool-enabled: true # 默认启用 Reactor Netty 连接池,可在排查连接复用/长连接异常时设为 false ``` ### 必填/必选提示 - 无需在 yml 中显式写入必填项;若需要启用租户透传,请同步配置 `tenant-header`。 - 安全校验依赖“客户端凭证”里的 `encryptionKey/encryptionType/signatureType`;当 `require-body-encryption=true` 且凭证缺少密钥时,调用会失败(HTTP 500 `应用未配置加密密钥`)。 - 限流能力取决于 API 绑定的策略;未绑定即不做限流。`connection-pool-enabled` 仅用于排障,可保持默认。 > `GatewaySecurityFilter` 会自动注册到最高优先级 +10,确保该路径的请求先经过安全校验。 关闭连接池后,每次 HTTP Step 请求都会新建 TCP 连接,适合短期定位“连接被复用导致 Reset/超时”的场景,但会带来额外的握手开销;切换时可关注启动日志中的 `Databus gateway WebClient pooling` 提示。 ## 8. 第三方调用流程详解 ### 8.1 前置准备 1. **申请凭证**:在后台创建 `API 客户端凭证`,得到: - `appId`(对应 `ZT-App-Id` 头) - `encryptionKey`(用于 AES/DES 对称加密,服务器使用 `CryptoSignatureUtils.decrypt` 解密) - `encryptionType`、`signatureType` - `allowAnonymous` = true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT)。 2. **确定 API**:记录 `apiCode`、`version`、请求方法、入参/变换契约。 3. **网络白名单**:将第三方出口 IP 加入 `allowed-ips`,否则直接返回 403。 4. **Redis 要求**:需保证 Redis 可用(用于 nonce、防重放、限流计数)。 ### 8.2 请求构建步骤 | 序号 | 操作 | 说明 | | --- | --- | --- | | 1 | 生成时间戳 | `timestamp = System.currentTimeMillis()`,与服务器时间差 ≤ 300s。 | | 2 | 生成随机串 | `nonce` 长度≥8,可使用 `UUID.randomUUID().toString().replace("-", "")`。 | | 3 | 准备明文 Body | 例如 `{"orderNo":"SO20251120001"}`,记为 `plainBody`。 | | 4 | 计算签名 | 使用**明文**请求数据构造签名载荷:Query 参数 + JSON Body 字段(若 Body 非 JSON 则整体放入 `body` 字段)+ `ZT-App-Id` + `ZT-Timestamp` + `ZT-Nonce`,按字典序拼接 `key=value` 以 `&` 连接,使用 `MD5/SHA256` 计算;结果写入 `ZT-Signature`。 | | 5 | 加密请求体 | 使用凭证的 `encryptionKey + encryptionType` 对 `plainBody` 进行对称加密,Base64 结果作为 HTTP Body;Content-Type 可设 `text/plain` 或 `application/json`。 | | 6 | 组装请求头 | `ZT-App-Id`, `ZT-Timestamp`, `ZT-Nonce`, `ZT-Signature`, `ZT-Tenant-Id`(可选), `X-Client-Id`(建议,与限流相关),如有自带 JWT 则设置 `Authorization`。 | | 7 | 发送请求 | URL = `https://{host}{basePath}/{apiCode}/{version}`,方法与 API 定义保持一致。 | #### 签名字段示例 ```text ZT-App-Id=demo-app &ZT-Nonce=0c5e2df9a1 &ZT-Timestamp=1732070400000 &orderNo=SO20251120001 ``` - Query 参数将被拼接为 `key=value`(多值以逗号连接),自动忽略 `signature` 字段。 - Body 为 JSON 时会按字段展开参与签名;仅在非 JSON 场景下才使用整体报文字符串作为 `body` 字段参与签名。 #### cURL 示例 ```bash curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \ -H "ZT-App-Id: demo-app" \ -H "ZT-Timestamp: 1732070400000" \ -H "ZT-Nonce: 0c5e2df9a1" \ -H "ZT-Signature: 8e377..." \ -H "X-Client-Id: mall" \ -H "Content-Type: text/plain" \ -d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk=" ``` > `-d` 的实际内容应当是 AES/ DES 加密后的 Base64 字符串。 ### 8.3 响应处理 1. 读取 HTTP 状态与 `ApiGatewayResponse.code/message/traceId`。 2. 若 `security.encrypt-response=true`,则响应体本身是加密串,需要使用同一 `encryptionKey/encryptionType` 解密得到 JSON,再解析 `response` 字段。 3. `traceId` 可用于后台日志及 `访问日志` 页面关联排查。 ### 8.4 错误与重试策略 | 场景 | 表现 | 处理建议 | | --- | --- | --- | | 时间戳/Nonce 不合法 | HTTP 401,`message` = `请求到达时间超出 300s`/`重复请求` | 校准服务器时间;`nonce` 不可重复(Redis TTL 默认 600s)。 | | 签名失败 | HTTP 401,`message` = `签名校验失败` | 检查签名字符串、字符编码、大小写。 | | 未配置密钥 | HTTP 500,`message` = `应用未配置加密密钥` | 在后台凭证中补齐密钥与算法,或取消强制加密。 | | 限流触发 | HTTP 429,`message` = `请求触发限流策略` | 调整 `X-Client-Id` 级并发或增大策略 `limit/windowSeconds`。 | | API 未发布 | HTTP 404,`message` = `API 定义未发布或已下线` | 确认 `status=ONLINE`,并刷新缓存。 | ## 9. 限流策略配置 - 存储在 `ApiPolicyRateLimitDO.config`,仅支持字段: ```json { "limit": 1000, "windowSeconds": 60 } ``` - 当前实现只读取上述两个字段;Key 模板不可配置。 - Redis Key 固定为:`databus:api:rl:{apiCode}:{version}:{X-Client-Id}`,`X-Client-Id` 缺省时使用 `anonymous`,计数首次出现会自动设置过期。 - 限流拦截后会抛出 `API_RATE_LIMIT_EXCEEDED`,在访问日志中标记 `status=1/2`。 ## 10. 访问日志字段对照 | 字段 | 说明 | | --- | --- | | `traceId` | 来自 `TracerUtils`,可在日志与链路追踪中搜索。 | | `requestHeaders`, `requestBody`, `responseBody` | 默认截断至 4000 字符,JSON 序列化存储。 | | `status` | 0=成功,1=客户端错误,2=服务端错误,3=未知。 | | `stepResults` | 序列化的步骤执行列表(见 `ApiStepResult`),含 `request/response/elapsed/error`。 | | `extra` | 附加变量/属性,供排查自定义上下文。 | > 可通过 `/databus/gateway/access-log/page` + `traceId` 或 `apiCode` 条件快速定位第三方问题。