Files
zt-dsc/docs/数据总线模块大致功能与调用介绍.md
chenbowen 0b646295da 1. 新增 iwork 同步用户组织信息接口
2. 修复错误设置版本信息在 zt-dependencies 的 bug
2025-11-20 18:27:01 +08:00

11 KiB
Raw Permalink Blame History

Databus 模块 API 功能与第三方调用说明

适用范围:zt-module-databusServer 侧)+ zt-module-databus-api(接口定义)。本文基于 2025-11-20 主干分支代码。

1. 模块定位与整体能力

  • 目标:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略并即时发布到运行态。
  • 核心特性
    1. API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。
    2. 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。
    3. 多重安全防护IP 白/黑名单、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。
    4. QoS 能力可插拔限流策略Redis 固定窗口计数)、审计日志、追踪 ID & Step 级结果入库。
    5. Debug 支持:管理端 POST /databus/gateway/invoke 可注入任意参数模拟真实调用。

2. 运行时架构概览

组件 位置 作用
GatewaySecurityFilter framework.integration.gateway.security 过滤并校验所有落在 databus.api-portal.base-path 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。
ApiGatewayExecutionService framework.integration.gateway.core 将 HTTP 请求映射为 ApiInvocationContext,调度 Integration Flow构造统一响应。
IntegrationFlowManager framework.integration.gateway.core apiCode + version 动态注册 Spring Integration Flow支持热刷新与调试临时 Flow。
ApiFlowDispatcher 同上 依据 apiCode/version 找到输入通道,发送请求并等待 ApiInvocationContext 回传。
PolicyAdvisorFactory + DefaultRateLimitPolicyEvaluator framework.integration.gateway.core/policy 在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。
ApiGatewayAccessLogger framework.integration.gateway.core 生成访问日志 databus_api_access_log,记录 Trace、请求/响应、耗时、步骤结果等。
管控 REST 控制器 controller.admin.gateway.* 管理 API 定义、版本、凭证、策略、访问日志等。

4. 管控端 REST 接口速查

模块 方法 路径 说明
API 定义 GET /databus/gateway/definition/page 分页查询(支持 code/描述筛选)。
GET /databus/gateway/definition/{id} 详情(含步骤、变换、限流绑定)。
POST /databus/gateway/definition 新建定义,必填步骤(至少 Start+End
PUT /databus/gateway/definition 更新并自动刷新对应 Flow。
DELETE /databus/gateway/definition/{id} 删除并注销 Flow。
API 网关 POST /databus/gateway/invoke 管理端调试调用。
GET /databus/gateway/definitions 拉取当前已上线定义(供灰度/网关缓存)。
POST /databus/gateway/cache/refresh 强制刷新所有 Flow 缓存。
API 版本 GET /databus/gateway/version/get?id= 查询版本详情(自动还原 snapshotData
GET /databus/gateway/version/page 分页。
GET /databus/gateway/version/list?apiId= 列出某 API 的全部版本。
PUT /databus/gateway/version/rollback 根据 id + remark 回滚。
GET /databus/gateway/version/compare 差异对比sourceId/targetId
客户端凭证 GET /databus/gateway/credential/page 分页。
GET /databus/gateway/credential/get?id= 详情(含匿名配置)。
POST /databus/gateway/credential/create 新增凭证。
PUT /databus/gateway/credential/update 更新。
DELETE /databus/gateway/credential/delete?id= 删除。
GET /databus/gateway/credential/list-simple 下拉使用。
限流策略 GET /databus/gateway/policy/rate-limit/page 分页检索。
GET /databus/gateway/policy/rate-limit/{id} 详情。
GET /databus/gateway/policy/rate-limit/simple-list 精简列表。
POST/PUT/DELETE /databus/gateway/policy/rate-limit 新增/更新/删除。
访问日志 GET /databus/gateway/access-log/page 分页(需 databus:gateway:access-log:query 权限)。
GET /databus/gateway/access-log/get?id= 单条详情(自动补充 API 描述)。

所有接口默认返回 CommonResult 包装,字段 code/message/data。必要时参考对应 VO位置 controller.admin.gateway.vo)。

5. API 生命周期管理要点

  1. 状态机ApiStatusEnum(草稿/已上线/已下线/已废弃。Integration Flow 只加载 ONLINE 状态定义。
  2. 版本快照:每次保存时写入 databus_api_version,可通过 snapshotData 一键恢复(rollback 接口)。
  3. 变换校验:保存时会校验同一级 TransformPhaseEnum 不可重复,并确保 Start/End 唯一且位于首尾。
  4. 缓存刷新
    • 单 API创建/更新/删除后自动调用 IntegrationFlowManager.refresh(apiCode, version)
    • 全量:管理员可调用 /databus/gateway/cache/refresh 做兜底。

6. 网关请求路径与响应格式

  • 默认 Base Path/admin-api/databus/api/portal(可通过 databus.api-portal.base-path 覆盖;兼容旧版 /databus/api/portal)。
  • 最终路径{basePath}/{apiCode}/{version},示例 /admin-api/databus/api/portal/order.create/v1
  • 支持方法GET/POST/PUT/DELETE/PATCH均被映射为 ApiInvocationContext.httpMethod
  • 响应包装
{
  "code": 200,
  "message": "OK",
  "response": { "bizField": "value" },
  "traceId": "c8a3d52f-..."
}

code 与 HTTP 状态保持一致;response 为 API 变换后的业务体;所有错误也沿用该 Envelope若启用响应加密则返回 Base64 字串)。

7. 配置项(application.yml)重点

databus:
  api-portal:
    base-path: /admin-api/databus/api/portal
    allowed-ips: [10.0.0.0/24]        # 可为空表示全放行
    denied-ips: []
    enable-tenant-header: true
    tenant-header: ZT-Tenant-Id
    enable-audit: true
    enable-rate-limit: true
    security:
      enabled: true
      signature-type: MD5            # 或 SHA256
      encryption-type: AES           # 或 DES
      allowed-clock-skew-seconds: 300
      nonce-ttl-seconds: 600
      require-body-encryption: true
      encrypt-response: true

GatewaySecurityFilter 会自动注册到最高优先级 +10确保该路径的请求先经过安全校验。

8. 第三方调用流程详解

8.1 前置准备

  1. 申请凭证:在后台创建 API 客户端凭证,得到:
    • appId(对应 ZT-App-Id 头)
    • encryptionKey(用于 AES/DES 对称加密,服务器使用 CryptoSignatureUtils.decrypt 解密)
    • encryptionTypesignatureType
    • allowAnonymous = true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT
  2. 确定 API:记录 apiCodeversion、请求方法、入参/变换契约。
  3. 网络白名单:将第三方出口 IP 加入 allowed-ips,否则直接返回 403。
  4. Redis 要求:需保证 Redis 可用(用于 nonce、防重放、限流计数

8.2 请求构建步骤

序号 操作 说明
1 生成时间戳 timestamp = System.currentTimeMillis(),与服务器时间差 ≤ 300s。
2 生成随机串 nonce 长度≥8可使用 UUID.randomUUID().toString().replace("-", "")
3 准备明文 Body 例如 {"orderNo":"SO20251120001"},记为 plainBody
4 计算签名 将所有签名字段放入 Map详见下节调用 CryptoSignatureUtils.verifySignature 同样的规则:对 key 排序、跳过 signature 字段、使用 & 连接 key=value,再用 MD5/SHA256 计算;结果赋值给 ZT-Signature注意:签名使用明文 body。
5 加密请求体 使用凭证的 encryptionKey + encryptionTypeplainBody 进行对称加密Base64 结果作为 HTTP BodyContent-Type 可设 text/plainapplication/json
6 组装请求头 ZT-App-Id, ZT-Timestamp, ZT-Nonce, ZT-Signature, ZT-Tenant-Id(可选), X-Client-Id(建议,与限流相关),如有自带 JWT 则设置 Authorization
7 发送请求 URL = https://{host}{basePath}/{apiCode}/{version},方法与 API 定义保持一致。

签名字段示例

appId=demo-app
&body={"orderNo":"SO20251120001"}
&nonce=0c5e2df9a1
&timestamp=1732070400000
  • Query 参数将被拼接为 key=value(多值以逗号连接),自动忽略 signature 字段。
  • Request Body 若非 JSON则退化为字符串整体签名。

cURL 示例

curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \
  -H "ZT-App-Id: demo-app" \
  -H "ZT-Timestamp: 1732070400000" \
  -H "ZT-Nonce: 0c5e2df9a1" \
  -H "ZT-Signature: 8e377..." \
  -H "X-Client-Id: mall" \
  -H "Content-Type: text/plain" \
  -d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk="

-d 的实际内容应当是 AES/ DES 加密后的 Base64 字符串。

8.3 响应处理

  1. 读取 HTTP 状态与 ApiGatewayResponse.code/message/traceId
  2. security.encrypt-response=true,则响应体本身是加密串,需要使用同一 encryptionKey/encryptionType 解密得到 JSON再解析 response 字段。
  3. traceId 可用于后台日志及 访问日志 页面关联排查。

8.4 错误与重试策略

场景 表现 处理建议
时间戳/Nonce 不合法 HTTP 401message = 请求到达时间超出 300s/重复请求 校准服务器时间;nonce 不可重复Redis TTL 默认 600s
签名失败 HTTP 401message = 签名校验失败 检查签名字符串、字符编码、大小写。
未配置密钥 HTTP 500message = 应用未配置加密密钥 在后台凭证中补齐密钥与算法,或取消强制加密。
限流触发 HTTP 429message = 请求触发限流策略 调整 X-Client-Id 级并发或增大策略 limit/windowSeconds
API 未发布 HTTP 404message = API 定义未发布或已下线 确认 status=ONLINE,并刷新缓存。

9. 限流策略配置

  • 存储在 ApiPolicyRateLimitDO.configJSON 结构示例:
{
  "limit": 1000,
  "windowSeconds": 60,
  "keyTemplate": "${apiCode}:${tenantId}:${header.X-Client-Id}"  // 预留扩展
}
  • 当前默认实现读取 limit(默认 100windowSeconds(默认 60
  • Redis Key 格式:databus:api:rl:{apiCode}:{version}:{X-Client-Id},当计数首次出现时自动设置过期。
  • 限流拦截后会抛出 API_RATE_LIMIT_EXCEEDED,在访问日志中标记 status=1/2

10. 访问日志字段对照

字段 说明
traceId 来自 TracerUtils,可在日志与链路追踪中搜索。
requestHeaders, requestBody, responseBody 默认截断至 4000 字符JSON 序列化存储。
status 0=成功,1=客户端错误,2=服务端错误,3=未知。
stepResults 序列化的步骤执行列表(见 ApiStepResult),含 request/response/elapsed/error
extra 附加变量/属性,供排查自定义上下文。

可通过 /databus/gateway/access-log/page + traceIdapiCode 条件快速定位第三方问题。