Kudu
Kudu sink connector
Supported Kudu versions
- 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0
Supported engines
Spark
Flink
SeaTunnel Zeta
Key features
Data type mapping
SeaTunnel data type | Kudu data type |
---|---|
BOOLEAN | BOOL |
INT | INT8 INT16 INT32 |
BIGINT | INT64 |
DECIMAL | DECIMAL |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
STRING | STRING |
TIMESTAMP | UNIXTIME_MICROS |
BYTES | BINARY |
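The following FakeSource schema fragment illustrates the mapping. It declares one field for each SeaTunnel type in the table above, and the comment on each line names the Kudu column type that type corresponds to. The field names are hypothetical, and the fragment is only a sketch, not a complete job.

```hocon
schema = {
  fields {
    val_int     = int               # Kudu INT8 / INT16 / INT32 (see table above)
    val_bool    = boolean           # Kudu BOOL
    val_bigint  = bigint            # Kudu INT64
    val_decimal = "decimal(16, 1)"  # Kudu DECIMAL
    val_float   = float             # Kudu FLOAT
    val_double  = double            # Kudu DOUBLE
    val_string  = string            # Kudu STRING
    val_ts      = timestamp         # Kudu UNIXTIME_MICROS
    val_bytes   = bytes             # Kudu BINARY
  }
}
```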
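Sink options
Name | Type | Required | Default | Description |
---|---|---|---|---|
kudu_masters | String | Yes | - | Kudu master addresses, separated by "," (for example "192.168.88.110:7051"). |
table_name | String | Yes | - | Name of the Kudu table. |
client_worker_count | Int | No | 2 * Runtime.getRuntime().availableProcessors() | Number of Kudu client worker threads. Defaults to twice the number of CPU cores. |
client_default_operation_timeout_ms | Long | No | 30000 | Default timeout for Kudu operations, in milliseconds. |
client_default_admin_operation_timeout_ms | Long | No | 30000 | Default timeout for Kudu admin operations, in milliseconds. |
enable_kerberos | Bool | No | false | Whether to enable Kerberos authentication. |
kerberos_principal | String | No | - | Kerberos principal. Note that all Zeta nodes require it. |
kerberos_keytab | String | No | - | Path to the Kerberos keytab file. Note that this file must be present on all Zeta nodes. |
kerberos_krb5conf | String | No | - | Path to the Kerberos krb5.conf file. Note that this file must be present on all Zeta nodes. |
save_mode | String | No | - | Save mode. Supports overwrite and append. |
session_flush_mode | String | No | AUTO_FLUSH_SYNC | Kudu session flush mode. |
batch_size | Int | No | 1024 | Maximum number of buffered records, counting all insert, upsert and delete records. Data is flushed once this number is exceeded. |
buffer_flush_interval | Int | No | 10000 | Flush interval. An asynchronous thread flushes buffered data at this interval. |
ignore_not_found | Bool | No | false | If true, all rows reported as not found are ignored. |
ignore_not_duplicate | Bool | No | false | If true, all duplicate rows are ignored. |
common-options | | No | - | Common sink plugin parameters. See [Sink Common Options](../sink-common-options.md) for details. |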
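The following sketch shows a sink block that sets the optional client and buffering options from the table above. The values are illustrative assumptions, not tuning recommendations. The second master address is hypothetical and is included only to show the comma-separated format of kudu_masters.

```hocon
sink {
  kudu {
    # Two masters, comma-separated (second address is hypothetical)
    kudu_masters = "192.168.88.110:7051,192.168.88.111:7051"
    table_name = "kudu_sink_table"

    # Kudu client settings (illustrative values)
    client_worker_count = 8
    client_default_operation_timeout_ms = 60000
    client_default_admin_operation_timeout_ms = 60000

    # Write buffering: flush after batch_size records or every buffer_flush_interval
    session_flush_mode = "AUTO_FLUSH_SYNC"   # the documented default
    batch_size = 1024
    buffer_flush_interval = 10000

    # Ignore rows that Kudu reports as not found
    ignore_not_found = true
  }
}
```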
Task examples
Simple example:
The following example reads data generated by FakeSource and writes it to the Kudu table kudu_sink_table.
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  FakeSource {
    plugin_output = "kudu"
    schema = {
      fields {
        id = int
        val_bool = boolean
        val_int8 = tinyint
        val_int16 = smallint
        val_int32 = int
        val_int64 = bigint
        val_float = float
        val_double = double
        val_decimal = "decimal(16, 1)"
        val_string = string
        val_unixtime_micros = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, true, 2, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
      }
    ]
  }
}
sink {
  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    kerberos_keytab = "xx.keytab"
  }
}
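The simple example above enables Kerberos but does not set kerberos_krb5conf. The sketch below shows the same sink with all three Kerberos options set. The file paths are hypothetical. As noted in the options table, the keytab and krb5.conf files must be present on every Zeta node.

```hocon
sink {
  kudu {
    plugin_input = "kudu"
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "kudu_sink_table"
    enable_kerberos = true
    kerberos_principal = "xx@xx.COM"
    # Hypothetical paths; both files must exist on every Zeta node
    kerberos_keytab = "/path/to/xx.keytab"
    kerberos_krb5conf = "/etc/krb5.conf"
  }
}
```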
Multi-table
Example 1
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"
    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}
transform {
}
sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "${database_name}_${table_name}_test"
  }
}
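This example assumes that ${database_name} and ${table_name} are replaced verbatim with the database name and table name of each upstream table. Under that assumption, each captured table is written to the Kudu table listed in the comments below. For the seatunnel.role stream alone, the sink would then behave roughly like the fixed-name configuration shown. This is an illustrative equivalent, not a required change.

```hocon
# Upstream table    -> Kudu table (assumed placeholder resolution)
# seatunnel.role    -> seatunnel_role_test
# seatunnel.user    -> seatunnel_user_test
# galileo.Bucket    -> galileo_Bucket_test
sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    # Fixed name equivalent to the placeholder for the seatunnel.role stream only
    table_name = "seatunnel_role_test"
  }
}
```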
Example 2
env {
  parallelism = 1
  job.mode = "BATCH"
}
source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword
    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}
transform {
}
sink {
  kudu {
    kudu_masters = "kudu-master-cdc:7051"
    table_name = "${schema_name}_${table_name}_test"
  }
}
Changelog
Change | Commit | Version |
---|---|---|
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
[Improve][Transform] Rename sql transform table name from 'fake' to 'dual' (#8298) | https://github.com/apache/seatunnel/commit/e6169684fb | 2.3.9 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
[Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d660 | 2.3.9 |
[Feature][Core] Rename result_table_name /source_table_name to plugin_input/plugin_output (#8072) | https://github.com/apache/seatunnel/commit/c7bbd322db | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
correct the typo of kudu kerberos config (#6905) | https://github.com/apache/seatunnel/commit/fcb8554972 | 2.3.6 |
[Fix][KuduCatalogFactory]: Fix KuduCatalogFactory.optionRule() will throw an Exception (#6787) | https://github.com/apache/seatunnel/commit/45a4e1532d | 2.3.6 |
[Feature][Engine] Unify job env parameters (#6003) | https://github.com/apache/seatunnel/commit/2410ab38f0 | 2.3.4 |
[Feature][Connector-V2] Support multi-table sink feature for kudu (#5951) | https://github.com/apache/seatunnel/commit/82460c0bf0 | 2.3.4 |
[Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a0 | 2.3.4 |
[Feature][Kudu] Support multi-table source read (#5878) | https://github.com/apache/seatunnel/commit/8d9a0b7d11 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
[Feature][Connector-V2] Support TableSourceFactory/TableSinkFactory on kudu (#5789) | https://github.com/apache/seatunnel/commit/10e791d60a | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
[Feature][Kudu] Refactor Kudu functionality and Sink support CDC data. (#5437) | https://github.com/apache/seatunnel/commit/22110eb7b3 | 2.3.4 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Hotfix][Connector-V2] Fix connector source snapshot state NPE (#4027) | https://github.com/apache/seatunnel/commit/e39c4988cc | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Improve][Connector-V2] Bad smell ToArrayCallWithZeroLengthArrayArgument: (#3577) | https://github.com/apache/seatunnel/commit/cc448d98c4 | 2.3.0 |
[Improve][Connector-V2][Kudu] Unified exception for kudu source & sink connector (#3564) | https://github.com/apache/seatunnel/commit/273418ddc9 | 2.3.0 |
[Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0cd | 2.3.0 |
[Feature][Connector V2] expose configurable options in Kudu (#3365) | https://github.com/apache/seatunnel/commit/c422210e2c | 2.3.0 |
[Feature][Core][Connector-V2] Unified The way of setting JobName (#2908) | https://github.com/apache/seatunnel/commit/bf2c97484b | 2.3.0-beta |
remove duplicate ExceptionUtil class (#3037) | https://github.com/apache/seatunnel/commit/c9dc7c50c2 | 2.3.0-beta |
[Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f12 | 2.3.0-beta |
[Improve][Connector-V2]Kudu Sink Connector Support to upsert row | https://github.com/apache/seatunnel/commit/1ece805ab1 | 2.3.0-beta |
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
[Connector-V2] Add Kudu source and sink connector (#2254) | https://github.com/apache/seatunnel/commit/0483cbc2df | 2.2.0-beta |