Skip to main content
Version: Next

InfluxDB

InfluxDB sink connector

Description

Write data to InfluxDB.

Key features

Options

nametyperequireddefault value
urlstringyes-
databasestringyes
measurementstringyes
usernamestringno-
passwordstringno-
key_timestringnoprocessing time
key_tagsarraynoexclude field & key_time
batch_sizeintno1024
max_retriesintno-
retry_backoff_multiplier_msintno-
connect_timeout_mslongno15000
common-optionsconfigno-

url

the url to connect to influxDB e.g.

http://influxdb-host:8086

database [string]

The name of influxDB database

measurement [string]

The name of influxDB measurement

username [string]

influxDB user username

password [string]

influxDB user password

key_time [string]

Specify field-name of the influxDB measurement timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp

key_tags [array]

Specify field-name of the influxDB measurement tags in SeaTunnelRow. If not specified, include all fields with influxDB measurement field

batch_size [int]

For batch writing, when the number of buffers reaches the number of batch_size or the time reaches checkpoint.interval, the data will be flushed into the influxDB

max_retries [int]

The number of retries to flush failed

retry_backoff_multiplier_ms [int]

Using as a multiplier for generating the next delay for backoff

max_retry_backoff_ms [int]

The amount of time to wait before attempting to retry a request to influxDB

connect_timeout_ms [long]

the timeout for connecting to InfluxDB, in milliseconds

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Examples

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "sink"
key_time = "time"
key_tags = ["label"]
batch_size = 1
}
}

Multiple table

example1

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
Mysql-CDC {
base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
username = "root"
password = "******"

table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
}
}

transform {
}

sink {
InfluxDB {
url = "http://influxdb-host:8086"
database = "test"
measurement = "${table_name}_test"
}
}

Changelog

next version

  • Add InfluxDB Sink Connector