跳到主要内容
版本:Next

Hbase

Hbase 数据连接器

描述

将数据输出到hbase

主要特性

选项

名称类型是否必须默认值
zookeeper_quorumstringyes-
tablestringyes-
rowkey_columnlistyes-
family_nameconfigyes-
rowkey_delimiterstringno""
version_columnstringno-
null_modestringnoskip
wal_writebooleanyesfalse
write_buffer_sizestringno8 1024 1024
encodingstringnoutf8
hbase_extra_configstringno-
common-optionsno-
ttllongno-

zookeeper_quorum [string]

hbase的zookeeper集群主机, 示例: "hadoop001:2181,hadoop002:2181,hadoop003:2181"

table [string]

要写入的表名, 例如: "seatunnel"

rowkey_column [list]

行键的列名列表, 例如: ["id", "uuid"]

family_name [config]

字段的列簇名称映射。例如,上游的行如下所示:

idnameage
1tyrantlucifer27

id作为行键和其他写入不同列簇的字段,可以分配

family_name { name = "info1" age = "info2" }

这主要是name写入列簇info1,age写入将写给列簇 info2

如果要将其他字段写入同一列簇,可以分配

family_name { all_columns = "info" }

这意味着所有字段都将写入该列簇 info

rowkey_delimiter [string]

连接多行键的分隔符,默认 ""

version_column [string]

版本列名称,您可以使用它来分配 hbase 记录的时间戳

null_mode [double]

写入 null 值的模式,支持 [ skip , empty], 默认 skip

  • skip: 当字段为 null ,连接器不会将此字段写入 hbase
  • empty: 当字段为null时,连接器将写入并为此字段生成空值

wal_write [boolean]

wal log 写入标志,默认值 false

write_buffer_size [int]

hbase 客户端的写入缓冲区大小,默认 8 1024 1024

encoding [string]

字符串字段的编码,支持[ utf8 , gbk],默认 utf8

hbase_extra_config [config]

hbase扩展配置

ttl [long]

hbase 写入数据 TTL 时间,默认以表设置的TTL为准,单位毫秒

常见选项

Sink 插件常用参数,详见 Sink 常用选项 Sink Common Options

案例


Hbase {
zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
table = "seatunnel_test"
rowkey_column = ["name"]
family_name {
all_columns = seatunnel
}
}

写入多表

env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
tables_configs = [
{
schema = {
table = "hbase_sink_1"
fields {
name = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
}
]
},
{
schema = {
table = "hbase_sink_2"
fields {
name = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_2", "sink_2", 4.3, 200, 2.5, 2, 5, true, 1627529632357]
}
]
}
]
}
}

sink {
Hbase {
zookeeper_quorum = "hadoop001:2181,hadoop002:2181,hadoop003:2181"
table = "${table_name}"
rowkey_column = ["name"]
family_name {
all_columns = info
}
}
}

写入指定列族

Hbase {
zookeeper_quorum = "hbase_e2e:2181"
table = "assign_cf_table"
rowkey_column = ["id"]
family_name {
c_double = "cf1"
c_bigint = "cf2"
}
}

更改日志

下一个版本

  • 添加 hbase 输出连接器 (4049)