跳到主要内容
版本:Next

Druid

Druid 接收器连接器

描述

通过 Druid 索引任务 API 将数据写入 Apache Druid。

主要特性

数据类型映射

SeaTunnel 数据类型Druid 数据类型
TINYINTLONG
SMALLINTLONG
INTLONG
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDOUBLE
STRINGSTRING
BOOLEANSTRING
TIMESTAMPSTRING

选项

名称类型必需默认值
coordinatorUrlstring-
datasourcestring-
batchSizeint10000
common-options-

coordinatorUrl [string]

Druid 协调器或路由节点的主机和端口,例如 router:8888

SeaTunnel 会向 http://{coordinatorUrl}/druid/indexer/v1/task 提交索引任务,所以这里只需要填写主机和端口,不要带协议和 API 路径。

datasource [string]

要写入的 Druid datasource 名称。

当上游有多张表时,可以使用 ${table_name} 这类占位符,把每张上游表写入不同的 Druid datasource。

batchSize [int]

SeaTunnel 缓存多少行之后向 Druid 提交一次索引任务。默认值为 10000

写入器关闭时,SeaTunnel 也会把剩余缓存数据提交到 Druid。

common options

Sink 插件通用参数,详见 Sink Common Options

多表写入时,可以配合通用参数里的 multi_table_sink_replica 使用。

写入行为

连接器会把 SeaTunnel 每一行数据转换成内联 CSV 数据,然后作为 Druid 原生批量索引任务提交。

Druid 写入需要主时间列。连接器会自动追加一个名为 timestamp 的处理时间列给 Druid 使用;上游的 TIMESTAMP 字段会按上面的类型映射写成字符串维度。

示例

写入单表

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
plugin_output = "fake"
schema = {
fields {
c_boolean = boolean
c_timestamp = timestamp
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
}
}
rows = [
{
kind = INSERT
fields = [true, "2020-02-02T02:02:02", "NEW", 1, 2, 3, 4, 4.3, 5.3, 6.3]
},
{
kind = INSERT
fields = [false, "2012-12-21T12:34:56", "AAA", 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999]
}
]
}
}

sink {
Druid {
coordinatorUrl = "router:8888"
datasource = "testDataSource"
}
}

写入多表

使用 ${table_name} 可以把每张上游表写入同名的 Druid datasource。

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "druid_sink_1"
fields {
id = int
val_bool = boolean
val_tinyint = tinyint
val_smallint = smallint
val_int = int
val_bigint = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW"]
}
]
},
{
schema = {
table = "druid_sink_2"
fields {
id = int
val_bool = boolean
val_tinyint = tinyint
val_smallint = smallint
val_int = int
val_bigint = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3]
}
]
}
]
}
}

sink {
Druid {
coordinatorUrl = "router:8888"
datasource = "${table_name}"
}
}

变更日志