{
/**
* 处理增量消息
+ *
+ * 兼容服务端 SyncMessage 格式:
+ * - syncId: 同步ID
+ * - eventRecordId: 事件记录ID
+ * - eventType: 事件类型
+ * - eventAction: 事件动作
+ * - dataSnapshot: 业务数据快照(JSON字符串)
+ * - dataVersion: 数据版本
+ * - timestamp: 时间戳
*/
@SuppressWarnings("unchecked")
private void handleIncrementalMessage(String body, DatabusEventType eventType) {
@@ -114,14 +165,36 @@ public class DatabusClientConsumer implements RocketMQListener {
return;
}
- // 2. 解析增量消息
- DatabusMessage> message = JSON.parseObject(body, DatabusMessage.class);
+ // 2. 解析增量消息(兼容服务端 SyncMessage 格式)
+ var json = JSON.parseObject(body);
+
+ // 从 dataSnapshot 字段解析业务数据
+ String dataSnapshot = json.getString("dataSnapshot");
+ Object data = null;
+ Long dataId = null;
+ if (dataSnapshot != null && !dataSnapshot.isEmpty()) {
+ // dataSnapshot 是 JSON 字符串,需要解析成具体类型
+ var dataJson = JSON.parseObject(dataSnapshot);
+ // 获取数据类型并反序列化
+ Class> dataType = handler.getDataType();
+ data = JSON.parseObject(dataSnapshot, dataType);
+ // 提取 dataId
+ dataId = dataJson.getLong("id");
+ }
+
+ // 构建 DatabusMessage
+ DatabusMessage