Skip to main content
Version: Next

IoTDB

IoTDB sink connector

Support Those Engines​

Spark
Flink
SeaTunnel Zeta

Description​

Used to write data to IoTDB.

Using Dependency​

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/plugins/.

For SeaTunnel Zeta Engine​

  1. You need to ensure that the jdbc driver jar package has been placed in directory ${SEATUNNEL_HOME}/lib/.

Key Features​

IoTDB supports the exactly-once feature through idempotent writing. If two pieces of data have the same key and timestamp, the new data will overwrite the old one.

tip

There is a conflict of thrift version between IoTDB and Spark.Therefore, you need to execute rm -f $SPARK_HOME/jars/libthrift* and cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/ to resolve it.

Supported DataSource Info​

DatasourceSupported VersionsUrl
IoTDB>= 0.13.0localhost:6667

Data Type Mapping​

IotDB Data TypeSeaTunnel Data Type
BOOLEANBOOLEAN
INT32TINYINT
INT32SMALLINT
INT32INT
INT64BIGINT
FLOATFLOAT
DOUBLEDOUBLE
TEXTSTRING

Sink Options​

NameTypeRequiredDefaultDescription
node_urlsStringYes-IoTDB cluster address, the format is "host1:port" or "host1:port,host2:port"
usernameStringYes-IoTDB user username
passwordStringYes-IoTDB user password
key_deviceStringYes-Specify field name of the IoTDB deviceId in SeaTunnelRow
key_timestampStringNoprocessing timeSpecify field-name of the IoTDB timestamp in SeaTunnelRow. If not specified, use processing-time as timestamp
key_measurement_fieldsArrayNoexclude device & timestampSpecify field-name of the IoTDB measurement list in SeaTunnelRow. If not specified, include all fields but exclude device & timestamp
storage_groupArrayNo-Specify device storage group(path prefix)
example: deviceId = ${storage_group} + "." + ${key_device}
batch_sizeIntegerNo1024For batch writing, when the number of buffers reaches the number of batch_size or the time reaches batch_interval_ms, the data will be flushed into the IoTDB
max_retriesIntegerNo-The number of retries to flush failed
retry_backoff_multiplier_msIntegerNo-Using as a multiplier for generating the next delay for backoff
max_retry_backoff_msIntegerNo-The amount of time to wait before attempting to retry a request to IoTDB
default_thrift_buffer_sizeIntegerNo-Thrift init buffer size in IoTDB client
max_thrift_frame_sizeIntegerNo-Thrift max frame size in IoTDB client
zone_idstringNo-java.time.ZoneId in IoTDB client
enable_rpc_compressionBooleanNo-Enable rpc compression in IoTDB client
connection_timeout_in_msIntegerNo-The maximum time (in ms) to wait when connecting to IoTDB
common-optionsno-Sink plugin common parameters, please refer to Sink Common Options for details

Examples​

env {
parallelism = 2
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 16
bigint.template = [1664035200001]
schema = {
fields {
device_name = "string"
temperature = "float"
moisture = "int"
event_ts = "bigint"
c_string = "string"
c_boolean = "boolean"
c_tinyint = "tinyint"
c_smallint = "smallint"
c_int = "int"
c_bigint = "bigint"
c_float = "float"
c_double = "double"
}
}
}
}

Upstream SeaTunnelRow data format is the following:

device_nametemperaturemoistureevent_tsc_stringc_booleanc_tinyintc_smallintc_intc_bigintc_floatc_double
root.test_group.device_a36.11001664035200001abc1true11121474836481.01.0
root.test_group.device_b36.21011664035200001abc2false22221474836492.02.0
root.test_group.device_c36.31021664035200001abc3false33321474836493.03.0

Case1​

only fill required config. use current processing time as timestamp. and include all fields but exclude device & timestamp as measurement fields

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name" # specify the `deviceId` use device_name field
}
}

Output to IoTDB data format is the following:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+

Case2​

use source event's time

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name" # specify the `deviceId` use device_name field
key_timestamp = "event_ts" # specify the `timestamp` use event_ts field
}
}

Output to IoTDB data format is the following:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+

Case3​

use source event's time and limit measurement fields

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name"
key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}

Output to IoTDB data format is the following:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+

Changelog​

2.2.0-beta 2022-09-26​

  • Add IoTDB Sink Connector

2.3.0-beta 2022-10-20​

  • [Improve] Improve IoTDB Sink Connector (2917)
    • Support align by sql syntax
    • Support sql split ignore case
    • Support restore split offset to at-least-once
    • Support read timestamp from RowRecord
  • [BugFix] Fix IoTDB connector sink NPE (3080)