Hudi
Hudi Sink Connector
Description
Used to write data to Hudi.
Key Features
Options
Base configuration:
Name | Type | Required | Default |
---|---|---|---|
table_dfs_path | string | yes | - |
conf_files_path | string | no | - |
table_list | string | no | - |
schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST |
common-options | config | no | - |
Table list configuration:
Name | Type | Required | Default |
---|---|---|---|
table_name | string | yes | - |
database | string | no | default |
table_type | enum | no | COPY_ON_WRITE |
op_type | enum | no | insert |
record_key_fields | string | no | - |
partition_fields | string | no | - |
batch_interval_ms | Int | no | 1000 |
batch_size | Int | no | 1000 |
insert_shuffle_parallelism | Int | no | 2 |
upsert_shuffle_parallelism | Int | no | 2 |
min_commits_to_keep | Int | no | 20 |
max_commits_to_keep | Int | no | 30 |
index_type | enum | no | BLOOM |
index_class_name | string | no | - |
record_byte_size | Int | no | 1024 |
cdc_enabled | boolean | no | false |
Note: when the configuration targets a single table, the options in table_list can be flattened into the outer level.
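For example, the following two snippets are equivalent (a minimal sketch; the database, table, and path values are placeholders rather than defaults shipped with the connector):

```hocon
# Single table declared inside table_list
sink {
  Hudi {
    table_dfs_path = "hdfs://nameservice/data/"
    table_list = [
      {
        database   = "st"
        table_name = "test_table"
      }
    ]
  }
}

# Equivalent form with the table options flattened into the outer level
sink {
  Hudi {
    table_dfs_path = "hdfs://nameservice/data/"
    database       = "st"
    table_name     = "test_table"
  }
}
```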
table_name [string]
The name of the Hudi table.
database [string]
The database of the Hudi table.
table_dfs_path [string]
The DFS root path of the Hudi table, e.g. "hdfs://nameservice/data/hudi/".
table_type [enum]
The type of the Hudi table. The value can be COPY_ON_WRITE or MERGE_ON_READ.
record_key_fields [string]
The record key fields of the Hudi table. This option must be configured when op_type is UPSERT.
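As a minimal sketch (all values are placeholders), an upsert table must declare the field(s) that uniquely identify a record:

```hocon
table_list = [
  {
    database          = "st"
    table_name        = "user"
    op_type           = "upsert"
    # Required when op_type is UPSERT
    record_key_fields = "user_id"
  }
]
```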
partition_fields [string]
The partition fields of the Hudi table.
index_type [enum]
The index type of the Hudi table. Currently only BLOOM, SIMPLE, and GLOBAL SIMPLE are supported.
index_class_name [string]
The custom index class name of the Hudi table, e.g. org.apache.seatunnel.connectors.seatunnel.hudi.index.CustomHudiIndex.
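For illustration, a hedged sketch of a table entry that chooses the index; the custom class below is the placeholder name from the example above, not a class shipped with the connector:

```hocon
table_list = [
  {
    database   = "st"
    table_name = "test_table"
    # Pick one of the built-in index types ...
    index_type = "BLOOM"
    # ... or switch to a custom index implementation instead:
    # index_class_name = "org.apache.seatunnel.connectors.seatunnel.hudi.index.CustomHudiIndex"
  }
]
```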
record_byte_size [Int]
The byte size of a single record in the Hudi table. This value can be used to estimate the approximate number of records in each Hudi data file. Tuning this parameter together with batch_size can effectively reduce write amplification of Hudi data files.
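A hedged tuning sketch: if a row is roughly 512 bytes, declaring that size and enlarging batch_size lets each flush carry more data, so Hudi data files are rewritten less often (the numbers are illustrative, not recommendations):

```hocon
table_list = [
  {
    database         = "st"
    table_name       = "test_table"
    # Approximate size of one row in bytes (illustrative value)
    record_byte_size = 512
    # Larger batches mean fewer, bigger writes per data file
    batch_size       = 20000
  }
]
```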
conf_files_path [string]
The paths of the environment configuration files (local paths), used to initialize the HDFS client that reads Hudi table files. Example: "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml".
op_type [enum]
The operation type of the Hudi table. The value can be insert, upsert, or bulk_insert.
batch_interval_ms [Int]
The interval, in milliseconds, between batch writes to the Hudi table.
batch_size [Int]
The number of records written to the Hudi table per batch.
insert_shuffle_parallelism [Int]
The parallelism of inserting data into the Hudi table.
upsert_shuffle_parallelism [Int]
The parallelism of upserting data into the Hudi table.
min_commits_to_keep [Int]
The minimum number of commits to keep for the Hudi table.
max_commits_to_keep [Int]
The maximum number of commits to keep for the Hudi table.
cdc_enabled [boolean]
Whether to persist the CDC change log of the Hudi table. When enabled, change data is persisted when necessary so that the table can be queried in CDC mode.
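A minimal sketch of enabling changelog persistence for one table (table and key names are placeholders):

```hocon
table_list = [
  {
    database          = "st"
    table_name        = "user"
    op_type           = "upsert"
    record_key_fields = "user_id"
    # Persist CDC change logs so the table can be queried in CDC mode
    cdc_enabled       = true
  }
]
```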
schema_save_mode [Enum]
Before the synchronization task starts, choose how to handle an existing table schema on the target side.
Option descriptions:
RECREATE_SCHEMA: create the table when it does not exist; when it already exists, drop it and recreate it.
CREATE_SCHEMA_WHEN_NOT_EXIST: create the table when it does not exist; skip creation when it already exists.
ERROR_WHEN_SCHEMA_NOT_EXIST: throw an error when the table does not exist.
IGNORE: ignore processing of the table.
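As a sketch, schema_save_mode sits at the outer (sink) level; the value below simply restates the default, and the table and path names are placeholders:

```hocon
sink {
  Hudi {
    table_dfs_path   = "hdfs://nameservice/data/"
    database         = "st"
    table_name       = "test_table"
    # Create the table only when it does not already exist (the default)
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  }
}
```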
Common Options
Common parameters of the sink plugin; please refer to Sink Common Options for details.
Examples
Single table
sink {
  Hudi {
    table_dfs_path = "hdfs://nameservice/data/"
    database = "st"
    table_name = "test_table"
    table_type = "COPY_ON_WRITE"
    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
    batch_size = 10000
    use.kerberos = true
    kerberos.principal = "test_user@xxx"
    kerberos.principal.file = "/home/test/test_user.keytab"
  }
}
Multiple tables
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  Hudi {
    table_dfs_path = "hdfs://nameservice/data/"
    conf_files_path = "/home/test/hdfs-site.xml;/home/test/core-site.xml;/home/test/yarn-site.xml"
    table_list = [
      {
        database = "st1"
        table_name = "role"
        table_type = "COPY_ON_WRITE"
        op_type = "INSERT"
        batch_size = 10000
      },
      {
        database = "st1"
        table_name = "user"
        table_type = "COPY_ON_WRITE"
        op_type = "UPSERT"
        # When op_type is 'UPSERT', record_key_fields must be configured
        record_key_fields = "user_id"
        batch_size = 10000
      },
      {
        database = "st1"
        table_name = "Bucket"
        table_type = "MERGE_ON_READ"
      }
    ]
    ...
  }
}
Change Log
Change | Commit | Version |
---|---|---|
[improve] hudi options (#8952) | https://github.com/apache/seatunnel/commit/b24d0e7f8 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[Improve][CI]skip ui module, improve module dependent (#8225) | https://github.com/apache/seatunnel/commit/81de0a69c | 2.3.9 |
[Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845) | https://github.com/apache/seatunnel/commit/934434cc7 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Feature][Connector-V2] Optimize hudi sink (#7662) | https://github.com/apache/seatunnel/commit/0d12520f9 | 2.3.8 |
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
Bump org.xerial.snappy:snappy-java (#7144) | https://github.com/apache/seatunnel/commit/aa26471fb | 2.3.6 |
[Feature][Connector-V2] [Hudi]Add hudi sink connector (#4405) | https://github.com/apache/seatunnel/commit/dc271dcfb | 2.3.6 |
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
[Improve][Common] Adapt FILE_OPERATION_FAILED to CommonError (#5928) | https://github.com/apache/seatunnel/commit/b3dc0bbc2 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
[Hotfix][Zeta] Fix conflict dependency of hadoop-hdfs (#4509) | https://github.com/apache/seatunnel/commit/66923fbdb | 2.3.2 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
[Feature][Connector V2] expose configurable options in Hudi (#3383) | https://github.com/apache/seatunnel/commit/fd4cec3a9 | 2.3.0 |
fix hudi connector v2 compile error. (#3728) | https://github.com/apache/seatunnel/commit/4fba0aa02 | 2.3.0 |
[Improve][Connector-V2][Hudi] Unified exception for hudi source connector (#3581) | https://github.com/apache/seatunnel/commit/b2fda11dd | 2.3.0 |
[bug][Connector-V2][Hudi] HashCode may be negative (#3184) | https://github.com/apache/seatunnel/commit/8beffbb60 | 2.3.0 |
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
[improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta |
StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
[Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee73 | 2.2.0-beta |
[Connector-V2]Add Hudi Source (#2147) | https://github.com/apache/seatunnel/commit/eaedc0a3c | 2.2.0-beta |