Sink 参数占位符
介绍
SeaTunnel 提供了 Sink 参数占位符自动替换功能,可让您通过占位符获取上游表元数据。
当您需要动态获取上游表元数据(例如多表写入)时,此功能至关重要。
本文档将指导您如何使用这些占位符以及如何有效地利用它们。
支持的引擎
SeaTunnel Zeta
Flink
Spark
占位符变量
占位符主要通过以下表达式实现:
- ${database_name}- 用于获取上游表中的数据库名称
- 也可以通过表达式指定默认值:${database_name:default_my_db}
 
- ${schema_name}- 用于获取上游表中的 schema 名称
- 也可以通过表达式指定默认值:${schema_name:default_my_schema}
 
- ${table_name}- 用于获取上游表中的 table 名称
- 也可以通过表达式指定默认值:${table_name:default_my_table}
 
- ${schema_full_name}- 用于获取上游表中的 schema 全路径名称,包含 database/schema 名称
 
- ${table_full_name}- 用于获取上游表中的 table 全路径名称,包含 database/schema/table 名称
 
- ${primary_key}- 用于获取上游表中的主键字段名称列表
 
- ${unique_key}- 用于获取上游表中的唯一键字段名称列表
 
- ${field_names}- 用于获取上游表中的所有字段名称列表
 
配置
先决条件:
- 确认 Sink 连接器已经支持了 TableSinkFactoryAPI
配置示例 1
env {
  // ignore...
}
source {
  MySQL-CDC {
    // ignore...
  }
}
transform {
  // ignore...
}
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    database = "${database_name}_test"
    table = "${table_name}_test"
    primary_keys = ["${primary_key}"]
  }
}
配置示例 2
env {
  // ignore...
}
source {
  Oracle-CDC {
    // ignore...
  }
}
transform {
  // ignore...
}
sink {
  jdbc {
    url = "jdbc:mysql://localhost:3306"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = "123456"
    database = "${schema_name}_test"
    table = "${table_name}_test"
    primary_keys = ["${primary_key}"]
  }
}
占位符的替换将在连接器启动之前完成,确保 Sink 参数在使用前已准备就绪。 若该占位符变量没有被替换,则可能是上游表元数据缺少该选项,例如:
- mysqlsource 连接器不包含- ${schema_name}元数据
- oraclesource 连接器不包含- ${databse_name}元数据
- ...