跳到主要内容
版本:Next

多表 Transform 能力边界

概述

SeaTunnel 的多表 Transform功能允许单个 Transform 节点在一条流水线中同时处理来自上游 数据源(通常是 CDC 连接器)的多张表。本文档精确描述了哪些功能受支持、哪些不受支持,以及 遇到能力边界时的替代方案。


1. 什么是多表 Transform?

在标准的单表流水线中,一个 Source 对应一条 Transform 链,再连接一个 Sink。在多表流水线中, 单个 Source(例如 MySQL-CDC)同时发送来自多张表的记录,下游的每个 Transform 或 Sink 需声明它适用于哪张(些)表。

MySQL-CDC ──► FieldMapper (orders 表) ──► Kafka Sink (orders topic)

├──► FieldMapper (users 表) ──► Kafka Sink (users topic)

└──► (未匹配的表直接透传) ──► Elasticsearch Sink

2. 能力边界一览

能力是否支持说明
按表字段重命名 / 映射✅ 支持使用带 plugin_inputtable_match_regexFieldMapper
按表列过滤✅ 支持使用带 plugin_inputtable_match_regexFilter
按表类型转换✅ 支持使用 FieldMapperdefine_sink_type 选项
单表 SQL Transform✅ 支持使用带 plugin_inputSQL Transform;必要时再用 table_match_regex 限定表范围
TableMerge——将多表合并为一张✅ 支持各表需具有兼容的 Schema
TableRename——重命名流中的表✅ 支持适合按表名路由到 Sink
行级过滤(按 rowkind 过滤)✅ 支持使用 FilterRowKind Transform
跨表 SQL JOIN❌ 不支持参见第 4 节替代方案
多张 CDC 表的聚合❌ 不支持在下游 OLAP 引擎中执行聚合
通过 JOIN 生成新表❌ 不支持使用专用 SQL 引擎
通配符将同一 Transform 应用于所有表⚠️ 部分支持先用 TableMerge 合并,再做单 Transform;Schema 需兼容
流中 Schema 变更(DDL 事件)⚠️ 有限支持取决于 Sink;部分 Sink 支持 Schema 演化,Transform 层不支持
按表 JSON 嵌套字段提取✅ 支持使用带 plugin_inputtable_match_regexJsonPath Transform

3. TableMerge 与 SQL Join 的区别

这是最容易混淆的两个特性。

3.1 TableMerge Transform

TableMerge 将多张表的行流合并为一张结果表。所有输入表必须具有相同(或兼容)的 Schema。通常用于将多表路由至同一个 Sink。

{
"plugin_name": "TableMerge",
"plugin_input": ["orders_2023", "orders_2024"],
"plugin_output": "all_orders",
"merge_by_field": true
}

适用场景:多个来源表结构相同,需要合并(例如分片表、多年分区表,或多库同构表)。

不适用场景:需要关联或补全不同结构的表数据,这类场景需要 JOIN。

3.2 SQL JOIN(多表流水线中不支持)

SQL JOIN 基于关联键将两张不同表的行进行关联。SeaTunnel 的 SQL Transform 在多表流式 流水线中不支持跨表 JOIN。 在单个 SQL Transform 中尝试关联来自两张上游表的记录会导致 配置报错。

推荐替代方案

  • 将两张表写入共享数据湖或数仓(如 Hudi、Iceberg、ClickHouse),在目标端执行 JOIN
  • 使用 Apache Flink 配合 SeaTunnel Flink 连接器进行有状态 JOIN
  • 将“维度表”物化到查找缓存(如 Redis、RocksDB)中,通过自定义 Transform 进行数据补全

4. 跨 Source JOIN 的限制

SeaTunnel 不支持两个输入侧来自不同 Source 的流式 JOIN(例如将 MySQL-CDC 与 PostgreSQL-CDC 流进行 JOIN)。

场景是否支持
单 Source 多表透传
单 Source TableMerge(相同 Schema)
跨 Source JOIN(MySQL-CDC + PG-CDC)
跨 Source JOIN(CDC + JDBC 批量)
同 Source 两张不同表的 JOIN

解决方法:将两个 Source 的数据写入公共 Sink(Kafka、Iceberg 等),再在专用 SQL 引擎 (Flink、Spark、ClickHouse 等)中执行 JOIN。


5. 按表独立配置 Transform 的模式

当你需要对同一 Source 的不同表应用不同的 Transform 时,可声明多个共享相同 plugin_input、但使用不同 table_match_regex 的 Transform 块:

{
"env": {
"job.name": "cdc-multi-table",
"job.mode": "STREAMING"
},
"source": [
{
"plugin_name": "MySQL-CDC",
"plugin_output": "cdc_stream",
"base-url": "jdbc:mysql://localhost:3306/mydb",
"username": "cdc_user",
"password": "password",
"database-names": ["mydb"],
"table-names": ["mydb.orders", "mydb.users", "mydb.products"]
}
],
"transform": [
{
"plugin_name": "FieldMapper",
"plugin_input": ["cdc_stream"],
"plugin_output": "orders_mapped",
"field_mapper": {
"order_id": "id",
"order_amount": "amount"
},
"table_match_regex": "mydb\\.orders"
},
{
"plugin_name": "FieldMapper",
"plugin_input": ["cdc_stream"],
"plugin_output": "users_mapped",
"field_mapper": {
"user_id": "id",
"user_email": "email"
},
"table_match_regex": "mydb\\.users"
}
],
"sink": [
{
"plugin_name": "Kafka",
"plugin_input": ["orders_mapped"],
"topic": "orders"
},
{
"plugin_name": "Kafka",
"plugin_input": ["users_mapped"],
"topic": "users"
},
{
"plugin_name": "Kafka",
"plugin_input": ["cdc_stream"],
"topic": "products",
"table_match_regex": "mydb\\.products"
}
]
}

6. 公共字段示例(共享 Schema)

如果多张表共享一组公共字段,可以先用 TableMerge 合并,再统一应用一个 Transform:

"transform": [
{
"plugin_name": "TableMerge",
"plugin_input": ["cdc_stream"],
"plugin_output": "all_events",
"table_match_regex": "mydb\\.(orders|payments|refunds)"
},
{
"plugin_name": "FieldMapper",
"plugin_input": ["all_events"],
"plugin_output": "all_events_mapped",
"field_mapper": {
"created_at": "event_time",
"event_type": "type"
}
}
]

此模式仅在三张表(orderspaymentsrefunds)都包含 created_atevent_type 字段时有效。若各表 Schema 不同,TableMerge 会在运行时报错。


7. 多表 Transform 的 EtLT 模式

EtLT(Extract、轻量 transform、Load,再在数仓中 Transform)是当 SeaTunnel Transform 层无法完成全量转换需求时的推荐架构模式:

CDC Source


轻量 Transform(字段重命名、类型转换、行过滤)


数据湖 / 数仓(Hudi / Iceberg / ClickHouse)


重型 Transform(JOIN、聚合、复杂 SQL)
在 dbt / Flink SQL / Spark SQL 中执行

适合在 SeaTunnel Transform 层完成的操作

  • 字段重命名 / 过滤
  • 类型标准化
  • 行级过滤
  • Schema 路由(不同表 -> 不同 Topic / 表)

建议下沉到下游处理的操作

  • 跨表 JOIN
  • 聚合计算
  • Pivot / Unpivot
  • 基于维度表的数据补全

8. 常见问题

Q: 我能否不指定每张表,直接将一个 Transform 应用于所有表?

不能直接实现。可以先用 TableMerge 将 Schema 兼容的表合并,再对合并结果应用单一 Transform。若各表 Schema 不同,则必须拆成多个 Transform 块,通常通过不同的 table_match_regex 分别处理。

Q: SQL Transform 支持 JOIN 吗?

不支持。SQL Transform 仅支持单表查询(SELECT、WHERE、表达式等)。如需执行 JOIN, 请先将数据加载至 Sink,再使用外部 SQL 引擎处理。

Q: 未被任何 Transform 匹配的表会怎样?

未匹配的表会继续在流水线中流动,可被使用正确 plugin_inputtable_match_regex 的 Sink 捕获。

Q: 能否在不停机的情况下向现有 CDC 流水线新增表?

这取决于具体连接器。MySQL-CDC 在部分配置下支持动态发现新表,但为新表添加 Transform 仍需重启流水线。建议使用 stop-with-savepoint 将数据丢失降至最低(参见 REST API v2 参考文档)。


参考文档