跳到主要内容
版本:Next

InfluxDB

InfluxDB Sink 连接器

描述

将 SeaTunnel 行数据写入 InfluxDB 1.x。Sink 会把一行数据转换成一个 InfluxDB point,并通过 measurementkey_timekey_tags 决定写入的 measurement、时间戳、tag 和 field。

关键特性

选项

参数名类型必须默认值描述
urlstring-InfluxDB 连接 URL,例如 http://influxdb-host:8086
databasestring-InfluxDB 数据库名称。
measurementstring输入表完整名称InfluxDB measurement 名称。不配置时使用输入表名。
usernamestring-InfluxDB 用户名。必须和 password 一起配置。
passwordstring-InfluxDB 密码。必须和 username 一起配置。
key_timestringprocessing time作为 InfluxDB point 时间戳的字段名。不配置时使用处理时间。
key_tagsarray-写为 InfluxDB tag 的字段名。其他字段会写为 point field。
batch_sizeint1024缓存多少个 point 后写入 InfluxDB。
max_retriesint-写入失败时的最大重试次数。
write_timeoutint5InfluxDB 客户端写入超时时间。
retry_backoff_multiplier_msint-重试等待时间的倍数,单位毫秒。
max_retry_backoff_msint-两次重试之间的最大等待时间,单位毫秒。
rpstring-写入时使用的 retention policy。
epochstringn客户端使用的时间精度。写入精度识别大写值:HMSMSUNS
connect_timeout_mslong15000连接 InfluxDB 的超时时间,单位毫秒。
query_timeout_secint3InfluxDB 客户端读超时时间,单位秒。
multi_table_sink_replicaint-多表写入时的 sink writer 副本数。
common-optionsconfig-Sink 插件通用参数。

url

连接到 influxDB 的 url,例如

http://influxdb-host:8086

database [string]

influxDB 数据库的名称

measurement [string]

influxDB measurement 的名称。这个配置是可选项;不配置时,sink 会使用输入表完整名称作为 measurement 名称,这在多表写入场景很有用。

username [string]

influxDB 用户名

password [string]

influxDB 用户密码

key_time [string]

在 SeaTunnelRow 中指定 influxDB measurement 时间戳的字段名。如果未指定,则使用处理时间作为时间戳

key_tags [array]

在 SeaTunnelRow 中指定 influxDB measurement 标签的字段名。 如果未指定,则包含所有字段作为 influxDB measurement 字段

batch_size [int]

对于批量写入,当缓冲区数量达到 batch_size 数量或时间达到 checkpoint.interval 时,数据将被刷新到 influxDB

max_retries [int]

刷新失败的重试次数

retry_backoff_multiplier_ms [int]

用作生成下一个退避延迟的乘数

max_retry_backoff_ms [int]

在尝试重新请求 influxDB 之前等待的时间量

write_timeout [int]

InfluxDB 客户端写入超时时间。

rp [string]

写入 point 时使用的 retention policy。

epoch [string]

InfluxDB 客户端使用的时间精度。用于 sink 写入精度时,当前连接器识别大写值 HMSMSUNS。默认值 n 会按纳秒精度处理。

query_timeout_sec [int]

InfluxDB 客户端读超时时间,单位秒。

connect_timeout_ms [long]

连接到 InfluxDB 的超时时间,以毫秒为单位

通用选项

Sink 插件通用参数,请参考 Sink 通用选项 详见

示例

写入一个 measurement

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}

不显式配置 measurement

不配置 measurement 时,sink 会使用输入表完整名称作为 measurement 名称。

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
table = "influxdb_sink"
fields {
label = STRING
c_string = STRING
c_double = DOUBLE
c_bigint = BIGINT
c_float = FLOAT
c_int = INT
c_smallint = SMALLINT
c_boolean = BOOLEAN
time = BIGINT
}
}
rows = [
{
kind = INSERT
fields = ["label_1", "sink_1", 4.3, 200, 2.5, 2, 5, true, 1627529632356]
}
]
}
}

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}

多表写入

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
Mysql-CDC {
url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"

table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}

transform {
}

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
key_time = "time"
batch_size = 1
}
}

变更日志

Change Log
ChangeCommitVersion
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118)https://github.com/apache/seatunnel/commit/4f5adeb1c72.3.11
[Improve] influxdb options (#8966)https://github.com/apache/seatunnel/commit/9f498b81332.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef8000162.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Improve] Improve some connectors prepare check error message (#7465)https://github.com/apache/seatunnel/commit/6930a25edd2.3.8
[Improve][Connector] Add multi-table sink option check (#7360)https://github.com/apache/seatunnel/commit/2489f6446b2.3.7
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131)https://github.com/apache/seatunnel/commit/c4ca74122c2.3.6
Support multi-table sink feature for influxdb (#6278)https://github.com/apache/seatunnel/commit/56f13e920d2.3.5
[Improve][Zeta] Add classloader cache mode to fix metaspace leak (#6355)https://github.com/apache/seatunnel/commit/9c3c2f183d2.3.5
[Test][E2E] Add thread leak check for connector (#5773)https://github.com/apache/seatunnel/commit/1f2f3fc5f02.3.4
[BugFix][InfluxDBSource] Resolve invalid SQL in initColumnsIndex method caused by direct QUERY_LIMIT appendage with 'tz' function. (#4829)https://github.com/apache/seatunnel/commit/deed9c62c32.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755)https://github.com/apache/seatunnel/commit/8de74081002.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Improve][Connector-V2] Remove scheduler in InfluxDB sink (#5271)https://github.com/apache/seatunnel/commit/f459f500cb2.3.4
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260)https://github.com/apache/seatunnel/commit/51c0d709ba2.3.4
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Improve][SourceConnector] Unifie InfluxDB source fields to schema (#3897)https://github.com/apache/seatunnel/commit/85a984a64f2.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba12.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Improve][Connector-V2][Influxdb] Unified exception for influxdb source & sink connector (#3558)https://github.com/apache/seatunnel/commit/4686f35d682.3.0
[Feature][Connector][influx] Expose configurable options in influx db (#3392)https://github.com/apache/seatunnel/commit/b247ff0aef2.3.0
[Feature][Connector-V2] influxdb sink connector (#3174)https://github.com/apache/seatunnel/commit/630e8847912.3.0
[Feature][Connector-V2] Add influxDB connector source (#2697)https://github.com/apache/seatunnel/commit/1d70ea30842.3.0-beta