Druid
Druid 接收器连接器
描述
通过 Druid 索引任务 API 将数据写入 Apache Druid。
主要特性
数据类型映射
| SeaTunnel 数据类型 | Druid 数据类型 |
|---|---|
| TINYINT | LONG |
| SMALLINT | LONG |
| INT | LONG |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DOUBLE |
| STRING | STRING |
| BOOLEAN | STRING |
| TIMESTAMP | STRING |
选项
| 名称 | 类型 | 必需 | 默认值 |
|---|---|---|---|
| coordinatorUrl | string | 是 | - |
| datasource | string | 是 | - |
| batchSize | int | 否 | 10000 |
| common-options | 否 | - |
coordinatorUrl [string]
Druid 协调器或路由节点的主机和端口,例如 router:8888。
SeaTunnel 会向 http://{coordinatorUrl}/druid/indexer/v1/task 提交索引任务,所以这里只需要填写主机和端口,不要带协议和 API 路径。
datasource [string]
要写入的 Druid datasource 名称。
当上游有多张表时,可以使用 ${table_name} 这类占位符,把每张上游表写入不同的 Druid datasource。
batchSize [int]
SeaTunnel 缓存多少行之后向 Druid 提交一次索引任务。默认值为 10000。
写入器关闭时,SeaTunnel 也会把剩余缓存数据提交到 Druid。
common options
Sink 插件通用参数,详见 Sink Common Options。
多表写入时,可以配合通用参数里的 multi_table_sink_replica 使用。
写入行为
连接器会把 SeaTunnel 每一行数据转换成内联 CSV 数据,然后作为 Druid 原生批量索引任务提交。
Druid 写入需要主时间列。连接器会自动追加一个名为 timestamp 的处理时间列给 Druid 使用;上游的 TIMESTAMP 字段会按上面的类型映射写成字符串维度。
示例
写入单表
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
plugin_output = "fake"
schema = {
fields {
c_boolean = boolean
c_timestamp = timestamp
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(16, 1)"
}
}
rows = [
{
kind = INSERT
fields = [true, "2020-02-02T02:02:02", "NEW", 1, 2, 3, 4, 4.3, 5.3, 6.3]
},
{
kind = INSERT
fields = [false, "2012-12-21T12:34:56", "AAA", 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999]
}
]
}
}
sink {
Druid {
coordinatorUrl = "router:8888"
datasource = "testDataSource"
}
}
写入多表
使用 ${table_name} 可以把每张上游表写入同名的 Druid datasource。
env {
parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
tables_configs = [
{
schema = {
table = "druid_sink_1"
fields {
id = int
val_bool = boolean
val_tinyint = tinyint
val_smallint = smallint
val_int = int
val_bigint = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
val_string = string
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW"]
}
]
},
{
schema = {
table = "druid_sink_2"
fields {
id = int
val_bool = boolean
val_tinyint = tinyint
val_smallint = smallint
val_int = int
val_bigint = bigint
val_float = float
val_double = double
val_decimal = "decimal(16, 1)"
}
}
rows = [
{
kind = INSERT
fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3]
}
]
}
]
}
}
sink {
Druid {
coordinatorUrl = "router:8888"
datasource = "${table_name}"
}
}