Skip to main content
Version: Next

FieldRename

FieldRename transform plugin

Description

FieldRename transform plugin for rename field name.

Options

nametyperequireddefault valueDescription
convert_casestringnoThe case conversion type. The options can be UPPER, LOWER
prefixstringnoThe prefix to be added to the field name
suffixstringnoThe suffix to be added to the field name
replacements_with_regexarraynoThe array of replacement rules with regex. The replacement rule is a map with replace_from and replace_to fields.

Examples

Convert field to uppercase

env {
parallelism = 1
job.mode = "BATCH"
}

source {
MySQL-CDC {
plugin_output = "customers_mysql_cdc"

username = "root"
password = "123456"
table-names = ["source.user_shop", "source.user_order"]
base-url = "jdbc:mysql://localhost:3306/source"
}
}

transform {
FieldRename {
plugin_input = "customers_mysql_cdc"
plugin_output = "trans_result"

convert_case = "UPPER"
prefix = "F_"
suffix = "_S"
replacements_with_regex = [
{
replace_from = "create_time"
replace_to = "SOURCE_CREATE_TIME"
}
]
}
}

sink {
Jdbc {
plugin_input = "trans_result"

driver="oracle.jdbc.OracleDriver"
url="jdbc:oracle:thin:@oracle-host:1521/ORCLCDB"
user="myuser"
password="mypwd"

generate_sink_sql = true
database = "ORCLCDB"
table = "${database_name}.${table_name}"
primary_keys = ["${primary_key}"]

schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}

Convert field name to lowercase

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Oracle-CDC {
plugin_output = "customers_oracle_cdc"

base-url = "jdbc:oracle:thin:@localhost:1521/ORCLCDB"
username = "dbzuser"
password = "dbz"
database-names = ["ORCLCDB"]
schema-names = ["DEBEZIUM"]
table-names = ["SOURCE.USER_SHOP", "SOURCE.USER_ORDER"]
}
}

transform {
FieldRename {
plugin_input = "customers_oracle_cdc"
plugin_output = "trans_result"

convert_case = "LOWER"
prefix = "f_"
suffix = "_s"
replacements_with_regex = [
{
replace_from = "CREATE_TIME"
replace_to = "source_create_time"
}
]
}
}

sink {
Jdbc {
plugin_input = "trans_result"

url = "jdbc:mysql://localhost:3306/test"
driver = "com.mysql.cj.jdbc.Driver"
user = "st_user_sink"
password = "mysqlpw"

generate_sink_sql = true
database = "${schema_name}"
table = "${table_name}"
primary_keys = ["${primary_key}"]

schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}