跳到主要内容
版本:Next

IoTDB

IoTDB数据接收器

支持引擎

Spark
Flink
SeaTunnel Zeta

描述

将数据写入IoTDB。

Using Dependency

适用于Spark/Flink引擎

  1. 您需要确保jdbc驱动程序jar包已放置在目录${SEATUNNEL_HOME}/plugins/中。

适用于SeaTunnelZeta引擎

  1. 您需要确保jdbc驱动程序jar包已放置在目录“${SEATUNNEL_HOME}/lib/”中。

主要特性

IoTDB通过幂等写支持 exactly-once 功能。如果两条数据 如果使用相同的keytimestamp,新数据将覆盖旧数据。

:::提示

IoTDB和Spark之间存在节俭版本冲突。因此,您需要执行rm -f $SPARK_HOME/jars/libthrift*cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/来解决这个问题。

:::

支持的数据源信息

数据源Supported 版本地址
IoTDB>= 0.13.0localhost:6667

数据类型映射

IotDB 数据类型SeaTunnel 数据类型
BOOLEANBOOLEAN
INT32TINYINT
INT32SMALLINT
INT32INT
INT64BIGINT
FLOATFLOAT
DOUBLEDOUBLE
TEXTSTRING

Sink 选项

名称类型是否必传默认值描述
node_urlsString-IoTDB 集群地址,格式为 "host1:port""host1:port,host2:port"
usernameString-IoTDB 用户的用户名
passwordString-IoTDB 用户的密码
key_deviceString-在SeaTunnelRow中指定IoTDB设备ID的字段名
key_timestampStringprocessing time在SeaTunnelRow中指定IoTDB时间戳的字段名。如果未指定,则使用处理时间作为时间戳
key_measurement_fieldsArrayexclude device & timestamp在SeaTunnelRow中指定IoTDB测量列表的字段名称。如果未指定,则包括所有字段,但排除 device & timestamp
storage_groupArray-指定设备存储组(路径前缀)
例如: deviceId = ${storage_group} + "." + ${key_device}
batch_sizeInteger1024对于批写入,当缓冲区的数量达到batch_size的数量或时间达到batch_interval_ms时,数据将被刷新到IoTDB中
max_retriesInteger-刷新的重试次数 failed
retry_backoff_multiplier_msInteger-用作生成下一个退避延迟的乘数
max_retry_backoff_msInteger-尝试重试对IoTDB的请求之前等待的时间量
default_thrift_buffer_sizeInteger-IoTDB客户端中节省初始化缓冲区大小
max_thrift_frame_sizeInteger-IoTDB客户端中节约最大帧大小
zone_idstring-IoTDB java.time.ZoneId client
enable_rpc_compressionBoolean-IoTDB客户端中启用rpc压缩
connection_timeout_in_msInteger-连接到IoTDB时等待的最长时间(毫秒)
common-options-Sink插件常用参数,详见[Sink common Options](../Sink common Options.md)

示例

env {
parallelism = 2
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 16
bigint.template = [1664035200001]
schema = {
fields {
device_name = "string"
temperature = "float"
moisture = "int"
event_ts = "bigint"
c_string = "string"
c_boolean = "boolean"
c_tinyint = "tinyint"
c_smallint = "smallint"
c_int = "int"
c_bigint = "bigint"
c_float = "float"
c_double = "double"
}
}
}
}

上游SeaTunnelRow数据格式如下:

device_nametemperaturemoistureevent_tsc_stringc_booleanc_tinyintc_smallintc_intc_bigintc_floatc_double
root.test_group.device_a36.11001664035200001abc1true11121474836481.01.0
root.test_group.device_b36.21011664035200001abc2false22221474836492.02.0
root.test_group.device_c36.31021664035200001abc3false33321474836493.03.0

案例1

只填写所需的配置。 使用当前处理时间作为时间戳。并包括所有字段,但不包括device & timestamp作为测量字段

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name" # specify the `deviceId` use device_name field
}
}

"IoTDB"数据格式的输出如下:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2023-09-01T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2023-09-01T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+

案例2

使用源事件的时间

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name" # specify the `deviceId` use device_name field
key_timestamp = "event_ts" # specify the `timestamp` use event_ts field
}
}

"IoTDB"数据格式的输出如下:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
| Time| Device| temperature| moisture| event_ts| c_string| c_boolean| c_tinyint| c_smallint| c_int| c_bigint| c_float| c_double|
+------------------------+------------------------+--------------+-----------+--------------+---------+----------+----------+-----------+------+-----------+--------+---------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100| 1664035200001| abc1| true| 1| 1| 1| 2147483648| 1.0| 1.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101| 1664035200001| abc2| false| 2| 2| 2| 2147483649| 2.0| 2.0|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102| 1664035200001| abc2| false| 3| 3| 3| 2147483649| 3.0| 3.0|
+------------------------+------------------------+--------------+-----------+--------------+---------+---------+-----------+-----------+------+-----------+--------+---------+

案例3

使用源事件的时间和限制度量字段

sink {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
key_device = "device_name"
key_timestamp = "event_ts"
key_measurement_fields = ["temperature", "moisture"]
}
}

"IoTDB"数据格式的输出如下:

IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+

更改日志

2.2.0-beta 2022-09-26

  • 添加IoTDB数据接收器

2.3.0-beta 2022-10-20

  • [Improve] 改进IoTDB数据接收器 (2917)
    • 支持sql语法对齐
    • 支持sql拆分忽略案例
    • 支持将拆分偏移量恢复到至少一次
    • 支持从RowRecord读取时间戳
  • [BugFix] 固定IoTDB连接器写入NPE (3080)