跳到主要内容
版本:Next

Clickhouse

Clickhouse 数据连接器

支持引擎

Spark
Flink
SeaTunnel Zeta

核心特性

Clickhouse sink 插件通过实现幂等写入可以达到精准一次,需要配合 aggregating merge tree 支持重复数据删除的引擎。

描述

用于将数据写入 Clickhouse。

支持的数据源信息

为了使用 Clickhouse 连接器,需要以下依赖项。它们可以通过 install-plugin.sh 或从 Maven 中央存储库下载。

数据源支持的版本依赖
Clickhouseuniversal下载

数据类型映射

SeaTunnel 数据类型Clickhouse 数据类型
STRINGString / Int128 / UInt128 / Int256 / UInt256 / Point / Ring / Polygon MultiPolygon
INTInt8 / UInt8 / Int16 / UInt16 / Int32
BIGINTUInt64 / Int64 / IntervalYear / IntervalQuarter / IntervalMonth / IntervalWeek / IntervalDay / IntervalHour / IntervalMinute / IntervalSecond
DOUBLEFloat64
DECIMALDecimal
FLOATFloat32
DATEDate
TIMEDateTime
ARRAYArray
MAPMap

输出选项

名称类型是否必须默认值描述
hostStringYes-ClickHouse 集群地址, 格式是host:port , 允许多个hosts配置. 例如 "host1:8123,host2:8123".
databaseStringYes-ClickHouse 数据库名称.
tableStringYes-表名称.
usernameStringYes-ClickHouse 用户账号.
passwordStringYes-ClickHouse 用户密码.
clickhouse.configMapNo除了上述必须由 clickhouse-jdbc 指定的必填参数外,用户还可以指定多个可选参数,这些参数涵盖了 clickhouse-jdbc 提供的所有参数.
bulk_sizeStringNo20000每次通过Clickhouse-jdbc 写入的行数,即默认是20000.
split_modeStringNofalse此模式仅支持引擎为Distributedclickhouse 表。选项 internal_replication 应该是 true 。他们将在 seatunnel 中拆分分布式表数据,并直接对每个分片进行写入。分片权重定义为 clickhouse 将计算在内。
sharding_keyStringNo-使用 split_mode 时,将数据发送到哪个节点是个问题,默认为随机选择,但可以使用sharding_key参数来指定分片算法的字段。此选项仅在split_modetrue 时有效.
primary_keyStringNo-标记clickhouse表中的主键列,并根据主键执行INSERT/UPDATE/DELETE到clickhouse表.
support_upsertBooleanNofalse支持按查询主键更新插入行.
allow_experimental_lightweight_deleteBooleanNofalse允许基于MergeTree表引擎实验性轻量级删除.
common-optionsNo-Sink插件查用参数,详见Sink常用选项.

如何创建一个clickhouse 同步任务

以下示例演示如何创建将随机生成的数据写入Clickhouse数据库的数据同步作业。

# Set the basic configuration of the task to be performed
env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000
}

source {
FakeSource {
row.num = 2
bigint.min = 0
bigint.max = 10000000
split.num = 1
split.read-interval = 300
schema {
fields {
c_bigint = bigint
}
}
}
}

sink {
Clickhouse {
host = "127.0.0.1:9092"
database = "default"
table = "test"
username = "xxxxx"
password = "xxxxx"
}
}

小提示

1.SeaTunnel 部署文档.
2.需要在同步前提前创建要写入的表.
3.当写入 ClickHouse 表,无需设置其结构,因为连接器会在写入前向 ClickHouse 查询当前表的结构信息.

Clickhouse 接收器配置

sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "xxxxx"
password = "xxxxx"
clickhouse.config = {
max_rows_to_read = "100"
read_overflow_mode = "throw"
}
}
}

切分模式

sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "xxxxx"
password = "xxxxx"

# split mode options
split_mode = true
sharding_key = "age"
}
}

CDC(Change data capture) Sink

sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "xxxxx"
password = "xxxxx"

# cdc options
primary_key = "id"
support_upsert = true
}
}

CDC(Change data capture) for *MergeTree engine

sink {
Clickhouse {
host = "localhost:8123"
database = "default"
table = "fake_all"
username = "xxxxx"
password = "xxxxx"

# cdc options
primary_key = "id"
support_upsert = true
allow_experimental_lightweight_delete = true
}
}