Files
zt-dsc/docs/数据总线模块大致功能与调用介绍.md
chenbowen cae0b9e4af 1. 新增 api 绑定客户凭证进行权限校验
2. 去除 api 定义的缓存策略
3. 新增短信渠道
4. 新增用户信息模糊查询
5. 修复全局的单元测试
2025-12-12 10:03:10 +08:00

13 KiB
Raw Blame History

Databus 模块 API 功能与第三方调用说明

适用范围:zt-module-databusServer 侧)+ zt-module-databus-api(接口定义)。本文基于 2025-11-20 主干分支代码。

1. 模块定位与整体能力

  • 目标:对外暴露统一的数据/业务编排网关,允许后台在可视化界面中配置 API、步骤、变换与限流策略并即时发布到运行态。
  • 核心特性
    1. API 全生命周期管理(定义、版本、回滚、发布缓存刷新)。
    2. 编排引擎基于 Spring Integration 动态装配,支持 Start/HTTP/RPC/Script/End 步骤及 JSON 变换链路。
    3. 多重安全防护:精确 IP 白/黑名单(仅支持单 IP不支持 CIDR 段)、应用凭证、时间戳 + 随机串、报文加解密、签名、防重放、租户隔离、匿名固定用户等。
    4. QoS 能力可插拔限流策略Redis 固定窗口计数)、访问日志、追踪 ID & Step 级结果入库。
    5. Debug 支持:管理端 POST /databus/gateway/invoke 可注入任意参数模拟真实调用。

2. 运行时架构概览

组件 位置 作用
GatewaySecurityFilter framework.integration.gateway.security 过滤并校验所有落在 databus.api-portal.base-path 之下的 HTTP 请求,完成 IP 校验、报文解密、签名、防重放、匿名用户注入、响应加密。
ApiGatewayExecutionService framework.integration.gateway.core 将 HTTP 请求映射为 ApiInvocationContext,调度 Integration Flow构造统一响应。
IntegrationFlowManager framework.integration.gateway.core apiCode + version 动态注册 Spring Integration Flow支持热刷新与调试临时 Flow。
ApiFlowDispatcher 同上 依据 apiCode/version 找到输入通道,发送请求并等待 ApiInvocationContext 回传。
PolicyAdvisorFactory + DefaultRateLimitPolicyEvaluator framework.integration.gateway.core/policy 在 Flow 上织入限流等策略,当前默认实现支持 Redis 固定窗口。
ApiGatewayAccessLogger framework.integration.gateway.core 生成访问日志 databus_api_access_log,记录 Trace、请求/响应、耗时、步骤结果等。
管控 REST 控制器 controller.admin.gateway.* 管理 API 定义、版本、凭证、策略、访问日志等。

4. 管控端 REST 接口速查

模块 方法 路径 说明
API 定义 GET /databus/gateway/definition/page 分页查询(支持 code/描述筛选)。
GET /databus/gateway/definition/{id} 详情(含步骤、变换、限流绑定)。
POST /databus/gateway/definition 新建定义,必填步骤(至少 Start+End
PUT /databus/gateway/definition 更新并自动刷新对应 Flow。
DELETE /databus/gateway/definition/{id} 删除并注销 Flow。
API 网关 POST /databus/gateway/invoke 管理端调试调用。
GET /databus/gateway/definitions 拉取当前已上线定义(供灰度/网关缓存)。
POST /databus/gateway/cache/refresh 强制刷新所有 Flow 缓存。
API 版本 GET /databus/gateway/version/get?id= 查询版本详情(自动还原 snapshotData
GET /databus/gateway/version/page 分页。
GET /databus/gateway/version/list?apiId= 列出某 API 的全部版本。
PUT /databus/gateway/version/rollback 根据 id + remark 回滚。
GET /databus/gateway/version/compare 差异对比sourceId/targetId
客户端凭证 GET /databus/gateway/credential/page 分页。
GET /databus/gateway/credential/get?id= 详情(含匿名配置)。
POST /databus/gateway/credential/create 新增凭证。
PUT /databus/gateway/credential/update 更新。
DELETE /databus/gateway/credential/delete?id= 删除。
GET /databus/gateway/credential/list-simple 下拉使用。
限流策略 GET /databus/gateway/policy/rate-limit/page 分页检索。
GET /databus/gateway/policy/rate-limit/{id} 详情。
GET /databus/gateway/policy/rate-limit/simple-list 精简列表。
POST/PUT/DELETE /databus/gateway/policy/rate-limit 新增/更新/删除。
访问日志 GET /databus/gateway/access-log/page 分页(需 databus:gateway:access-log:query 权限)。
GET /databus/gateway/access-log/get?id= 单条详情(自动补充 API 描述)。

所有接口默认返回 CommonResult 包装,字段 code/message/data。必要时参考对应 VO位置 controller.admin.gateway.vo)。

5. API 生命周期管理要点

  1. 状态机ApiStatusEnum(草稿/已上线/已下线/已废弃。Integration Flow 只加载 ONLINE 状态定义。
  2. 版本快照:每次保存时写入 databus_api_version,可通过 snapshotData 一键恢复(rollback 接口)。
  3. 变换校验:保存时会校验同一级 TransformPhaseEnum 不可重复,并确保 Start/End 唯一且位于首尾。
  4. 缓存刷新
    • 单 API创建/更新/删除后自动调用 IntegrationFlowManager.refresh(apiCode, version)
    • 全量:管理员可调用 /databus/gateway/cache/refresh 做兜底。

6. 网关请求路径与响应格式

  • 默认 Base Path/admin-api/databus/api/portal(可通过 databus.api-portal.base-path 覆盖;兼容旧版 /databus/api/portal)。
  • 最终路径{basePath}/{apiCode}/{version},示例 /admin-api/databus/api/portal/order.create/v1
  • 支持方法GET/POST/PUT/DELETE/PATCH均被映射为 ApiInvocationContext.httpMethod
  • 响应包装
{
  "code": 200,
  "message": "OK",
  "response": { "bizField": "value" },
  "traceId": "c8a3d52f-..."
}

code 与 HTTP 状态保持一致;response 为 API 变换后的业务体;所有错误也沿用该 Envelope若启用响应加密则返回 Base64 字串)。

7. 配置项(application.yml)重点

说明:下表均有默认值,除非特别标注“必填”,其余可按需覆盖或直接使用默认。未配置 allowed-ips 表示放行所有来源;如开启多租户透传才需设置 tenant-header

databus:
  api-portal:
    base-path: /admin-api/databus/api/portal
    # 仅支持精确 IP留空表示全放行。示例允许两个固定出口 IP
    allowed-ips: [10.0.0.12, 10.0.0.13]
    denied-ips: []
    enable-tenant-header: true
    tenant-header: ZT-Tenant-Id   # 仅当 enable-tenant-header=true 时才需要配置
    security:
      enabled: true
      signature-type: MD5            # 或 SHA256
      encryption-type: AES           # 或 DES
      allowed-clock-skew-seconds: 300
      nonce-ttl-seconds: 600
      require-body-encryption: true
      encrypt-response: true
  gateway:
    web-client:
      connection-pool-enabled: true  # 默认启用 Reactor Netty 连接池,可在排查连接复用/长连接异常时设为 false

必填/必选提示

  • 无需在 yml 中显式写入必填项;若需要启用租户透传,请同步配置 tenant-header
  • 安全校验依赖“客户端凭证”里的 encryptionKey/encryptionType/signatureType;当 require-body-encryption=true 且凭证缺少密钥时调用会失败HTTP 500 应用未配置加密密钥)。
  • 限流能力取决于 API 绑定的策略;未绑定即不做限流。connection-pool-enabled 仅用于排障,可保持默认。

GatewaySecurityFilter 会自动注册到最高优先级 +10确保该路径的请求先经过安全校验。

关闭连接池后,每次 HTTP Step 请求都会新建 TCP 连接,适合短期定位“连接被复用导致 Reset/超时”的场景,但会带来额外的握手开销;切换时可关注启动日志中的 Databus gateway WebClient pooling 提示。

8. 第三方调用流程详解

8.1 前置准备

  1. 申请凭证:在后台创建 API 客户端凭证,得到:
    • appId(对应 ZT-App-Id 头)
    • encryptionKey(用于 AES/DES 对称加密,服务器使用 CryptoSignatureUtils.decrypt 解密)
    • encryptionTypesignatureType
    • allowAnonymous = true 时需选择一个固定系统用户(服务器将自动颁发内部 JWT
  2. 确定 API:记录 apiCodeversion、请求方法、入参/变换契约。
  3. 网络白名单:将第三方出口 IP 加入 allowed-ips,否则直接返回 403。
  4. Redis 要求:需保证 Redis 可用(用于 nonce、防重放、限流计数

8.2 请求构建步骤

序号 操作 说明
1 生成时间戳 timestamp = System.currentTimeMillis(),与服务器时间差 ≤ 300s。
2 生成随机串 nonce 长度≥8可使用 UUID.randomUUID().toString().replace("-", "")
3 准备明文 Body 例如 {"orderNo":"SO20251120001"},记为 plainBody
4 计算签名 使用明文请求数据构造签名载荷Query 参数 + JSON Body 字段(若 Body 非 JSON 则整体放入 body 字段)+ ZT-App-Id + ZT-Timestamp + ZT-Nonce,按字典序拼接 key=value& 连接,使用 MD5/SHA256 计算;结果写入 ZT-Signature
5 加密请求体 使用凭证的 encryptionKey + encryptionTypeplainBody 进行对称加密Base64 结果作为 HTTP BodyContent-Type 可设 text/plainapplication/json
6 组装请求头 ZT-App-Id, ZT-Timestamp, ZT-Nonce, ZT-Signature, ZT-Tenant-Id(可选), X-Client-Id(建议,与限流相关),如有自带 JWT 则设置 Authorization
7 发送请求 URL = https://{host}{basePath}/{apiCode}/{version},方法与 API 定义保持一致。

签名字段示例

ZT-App-Id=demo-app
&ZT-Nonce=0c5e2df9a1
&ZT-Timestamp=1732070400000
&orderNo=SO20251120001
  • Query 参数将被拼接为 key=value(多值以逗号连接),自动忽略 signature 字段。
  • Body 为 JSON 时会按字段展开参与签名;仅在非 JSON 场景下才使用整体报文字符串作为 body 字段参与签名。

cURL 示例

curl -X POST "https://gw.example.com/admin-api/databus/api/portal/order.create/v1" \
  -H "ZT-App-Id: demo-app" \
  -H "ZT-Timestamp: 1732070400000" \
  -H "ZT-Nonce: 0c5e2df9a1" \
  -H "ZT-Signature: 8e377..." \
  -H "X-Client-Id: mall" \
  -H "Content-Type: text/plain" \
  -d "Q2hhcnNldGV4dC1CYXNlNjQgZW5jcnlwdGVkIGJvZHk="

-d 的实际内容应当是 AES/ DES 加密后的 Base64 字符串。

8.3 响应处理

  1. 读取 HTTP 状态与 ApiGatewayResponse.code/message/traceId
  2. security.encrypt-response=true,则响应体本身是加密串,需要使用同一 encryptionKey/encryptionType 解密得到 JSON再解析 response 字段。
  3. traceId 可用于后台日志及 访问日志 页面关联排查。

8.4 错误与重试策略

场景 表现 处理建议
时间戳/Nonce 不合法 HTTP 401message = 请求到达时间超出 300s/重复请求 校准服务器时间;nonce 不可重复Redis TTL 默认 600s
签名失败 HTTP 401message = 签名校验失败 检查签名字符串、字符编码、大小写。
未配置密钥 HTTP 500message = 应用未配置加密密钥 在后台凭证中补齐密钥与算法,或取消强制加密。
限流触发 HTTP 429message = 请求触发限流策略 调整 X-Client-Id 级并发或增大策略 limit/windowSeconds
API 未发布 HTTP 404message = API 定义未发布或已下线 确认 status=ONLINE,并刷新缓存。

9. 限流策略配置

  • 存储在 ApiPolicyRateLimitDO.config,仅支持字段:
{
  "limit": 1000,
  "windowSeconds": 60
}
  • 当前实现只读取上述两个字段Key 模板不可配置。
  • Redis Key 固定为:databus:api:rl:{apiCode}:{version}:{X-Client-Id}X-Client-Id 缺省时使用 anonymous,计数首次出现会自动设置过期。
  • 限流拦截后会抛出 API_RATE_LIMIT_EXCEEDED,在访问日志中标记 status=1/2

10. 访问日志字段对照

字段 说明
traceId 来自 TracerUtils,可在日志与链路追踪中搜索。
requestHeaders, requestBody, responseBody 默认截断至 4000 字符JSON 序列化存储。
status 0=成功,1=客户端错误,2=服务端错误,3=未知。
stepResults 序列化的步骤执行列表(见 ApiStepResult),含 request/response/elapsed/error
extra 附加变量/属性,供排查自定义上下文。

可通过 /databus/gateway/access-log/page + traceIdapiCode 条件快速定位第三方问题。