Hudi
Hudi 接收器连接器
描述
用于将数据写入 Hudi。
主要特点
选项
基础配置:
| 名称 | 名称 | 是否必需 | 默认值 |
|---|---|---|---|
| table_dfs_path | string | 是 | - |
| conf_files_path | string | 否 | - |
| table_list | string | 否 | - |
| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
| common-options | config | 否 | - |
表清单配置:
| 名称 | 类型 | 是否必需 | 默认值 |
|---|---|---|---|
| table_name | string | yes | - |
| database | string | no | default |
| table_type | enum | no | COPY_ON_WRITE |
| op_type | enum | no | insert |
| record_key_fields | string | no | - |
| partition_fields | string | no | - |
| batch_interval_ms | Int | no | 1000 |
| batch_size | Int | no | 1000 |
| insert_shuffle_parallelism | Int | no | 2 |
| upsert_shuffle_parallelism | Int | no | 2 |
| min_commits_to_keep | Int | no | 20 |
| max_commits_to_keep | Int | no | 30 |
| index_type | enum | no | BLOOM |
| index_class_name | string | no | - |
| record_byte_size | Int | no | 1024 |
| cdc_enabled | boolean | no | false |
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
table_name [string]
table_name Hudi 表的名称。
database [string]
database Hudi 表的database.
table_dfs_path [string]
table_dfs_path Hudi 表的 DFS 根路径,例如 "hdfs://nameservice/data/hudi/"。
table_type [enum]
table_type Hudi 表的类型。
record_key_fields [string]
record_key_fields Hudi 表的记录键字段, 当op_type是UPSERT类型时, 必须配置该项.
partition_fields [string]
partition_fields Hudi 表的分区字段.
index_type [string]
index_type Hudi 表的索引类型. 当前只支持BLOOM, SIMPLE, GLOBAL SIMPLE三种类型.
index_class_name [string]
index_class_name Hudi 表自定义索引名称,例如: org.apache.seatunnel.connectors.seatunnel.hudi.index.CustomHudiIndex.
record_byte_size [Int]
record_byte_size Hudi 表单行记录的大小, 该值可用于预估每个hudi数据文件中记录的大致数量。调整此参数与batch_size可以有效减少hudi数据文件写放大次数.
conf_files_path [string]
conf_files_path 环境配置文件路径列表(本地路径),用于初始化 HDFS 客户端以读取 Hudi 表文件。示例:"/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"。
op_type [enum]
op_type Hudi 表的操作类型。值可以是 insert、upsert 或 bulk_insert。
batch_interval_ms [Int]
batch_interval_ms 批量写入 Hudi 表的时间间隔。
batch_size [Int]
batch_size 批量写入 Hudi 表的记录数大小.
insert_shuffle_parallelism [Int]
insert_shuffle_parallelism 插入数据到 Hudi 表的并行度。
upsert_shuffle_parallelism [Int]
upsert_shuffle_parallelism 更新插入数据到 Hudi 表的并行度。
min_commits_to_keep [Int]
min_commits_to_keep Hudi 表保留的最少提交数。
max_commits_to_keep [Int]
max_commits_to_keep Hudi 表保留的最多提交数。
cdc_enabled [boolean]
cdc_enabled 是否持久化Hudi表的CDC变更日志。启用后,在必要时持久化更改数据,表可以作为CDC模式进行查询.
schema_save_mode [Enum]
在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA:当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST:当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST:当表不存在时将抛出错误
IGNORE :忽略对表的处理
通用选项
数据源插件的通用参数,请参考 Source Common Options 了解详细信息。
示例
单表
sink {
Hudi {
table_dfs_path = "hdfs://nameserivce/data/"
database = "st"
table_name = "test_table"
table_type = "COPY_ON_WRITE"
conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
batch_size = 10000
use.kerberos = true
kerberos.principal = "test_user@xxx"
kerberos.principal.file = "/home/test/test_user.keytab"
}
}
多表
env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}
source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"
table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}
transform {
}
sink {
Hudi {
table_dfs_path = "hdfs://nameserivce/data/"
conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
table_list = [
{
database = "st1"
table_name = "role"
table_type = "COPY_ON_WRITE"
op_type="INSERT"
batch_size = 10000
},
{
database = "st1"
table_name = "user"
table_type = "COPY_ON_WRITE"
op_type="UPSERT"
# op_type is 'UPSERT', must configured record_key_fields
record_key_fields = "user_id"
batch_size = 10000
},
{
database = "st1"
table_name = "Bucket"
table_type = "MERGE_ON_READ"
}
]
...
}
}