Skip to main content
Version: 2.3.8

Kudu

Kudu source connector

Support Kudu Version​

  • 1.11.1/1.12.0/1.13.0/1.14.0/1.15.0

Support Those Engines​

Spark
Flink
SeaTunnel Zeta

Key features​

Description​

Used to read data from Kudu.

The tested kudu version is 1.11.1.

Data Type Mapping​

kudu Data TypeSeaTunnel Data Type
BOOLBOOLEAN
INT8
INT16
INT32
INT
INT64BIGINT
DECIMALDECIMAL
FLOATFLOAT
DOUBLEDOUBLE
STRINGSTRING
UNIXTIME_MICROSTIMESTAMP
BINARYBYTES

Source Options​

NameTypeRequiredDefaultDescription
kudu_mastersStringYes-Kudu master address. Separated by ',',such as '192.168.88.110:7051'.
table_nameStringYes-The name of kudu table.
client_worker_countIntNo2 * Runtime.getRuntime().availableProcessors()Kudu worker count. Default value is twice the current number of cpu cores.
client_default_operation_timeout_msLongNo30000Kudu normal operation time out.
client_default_admin_operation_timeout_msLongNo30000Kudu admin operation time out.
enable_kerberosBoolNofalseKerberos principal enable.
kerberos_principalStringNo-Kerberos principal. Note that all zeta nodes require have this file.
kerberos_keytabStringNo-Kerberos keytab. Note that all zeta nodes require have this file.
kerberos_krb5confStringNo-Kerberos krb5 conf. Note that all zeta nodes require have this file.
scan_token_query_timeoutLongNo30000The timeout for connecting scan token. If not set, it will be the same as operationTimeout.
scan_token_batch_size_bytesIntNo1024 * 1024Kudu scan bytes. The maximum number of bytes read at a time, the default is 1MB.
filterIntNo1024 * 1024Kudu scan filter expressions,Not supported yet.
schemaMapNo1024 * 1024SeaTunnel Schema.
table_listArrayNo-The list of tables to be read. you can use this configuration instead of table_path example: table_list = [{ table_name = "kudu_source_table_1"},{ table_name = "kudu_source_table_2"}]
common-optionsNo-Source plugin common parameters, please refer to Source Common Options for details.

Task Example​

Simple:​

The following example is for a Kudu table named "kudu_source_table", The goal is to print the data from this table on the console and write kudu table "kudu_sink_table"

# Defining the runtime environment
env {
parallelism = 2
job.mode = "BATCH"
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
kudu {
kudu_masters = "kudu-master:7051"
table_name = "kudu_source_table"
result_table_name = "kudu"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}

transform {
}

sink {
console {
source_table_name = "kudu"
}

kudu {
source_table_name = "kudu"
kudu_masters = "kudu-master:7051"
table_name = "kudu_sink_table"
enable_kerberos = true
kerberos_principal = "xx@xx.COM"
kerberos_keytab = "xx.keytab"
}
}

Multiple Table​

env {
# You can set engine configuration here
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
kudu{
kudu_masters = "kudu-master:7051"
table_list = [
{
table_name = "kudu_source_table_1"
},{
table_name = "kudu_source_table_2"
}
]
result_table_name = "kudu"
}
}

transform {
}

sink {
Assert {
rules {
table-names = ["kudu_source_table_1", "kudu_source_table_2"]
}
}
}

Changelog​

2.2.0-beta 2022-09-26​

  • Add Kudu Source Connector

Next Version​

  • Change plugin name from KuduSource to Kudu 3432