跳到主要内容
版本:Next

Metadata

Metadata 转换插件

描述

Metadata 转换插件用于将数据行中的元数据信息提取并转换为普通字段,方便后续处理和分析。

核心功能:

  • 将元数据(如数据库名、表名、行类型等)提取为可见字段
  • 支持自定义输出字段名称
  • 不改变原有数据字段,只是新增元数据字段

典型应用场景:

  • CDC 数据同步时需要记录数据来源(库名、表名)
  • 需要追踪数据变更类型(INSERT、UPDATE、DELETE)
  • 需要记录数据的事件时间和延迟信息
  • 多表合并时需要标识数据来源

支持的元数据字段

元数据Key输出类型说明数据来源
Databasestring数据所属的数据库名称所有连接器
Tablestring数据所属的表名称所有连接器
RowKindstring行的变更类型,值为:+I(插入)、-U(更新前)、+U(更新后)、-D(删除)所有连接器
EventTimelong数据变更的事件时间戳(毫秒)CDC 连接器
Delaylong数据采集延迟时间(毫秒),即数据抽取时间与数据库变更时间的差值CDC 连接器
Partitionstring数据所属的分区信息,多个分区字段使用逗号分隔支持分区的连接器

重要说明

  1. 元数据字段区分大小写:配置时必须严格按照上表中的 Key 名称(如 DatabaseTableRowKind 等)
  2. CDC 专有字段EventTimeDelay 仅在使用 CDC 连接器时有效(TiDB-CDC 除外)

配置选项

参数名类型是否必填默认值说明
metadata_fieldsmap空映射元数据字段与输出字段的映射关系,格式为 元数据Key = 输出字段名

metadata_fields [map]

定义元数据字段到输出字段的映射关系。

配置格式:

metadata_fields {
<元数据Key> = <输出字段名>
<元数据Key> = <输出字段名>
...
}

配置示例:

metadata_fields {
Database = source_db # 将数据库名映射到 source_db 字段
Table = source_table # 将表名映射到 source_table 字段
RowKind = op_type # 将行类型映射到 op_type 字段
EventTime = event_ts # 将事件时间映射到 event_ts 字段
Delay = sync_delay # 将延迟时间映射到 sync_delay 字段
Partition = partition_info # 将分区信息映射到 partition_info 字段
}

注意事项:

  • 左侧必须是支持的元数据 Key(见上表),且严格区分大小写
  • 右侧是自定义的输出字段名,不能与原有字段重名
  • 可以只选择需要的元数据字段,不必全部配置

完整示例

示例 1:MySQL CDC 数据同步,提取所有元数据

从 MySQL 数据库同步数据,并提取所有可用的元数据信息。

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
MySQL-CDC {
plugin_output = "mysql_cdc_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["mydb.users"]
url = "jdbc:mysql://localhost:3306/mydb"
}
}

transform {
Metadata {
plugin_input = "mysql_cdc_source"
plugin_output = "metadata_added"
metadata_fields {
Database = source_database # 提取数据库名
Table = source_table # 提取表名
RowKind = change_type # 提取变更类型
EventTime = event_timestamp # 提取事件时间
Delay = sync_delay_ms # 提取同步延迟
}
}
}

sink {
Console {
plugin_input = "metadata_added"
}
}

输入数据示例:

原始数据行(来自 mydb.users 表):
id=1, name="张三", age=25
RowKind: +I (INSERT)

输出数据示例:

转换后的数据行:
id=1, name="张三", age=25, source_database="mydb", source_table="users",
change_type="+I", event_timestamp=1699000000000, sync_delay_ms=100

示例 2:只提取部分元数据

只提取数据来源信息(库名和表名),用于多表合并场景。

env {
parallelism = 1
job.mode = "STREAMING"
}

source {
MySQL-CDC {
plugin_output = "multi_table_source"
server-id = 5652
username = "root"
password = "your_password"
table-names = ["db1.orders", "db2.orders"]
url = "jdbc:mysql://localhost:3306"
}
}

transform {
Metadata {
plugin_input = "multi_table_source"
plugin_output = "with_source_info"
metadata_fields {
Database = db_name
Table = table_name
}
}
}

sink {
Jdbc {
plugin_input = "with_source_info"
url = "jdbc:mysql://localhost:3306/target_db"
table = "merged_orders"
# 目标表会包含 db_name 和 table_name 字段,用于标识数据来源
}
}