11 KiB
11 KiB
Databus 模块 API 功能与第三方调用说明
适用范围:
zt-module-databus(Server 侧)+zt-module-databus-api(接口定义)。本文基于 2025-11-20 主干分支代码。
1. 模块定位与整体能力
- 目标:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略,并即时发布到运行态。
- 核心特性:
- API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。
- 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。
- 多重安全防护:IP 白/黑名单、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。
- QoS 能力:可插拔限流策略(Redis 固定窗口计数)、审计日志、追踪 ID & Step 级结果入库。
- Debug 支持:管理端
POST /databus/gateway/invoke可注入任意参数模拟真实调用。
2. 运行时架构概览
| 组件 | 位置 | 作用 |
|---|---|---|
GatewaySecurityFilter |
framework.integration.gateway.security |
过滤并校验所有落在 databus.api-portal.base-path 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。 |
ApiGatewayExecutionService |
framework.integration.gateway.core |
将 HTTP 请求映射为 ApiInvocationContext,调度 Integration Flow,构造统一响应。 |
IntegrationFlowManager |
framework.integration.gateway.core |
按 apiCode + version 动态注册 Spring Integration Flow,支持热刷新与调试临时 Flow。 |
ApiFlowDispatcher |
同上 | 依据 apiCode/version 找到输入通道,发送请求并等待 ApiInvocationContext 回传。 |
PolicyAdvisorFactory + DefaultRateLimitPolicyEvaluator |
framework.integration.gateway.core/policy |
在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。 |
ApiGatewayAccessLogger |
framework.integration.gateway.core |
生成访问日志 databus_api_access_log,记录 Trace、请求/响应、耗时、步骤结果等。 |
| 管控 REST 控制器 | controller.admin.gateway.* |
管理 API 定义、版本、凭证、策略、访问日志等。 |
4. 管控端 REST 接口速查
| 模块 | 方法 | 路径 | 说明 |
|---|---|---|---|
| API 定义 | GET | /databus/gateway/definition/page |
分页查询(支持 code/描述筛选)。 |
| GET | /databus/gateway/definition/{id} |
详情(含步骤、变换、限流绑定)。 | |
| POST | /databus/gateway/definition |
新建定义,必填步骤(至少 Start+End)。 | |
| PUT | /databus/gateway/definition |
更新并自动刷新对应 Flow。 | |
| DELETE | /databus/gateway/definition/{id} |
删除并注销 Flow。 | |
| API 网关 | POST | /databus/gateway/invoke |
管理端调试调用。 |
| GET | /databus/gateway/definitions |
拉取当前已上线定义(供灰度/网关缓存)。 | |
| POST | /databus/gateway/cache/refresh |
强制刷新所有 Flow 缓存。 | |
| API 版本 | GET | /databus/gateway/version/get?id= |
查询版本详情(自动还原 snapshotData)。 |
| GET | /databus/gateway/version/page |
分页。 | |
| GET | /databus/gateway/version/list?apiId= |
列出某 API 的全部版本。 | |
| PUT | /databus/gateway/version/rollback |
根据 id + remark 回滚。 |
|
| GET | /databus/gateway/version/compare |
差异对比(sourceId/targetId)。 | |
| 客户端凭证 | GET | /databus/gateway/credential/page |
分页。 |
| GET | /databus/gateway/credential/get?id= |
详情(含匿名配置)。 | |
| POST | /databus/gateway/credential/create |
新增凭证。 | |
| PUT | /databus/gateway/credential/update |
更新。 | |
| DELETE | /databus/gateway/credential/delete?id= |
删除。 | |
| GET | /databus/gateway/credential/list-simple |
下拉使用。 | |
| 限流策略 | GET | /databus/gateway/policy/rate-limit/page |
分页检索。 |
| GET | /databus/gateway/policy/rate-limit/{id} |
详情。 | |
| GET | /databus/gateway/policy/rate-limit/simple-list |
精简列表。 | |
| POST/PUT/DELETE | /databus/gateway/policy/rate-limit |
新增/更新/删除。 | |
| 访问日志 | GET | /databus/gateway/access-log/page |
分页(需 databus:gateway:access-log:query 权限)。 |
| GET | /databus/gateway/access-log/get?id= |
单条详情(自动补充 API 描述)。 |
所有接口默认返回
CommonResult包装,字段code/message/data。必要时参考对应 VO(位置controller.admin.gateway.vo)。
5. API 生命周期管理要点
- 状态机:
ApiStatusEnum(草稿/已上线/已下线/已废弃)。Integration Flow 只加载ONLINE状态定义。 - 版本快照:每次保存时写入
databus_api_version,可通过snapshotData一键恢复(rollback接口)。 - 变换校验:保存时会校验同一级
TransformPhaseEnum不可重复,并确保 Start/End 唯一且位于首尾。 - 缓存刷新:
- 单 API:创建/更新/删除后自动调用
IntegrationFlowManager.refresh(apiCode, version)。 - 全量:管理员可调用
/databus/gateway/cache/refresh做兜底。
- 单 API:创建/更新/删除后自动调用
6. 网关请求路径与响应格式
- 默认 Base Path:
/admin-api/databus/api/portal(可通过databus.api-portal.base-path覆盖;兼容旧版/databus/api/portal)。 - 最终路径:
{basePath}/{apiCode}/{version},示例/admin-api/databus/api/portal/order.create/v1。 - 支持方法:GET/POST/PUT/DELETE/PATCH,均被映射为
ApiInvocationContext.httpMethod。 - 响应包装:
{
"code": 200,
"message": "OK",
"response": { "bizField": "value" },
"traceId": "c8a3d52f-..."
}
code与 HTTP 状态保持一致;response为 API 变换后的业务体;所有错误也沿用该 Envelope(若启用响应加密则返回 Base64 字串)。
7. 配置项(application.yml)重点
databus:
api-portal:
base-path: /admin-api/databus/api/portal
allowed-ips: [10.0.0.0/24] # 可为空表示全放行
denied-ips: []
enable-tenant-header: true
tenant-header: ZT-Tenant-Id
enable-audit: true
enable-rate-limit: true
security:
enabled: true
signature-type: MD5 # 或 SHA256
encryption-type: AES # 或 DES
allowed-clock-skew-seconds: 300
nonce-ttl-seconds: 600
require-body-encryption: true
encrypt-response: true
GatewaySecurityFilter会自动注册到最高优先级 +10,确保该路径的请求先经过安全校验。
8. 第三方调用流程详解
8.1 前置准备
- 申请凭证:在后台创建
API 客户端凭证,得到:appId(对应ZT-App-Id头)encryptionKey(用于 AES/DES 对称加密,服务器使用CryptoSignatureUtils.decrypt解密)encryptionType、signatureTypeallowAnonymous= true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT)。
- 确定 API:记录
apiCode、version、请求方法、入参/变换契约。 - 网络白名单:将第三方出口 IP 加入
allowed-ips,否则直接返回 403。 - Redis 要求:需保证 Redis 可用(用于 nonce、防重放、限流计数)。
8.2 请求构建步骤
| 序号 | 操作 | 说明 |
|---|---|---|
| 1 | 生成时间戳 | timestamp = System.currentTimeMillis(),与服务器时间差 ≤ 300s。 |
| 2 | 生成随机串 | nonce 长度≥8,可使用 UUID.randomUUID().toString().replace("-", "")。 |
| 3 | 准备明文 Body | 例如 {"orderNo":"SO20251120001"},记为 plainBody。 |
| 4 | 计算签名 | 将所有签名字段放入 Map(详见下节),调用 CryptoSignatureUtils.verifySignature 同样的规则:对 key 排序、跳过 signature 字段、使用 & 连接 key=value,再用 MD5/SHA256 计算;结果赋值给 ZT-Signature。注意:签名使用明文 body。 |
| 5 | 加密请求体 | 使用凭证的 encryptionKey + encryptionType 对 plainBody 进行对称加密,Base64 结果作为 HTTP Body;Content-Type 可设 text/plain 或 application/json。 |
| 6 | 组装请求头 | ZT-App-Id, ZT-Timestamp, ZT-Nonce, ZT-Signature, ZT-Tenant-Id(可选), X-Client-Id(建议,与限流相关),如有自带 JWT 则设置 Authorization。 |
| 7 | 发送请求 | URL = https://{host}{basePath}/{apiCode}/{version},方法与 API 定义保持一致。 |
签名字段示例
appId=demo-app
&body={"orderNo":"SO20251120001"}
&nonce=0c5e2df9a1
×tamp=1732070400000
- Query 参数将被拼接为
key=value(多值以逗号连接),自动忽略signature字段。 - Request Body 若非 JSON,则退化为字符串整体签名。
cURL 示例
curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \
-H "ZT-App-Id: demo-app" \
-H "ZT-Timestamp: 1732070400000" \
-H "ZT-Nonce: 0c5e2df9a1" \
-H "ZT-Signature: 8e377..." \
-H "X-Client-Id: mall" \
-H "Content-Type: text/plain" \
-d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk="
-d的实际内容应当是 AES/ DES 加密后的 Base64 字符串。
8.3 响应处理
- 读取 HTTP 状态与
ApiGatewayResponse.code/message/traceId。 - 若
security.encrypt-response=true,则响应体本身是加密串,需要使用同一encryptionKey/encryptionType解密得到 JSON,再解析response字段。 traceId可用于后台日志及访问日志页面关联排查。
8.4 错误与重试策略
| 场景 | 表现 | 处理建议 |
|---|---|---|
| 时间戳/Nonce 不合法 | HTTP 401,message = 请求到达时间超出 300s/重复请求 |
校准服务器时间;nonce 不可重复(Redis TTL 默认 600s)。 |
| 签名失败 | HTTP 401,message = 签名校验失败 |
检查签名字符串、字符编码、大小写。 |
| 未配置密钥 | HTTP 500,message = 应用未配置加密密钥 |
在后台凭证中补齐密钥与算法,或取消强制加密。 |
| 限流触发 | HTTP 429,message = 请求触发限流策略 |
调整 X-Client-Id 级并发或增大策略 limit/windowSeconds。 |
| API 未发布 | HTTP 404,message = API 定义未发布或已下线 |
确认 status=ONLINE,并刷新缓存。 |
9. 限流策略配置
- 存储在
ApiPolicyRateLimitDO.config,JSON 结构示例:
{
"limit": 1000,
"windowSeconds": 60,
"keyTemplate": "${apiCode}:${tenantId}:${header.X-Client-Id}" // 预留扩展
}
- 当前默认实现读取
limit(默认 100)与windowSeconds(默认 60)。 - Redis Key 格式:
databus:api:rl:{apiCode}:{version}:{X-Client-Id},当计数首次出现时自动设置过期。 - 限流拦截后会抛出
API_RATE_LIMIT_EXCEEDED,在访问日志中标记status=1/2。
10. 访问日志字段对照
| 字段 | 说明 |
|---|---|
traceId |
来自 TracerUtils,可在日志与链路追踪中搜索。 |
requestHeaders, requestBody, responseBody |
默认截断至 4000 字符,JSON 序列化存储。 |
status |
0=成功,1=客户端错误,2=服务端错误,3=未知。 |
stepResults |
序列化的步骤执行列表(见 ApiStepResult),含 request/response/elapsed/error。 |
extra |
附加变量/属性,供排查自定义上下文。 |
可通过
/databus/gateway/access-log/page+traceId或apiCode条件快速定位第三方问题。