# Databus 模块 API 功能与第三方调用说明 > 适用范围:`zt-module-databus`(Server 侧)+ `zt-module-databus-api`(接口定义)。本文基于 2025-11-20 主干分支代码。 ## 1. 模块定位与整体能力 - **目标**:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略,并即时发布到运行态。 - **核心特性**: 1. API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。 2. 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。 3. 多重安全防护:IP 白/黑名单、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。 4. QoS 能力:可插拔限流策略(Redis 固定窗口计数)、审计日志、追踪 ID & Step 级结果入库。 5. Debug 支持:管理端 `POST /databus/gateway/invoke` 可注入任意参数模拟真实调用。 ## 2. 运行时架构概览 | 组件 | 位置 | 作用 | | --- | --- | --- | | `GatewaySecurityFilter` | `framework.integration.gateway.security` | 过滤并校验所有落在 `databus.api-portal.base-path` 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。 | | `ApiGatewayExecutionService` | `framework.integration.gateway.core` | 将 HTTP 请求映射为 `ApiInvocationContext`,调度 Integration Flow,构造统一响应。 | | `IntegrationFlowManager` | `framework.integration.gateway.core` | 按 `apiCode + version` 动态注册 Spring Integration Flow,支持热刷新与调试临时 Flow。 | | `ApiFlowDispatcher` | 同上 | 依据 apiCode/version 找到输入通道,发送请求并等待 `ApiInvocationContext` 回传。 | | `PolicyAdvisorFactory` + `DefaultRateLimitPolicyEvaluator` | `framework.integration.gateway.core/policy` | 在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。 | | `ApiGatewayAccessLogger` | `framework.integration.gateway.core` | 生成访问日志 `databus_api_access_log`,记录 Trace、请求/响应、耗时、步骤结果等。 | | 管控 REST 控制器 | `controller.admin.gateway.*` | 管理 API 定义、版本、凭证、策略、访问日志等。 | ## 4. 管控端 REST 接口速查 | 模块 | 方法 | 路径 | 说明 | | --- | --- | --- | --- | | API 定义 | GET | `/databus/gateway/definition/page` | 分页查询(支持 code/描述筛选)。 | | | GET | `/databus/gateway/definition/{id}` | 详情(含步骤、变换、限流绑定)。 | | | POST | `/databus/gateway/definition` | 新建定义,必填步骤(至少 Start+End)。 | | | PUT | `/databus/gateway/definition` | 更新并自动刷新对应 Flow。 | | | DELETE | `/databus/gateway/definition/{id}` | 删除并注销 Flow。 | | API 网关 | POST | `/databus/gateway/invoke` | 管理端调试调用。 | | | GET | `/databus/gateway/definitions` | 拉取当前已上线定义(供灰度/网关缓存)。 | | | POST | `/databus/gateway/cache/refresh` | 强制刷新所有 Flow 缓存。 | | API 版本 | GET | `/databus/gateway/version/get?id=` | 查询版本详情(自动还原 snapshotData)。 | | | GET | `/databus/gateway/version/page` | 分页。 | | | GET | `/databus/gateway/version/list?apiId=` | 列出某 API 的全部版本。 | | | PUT | `/databus/gateway/version/rollback` | 根据 `id + remark` 回滚。 | | | GET | `/databus/gateway/version/compare` | 差异对比(sourceId/targetId)。 | | 客户端凭证 | GET | `/databus/gateway/credential/page` | 分页。 | | | GET | `/databus/gateway/credential/get?id=` | 详情(含匿名配置)。 | | | POST | `/databus/gateway/credential/create` | 新增凭证。 | | | PUT | `/databus/gateway/credential/update` | 更新。 | | | DELETE | `/databus/gateway/credential/delete?id=` | 删除。 | | | GET | `/databus/gateway/credential/list-simple` | 下拉使用。 | | 限流策略 | GET | `/databus/gateway/policy/rate-limit/page` | 分页检索。 | | | GET | `/databus/gateway/policy/rate-limit/{id}` | 详情。 | | | GET | `/databus/gateway/policy/rate-limit/simple-list` | 精简列表。 | | | POST/PUT/DELETE | `/databus/gateway/policy/rate-limit` | 新增/更新/删除。 | | 访问日志 | GET | `/databus/gateway/access-log/page` | 分页(需 `databus:gateway:access-log:query` 权限)。 | | | GET | `/databus/gateway/access-log/get?id=` | 单条详情(自动补充 API 描述)。 | > 所有接口默认返回 `CommonResult` 包装,字段 `code/message/data`。必要时参考对应 VO(位置 `controller.admin.gateway.vo`)。 ## 5. API 生命周期管理要点 1. **状态机**:`ApiStatusEnum`(草稿/已上线/已下线/已废弃)。Integration Flow 只加载 `ONLINE` 状态定义。 2. **版本快照**:每次保存时写入 `databus_api_version`,可通过 `snapshotData` 一键恢复(`rollback` 接口)。 3. **变换校验**:保存时会校验同一级 `TransformPhaseEnum` 不可重复,并确保 Start/End 唯一且位于首尾。 4. **缓存刷新**: - 单 API:创建/更新/删除后自动调用 `IntegrationFlowManager.refresh(apiCode, version)`。 - 全量:管理员可调用 `/databus/gateway/cache/refresh` 做兜底。 ## 6. 网关请求路径与响应格式 - **默认 Base Path**:`/admin-api/databus/api/portal`(可通过 `databus.api-portal.base-path` 覆盖;兼容旧版 `/databus/api/portal`)。 - **最终路径**:`{basePath}/{apiCode}/{version}`,示例 `/admin-api/databus/api/portal/order.create/v1`。 - **支持方法**:GET/POST/PUT/DELETE/PATCH,均被映射为 `ApiInvocationContext.httpMethod`。 - **响应包装**: ```json { "code": 200, "message": "OK", "response": { "bizField": "value" }, "traceId": "c8a3d52f-..." } ``` > `code` 与 HTTP 状态保持一致;`response` 为 API 变换后的业务体;所有错误也沿用该 Envelope(若启用响应加密则返回 Base64 字串)。 ## 7. 配置项(`application.yml`)重点 ```yaml databus: api-portal: base-path: /admin-api/databus/api/portal allowed-ips: [10.0.0.0/24] # 可为空表示全放行 denied-ips: [] enable-tenant-header: true tenant-header: ZT-Tenant-Id enable-audit: true enable-rate-limit: true security: enabled: true signature-type: MD5 # 或 SHA256 encryption-type: AES # 或 DES allowed-clock-skew-seconds: 300 nonce-ttl-seconds: 600 require-body-encryption: true encrypt-response: true ``` > `GatewaySecurityFilter` 会自动注册到最高优先级 +10,确保该路径的请求先经过安全校验。 ## 8. 第三方调用流程详解 ### 8.1 前置准备 1. **申请凭证**:在后台创建 `API 客户端凭证`,得到: - `appId`(对应 `ZT-App-Id` 头) - `encryptionKey`(用于 AES/DES 对称加密,服务器使用 `CryptoSignatureUtils.decrypt` 解密) - `encryptionType`、`signatureType` - `allowAnonymous` = true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT)。 2. **确定 API**:记录 `apiCode`、`version`、请求方法、入参/变换契约。 3. **网络白名单**:将第三方出口 IP 加入 `allowed-ips`,否则直接返回 403。 4. **Redis 要求**:需保证 Redis 可用(用于 nonce、防重放、限流计数)。 ### 8.2 请求构建步骤 | 序号 | 操作 | 说明 | | --- | --- | --- | | 1 | 生成时间戳 | `timestamp = System.currentTimeMillis()`,与服务器时间差 ≤ 300s。 | | 2 | 生成随机串 | `nonce` 长度≥8,可使用 `UUID.randomUUID().toString().replace("-", "")`。 | | 3 | 准备明文 Body | 例如 `{"orderNo":"SO20251120001"}`,记为 `plainBody`。 | | 4 | 计算签名 | 将所有签名字段放入 Map(详见下节),调用 `CryptoSignatureUtils.verifySignature` 同样的规则:对 key 排序、跳过 `signature` 字段、使用 `&` 连接 `key=value`,再用 `MD5/SHA256` 计算;结果赋值给 `ZT-Signature`。*注意:签名使用明文 body。* | | 5 | 加密请求体 | 使用凭证的 `encryptionKey + encryptionType` 对 `plainBody` 进行对称加密,Base64 结果作为 HTTP Body;Content-Type 可设 `text/plain` 或 `application/json`。 | | 6 | 组装请求头 | `ZT-App-Id`, `ZT-Timestamp`, `ZT-Nonce`, `ZT-Signature`, `ZT-Tenant-Id`(可选), `X-Client-Id`(建议,与限流相关),如有自带 JWT 则设置 `Authorization`。 | | 7 | 发送请求 | URL = `https://{host}{basePath}/{apiCode}/{version}`,方法与 API 定义保持一致。 | #### 签名字段示例 ``` appId=demo-app &body={"orderNo":"SO20251120001"} &nonce=0c5e2df9a1 ×tamp=1732070400000 ``` - Query 参数将被拼接为 `key=value`(多值以逗号连接),自动忽略 `signature` 字段。 - Request Body 若非 JSON,则退化为字符串整体签名。 #### cURL 示例 ```bash curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \ -H "ZT-App-Id: demo-app" \ -H "ZT-Timestamp: 1732070400000" \ -H "ZT-Nonce: 0c5e2df9a1" \ -H "ZT-Signature: 8e377..." \ -H "X-Client-Id: mall" \ -H "Content-Type: text/plain" \ -d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk=" ``` > `-d` 的实际内容应当是 AES/ DES 加密后的 Base64 字符串。 ### 8.3 响应处理 1. 读取 HTTP 状态与 `ApiGatewayResponse.code/message/traceId`。 2. 若 `security.encrypt-response=true`,则响应体本身是加密串,需要使用同一 `encryptionKey/encryptionType` 解密得到 JSON,再解析 `response` 字段。 3. `traceId` 可用于后台日志及 `访问日志` 页面关联排查。 ### 8.4 错误与重试策略 | 场景 | 表现 | 处理建议 | | --- | --- | --- | | 时间戳/Nonce 不合法 | HTTP 401,`message` = `请求到达时间超出 300s`/`重复请求` | 校准服务器时间;`nonce` 不可重复(Redis TTL 默认 600s)。 | | 签名失败 | HTTP 401,`message` = `签名校验失败` | 检查签名字符串、字符编码、大小写。 | | 未配置密钥 | HTTP 500,`message` = `应用未配置加密密钥` | 在后台凭证中补齐密钥与算法,或取消强制加密。 | | 限流触发 | HTTP 429,`message` = `请求触发限流策略` | 调整 `X-Client-Id` 级并发或增大策略 `limit/windowSeconds`。 | | API 未发布 | HTTP 404,`message` = `API 定义未发布或已下线` | 确认 `status=ONLINE`,并刷新缓存。 | ## 9. 限流策略配置 - 存储在 `ApiPolicyRateLimitDO.config`,JSON 结构示例: ```json { "limit": 1000, "windowSeconds": 60, "keyTemplate": "${apiCode}:${tenantId}:${header.X-Client-Id}" // 预留扩展 } ``` - 当前默认实现读取 `limit`(默认 100)与 `windowSeconds`(默认 60)。 - Redis Key 格式:`databus:api:rl:{apiCode}:{version}:{X-Client-Id}`,当计数首次出现时自动设置过期。 - 限流拦截后会抛出 `API_RATE_LIMIT_EXCEEDED`,在访问日志中标记 `status=1/2`。 ## 10. 访问日志字段对照 | 字段 | 说明 | | --- | --- | | `traceId` | 来自 `TracerUtils`,可在日志与链路追踪中搜索。 | | `requestHeaders`, `requestBody`, `responseBody` | 默认截断至 4000 字符,JSON 序列化存储。 | | `status` | 0=成功,1=客户端错误,2=服务端错误,3=未知。 | | `stepResults` | 序列化的步骤执行列表(见 `ApiStepResult`),含 `request/response/elapsed/error`。 | | `extra` | 附加变量/属性,供排查自定义上下文。 | > 可通过 `/databus/gateway/access-log/page` + `traceId` 或 `apiCode` 条件快速定位第三方问题。