Files
zt-dsc/docs/数据总线模块大致功能与调用介绍.md
2025-11-28 18:12:00 +08:00

211 lines
12 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Databus 模块 API 功能与第三方调用说明
> 适用范围:`zt-module-databus`Server 侧)+ `zt-module-databus-api`(接口定义)。本文基于 2025-11-20 主干分支代码。
## 1. 模块定位与整体能力
- **目标**:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略并即时发布到运行态。
- **核心特性**
1. API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。
2. 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。
3. 多重安全防护IP 白/黑名单、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。
4. QoS 能力可插拔限流策略Redis 固定窗口计数)、审计日志、追踪 ID & Step 级结果入库。
5. Debug 支持:管理端 `POST /databus/gateway/invoke` 可注入任意参数模拟真实调用。
## 2. 运行时架构概览
| 组件 | 位置 | 作用 |
| --- | --- | --- |
| `GatewaySecurityFilter` | `framework.integration.gateway.security` | 过滤并校验所有落在 `databus.api-portal.base-path` 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。 |
| `ApiGatewayExecutionService` | `framework.integration.gateway.core` | 将 HTTP 请求映射为 `ApiInvocationContext`,调度 Integration Flow构造统一响应。 |
| `IntegrationFlowManager` | `framework.integration.gateway.core` | 按 `apiCode + version` 动态注册 Spring Integration Flow支持热刷新与调试临时 Flow。 |
| `ApiFlowDispatcher` | 同上 | 依据 apiCode/version 找到输入通道,发送请求并等待 `ApiInvocationContext` 回传。 |
| `PolicyAdvisorFactory` + `DefaultRateLimitPolicyEvaluator` | `framework.integration.gateway.core/policy` | 在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。 |
| `ApiGatewayAccessLogger` | `framework.integration.gateway.core` | 生成访问日志 `databus_api_access_log`,记录 Trace、请求/响应、耗时、步骤结果等。 |
| 管控 REST 控制器 | `controller.admin.gateway.*` | 管理 API 定义、版本、凭证、策略、访问日志等。 |
## 4. 管控端 REST 接口速查
| 模块 | 方法 | 路径 | 说明 |
| --- | --- | --- | --- |
| API 定义 | GET | `/databus/gateway/definition/page` | 分页查询(支持 code/描述筛选)。 |
| | GET | `/databus/gateway/definition/{id}` | 详情(含步骤、变换、限流绑定)。 |
| | POST | `/databus/gateway/definition` | 新建定义,必填步骤(至少 Start+End。 |
| | PUT | `/databus/gateway/definition` | 更新并自动刷新对应 Flow。 |
| | DELETE | `/databus/gateway/definition/{id}` | 删除并注销 Flow。 |
| API 网关 | POST | `/databus/gateway/invoke` | 管理端调试调用。 |
| | GET | `/databus/gateway/definitions` | 拉取当前已上线定义(供灰度/网关缓存)。 |
| | POST | `/databus/gateway/cache/refresh` | 强制刷新所有 Flow 缓存。 |
| API 版本 | GET | `/databus/gateway/version/get?id=` | 查询版本详情(自动还原 snapshotData。 |
| | GET | `/databus/gateway/version/page` | 分页。 |
| | GET | `/databus/gateway/version/list?apiId=` | 列出某 API 的全部版本。 |
| | PUT | `/databus/gateway/version/rollback` | 根据 `id + remark` 回滚。 |
| | GET | `/databus/gateway/version/compare` | 差异对比sourceId/targetId。 |
| 客户端凭证 | GET | `/databus/gateway/credential/page` | 分页。 |
| | GET | `/databus/gateway/credential/get?id=` | 详情(含匿名配置)。 |
| | POST | `/databus/gateway/credential/create` | 新增凭证。 |
| | PUT | `/databus/gateway/credential/update` | 更新。 |
| | DELETE | `/databus/gateway/credential/delete?id=` | 删除。 |
| | GET | `/databus/gateway/credential/list-simple` | 下拉使用。 |
| 限流策略 | GET | `/databus/gateway/policy/rate-limit/page` | 分页检索。 |
| | GET | `/databus/gateway/policy/rate-limit/{id}` | 详情。 |
| | GET | `/databus/gateway/policy/rate-limit/simple-list` | 精简列表。 |
| | POST/PUT/DELETE | `/databus/gateway/policy/rate-limit` | 新增/更新/删除。 |
| 访问日志 | GET | `/databus/gateway/access-log/page` | 分页(需 `databus:gateway:access-log:query` 权限)。 |
| | GET | `/databus/gateway/access-log/get?id=` | 单条详情(自动补充 API 描述)。 |
> 所有接口默认返回 `CommonResult` 包装,字段 `code/message/data`。必要时参考对应 VO位置 `controller.admin.gateway.vo`)。
## 5. API 生命周期管理要点
1. **状态机**`ApiStatusEnum`(草稿/已上线/已下线/已废弃。Integration Flow 只加载 `ONLINE` 状态定义。
2. **版本快照**:每次保存时写入 `databus_api_version`,可通过 `snapshotData` 一键恢复(`rollback` 接口)。
3. **变换校验**:保存时会校验同一级 `TransformPhaseEnum` 不可重复,并确保 Start/End 唯一且位于首尾。
4. **缓存刷新**
- 单 API创建/更新/删除后自动调用 `IntegrationFlowManager.refresh(apiCode, version)`
- 全量:管理员可调用 `/databus/gateway/cache/refresh` 做兜底。
## 6. 网关请求路径与响应格式
- **默认 Base Path**`/admin-api/databus/api/portal`(可通过 `databus.api-portal.base-path` 覆盖;兼容旧版 `/databus/api/portal`)。
- **最终路径**`{basePath}/{apiCode}/{version}`,示例 `/admin-api/databus/api/portal/order.create/v1`
- **支持方法**GET/POST/PUT/DELETE/PATCH均被映射为 `ApiInvocationContext.httpMethod`
- **响应包装**
```json
{
"code": 200,
"message": "OK",
"response": { "bizField": "value" },
"traceId": "c8a3d52f-..."
}
```
> `code` 与 HTTP 状态保持一致;`response` 为 API 变换后的业务体;所有错误也沿用该 Envelope若启用响应加密则返回 Base64 字串)。
## 7. 配置项(`application.yml`)重点
```yaml
databus:
api-portal:
base-path: /admin-api/databus/api/portal
allowed-ips: [10.0.0.0/24] # 可为空表示全放行
denied-ips: []
enable-tenant-header: true
tenant-header: ZT-Tenant-Id
enable-audit: true
enable-rate-limit: true
security:
enabled: true
signature-type: MD5 # 或 SHA256
encryption-type: AES # 或 DES
allowed-clock-skew-seconds: 300
nonce-ttl-seconds: 600
require-body-encryption: true
encrypt-response: true
gateway:
web-client:
connection-pool-enabled: true # 默认启用 Reactor Netty 连接池,可在排查连接复用/长连接异常时设为 false
```
> `GatewaySecurityFilter` 会自动注册到最高优先级 +10确保该路径的请求先经过安全校验。
关闭连接池后,每次 HTTP Step 请求都会新建 TCP 连接,适合短期定位“连接被复用导致 Reset/超时”的场景,但会带来额外的握手开销;切换时可关注启动日志中的 `Databus gateway WebClient pooling` 提示。
## 8. 第三方调用流程详解
### 8.1 前置准备
1. **申请凭证**:在后台创建 `API 客户端凭证`,得到:
- `appId`(对应 `ZT-App-Id` 头)
- `encryptionKey`(用于 AES/DES 对称加密,服务器使用 `CryptoSignatureUtils.decrypt` 解密)
- `encryptionType``signatureType`
- `allowAnonymous` = true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT
2. **确定 API**:记录 `apiCode``version`、请求方法、入参/变换契约。
3. **网络白名单**:将第三方出口 IP 加入 `allowed-ips`,否则直接返回 403。
4. **Redis 要求**:需保证 Redis 可用(用于 nonce、防重放、限流计数
### 8.2 请求构建步骤
| 序号 | 操作 | 说明 |
| --- | --- | --- |
| 1 | 生成时间戳 | `timestamp = System.currentTimeMillis()`,与服务器时间差 ≤ 300s。 |
| 2 | 生成随机串 | `nonce` 长度≥8可使用 `UUID.randomUUID().toString().replace("-", "")`。 |
| 3 | 准备明文 Body | 例如 `{"orderNo":"SO20251120001"}`,记为 `plainBody`。 |
| 4 | 计算签名 | 将所有签名字段放入 Map详见下节调用 `CryptoSignatureUtils.verifySignature` 同样的规则:对 key 排序、跳过 `signature` 字段、使用 `&` 连接 `key=value`,再用 `MD5/SHA256` 计算;结果赋值给 `ZT-Signature`。*注意:签名使用明文 body。* |
| 5 | 加密请求体 | 使用凭证的 `encryptionKey + encryptionType``plainBody` 进行对称加密Base64 结果作为 HTTP BodyContent-Type 可设 `text/plain``application/json`。 |
| 6 | 组装请求头 | `ZT-App-Id`, `ZT-Timestamp`, `ZT-Nonce`, `ZT-Signature`, `ZT-Tenant-Id`(可选), `X-Client-Id`(建议,与限流相关),如有自带 JWT 则设置 `Authorization`。 |
| 7 | 发送请求 | URL = `https://{host}{basePath}/{apiCode}/{version}`,方法与 API 定义保持一致。 |
#### 签名字段示例
```text
appId=demo-app
&body={"orderNo":"SO20251120001"}
&nonce=0c5e2df9a1
&timestamp=1732070400000
```
- Query 参数将被拼接为 `key=value`(多值以逗号连接),自动忽略 `signature` 字段。
- Request Body 若非 JSON则退化为字符串整体签名。
#### cURL 示例
```bash
curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \
-H "ZT-App-Id: demo-app" \
-H "ZT-Timestamp: 1732070400000" \
-H "ZT-Nonce: 0c5e2df9a1" \
-H "ZT-Signature: 8e377..." \
-H "X-Client-Id: mall" \
-H "Content-Type: text/plain" \
-d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk="
```
> `-d` 的实际内容应当是 AES/ DES 加密后的 Base64 字符串。
### 8.3 响应处理
1. 读取 HTTP 状态与 `ApiGatewayResponse.code/message/traceId`
2.`security.encrypt-response=true`,则响应体本身是加密串,需要使用同一 `encryptionKey/encryptionType` 解密得到 JSON再解析 `response` 字段。
3. `traceId` 可用于后台日志及 `访问日志` 页面关联排查。
### 8.4 错误与重试策略
| 场景 | 表现 | 处理建议 |
| --- | --- | --- |
| 时间戳/Nonce 不合法 | HTTP 401`message` = `请求到达时间超出 300s`/`重复请求` | 校准服务器时间;`nonce` 不可重复Redis TTL 默认 600s。 |
| 签名失败 | HTTP 401`message` = `签名校验失败` | 检查签名字符串、字符编码、大小写。 |
| 未配置密钥 | HTTP 500`message` = `应用未配置加密密钥` | 在后台凭证中补齐密钥与算法,或取消强制加密。 |
| 限流触发 | HTTP 429`message` = `请求触发限流策略` | 调整 `X-Client-Id` 级并发或增大策略 `limit/windowSeconds`。 |
| API 未发布 | HTTP 404`message` = `API 定义未发布或已下线` | 确认 `status=ONLINE`,并刷新缓存。 |
## 9. 限流策略配置
- 存储在 `ApiPolicyRateLimitDO.config`JSON 结构示例:
```json
{
"limit": 1000,
"windowSeconds": 60,
"keyTemplate": "${apiCode}:${tenantId}:${header.X-Client-Id}" // 预留扩展
}
```
- 当前默认实现读取 `limit`(默认 100`windowSeconds`(默认 60
- Redis Key 格式:`databus:api:rl:{apiCode}:{version}:{X-Client-Id}`,当计数首次出现时自动设置过期。
- 限流拦截后会抛出 `API_RATE_LIMIT_EXCEEDED`,在访问日志中标记 `status=1/2`
## 10. 访问日志字段对照
| 字段 | 说明 |
| --- | --- |
| `traceId` | 来自 `TracerUtils`,可在日志与链路追踪中搜索。 |
| `requestHeaders`, `requestBody`, `responseBody` | 默认截断至 4000 字符JSON 序列化存储。 |
| `status` | 0=成功,1=客户端错误,2=服务端错误,3=未知。 |
| `stepResults` | 序列化的步骤执行列表(见 `ApiStepResult`),含 `request/response/elapsed/error`。 |
| `extra` | 附加变量/属性,供排查自定义上下文。 |
> 可通过 `/databus/gateway/access-log/page` + `traceId` 或 `apiCode` 条件快速定位第三方问题。