IoTDB
IoTDB source connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Description
Reads data from IoTDB.
Tip
There is a Thrift version conflict between IoTDB and Spark. To resolve it, replace Spark's Thrift jars with IoTDB's: run `rm -f $SPARK_HOME/jars/libthrift*` and then `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/`.
Using Dependency
For Spark/Flink Engine
- You need to ensure that the JDBC driver jar package has been placed in the `${SEATUNNEL_HOME}/plugins/` directory.
For SeaTunnel Zeta Engine
- You need to ensure that the JDBC driver jar package has been placed in the `${SEATUNNEL_HOME}/lib/` directory.
Key features
Supports reading through a query SQL statement, which can be used to achieve column projection.
Supported DataSource Info
| Datasource | Supported Versions | Url | 
|---|---|---|
| IoTDB | >= 0.13.0 | localhost:6667 | 
Data Type Mapping
| IoTDB Data Type | SeaTunnel Data Type | 
|---|---|
| BOOLEAN | BOOLEAN | 
| INT32 | TINYINT | 
| INT32 | SMALLINT | 
| INT32 | INT | 
| INT64 | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| TEXT | STRING | 
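The mapping is many-to-one in the read direction: an IoTDB `INT32` series can be declared as `TINYINT`, `SMALLINT` or `INT` in the SeaTunnel schema. The sketch below encodes the table as a lookup from the SeaTunnel type you declare to the IoTDB type it is read from, which can help sanity-check a `schema` block before running a job. The names `IOTDB_TYPE_FOR` and `check_schema` are hypothetical and not part of SeaTunnel. The sketch covers only the types listed in the table, and the series types in the usage lines are made up for illustration.

```python
# Hypothetical helper, not part of SeaTunnel.
# SeaTunnel type (as declared in `schema.fields`) -> IoTDB type it is read from,
# transcribed from the Data Type Mapping table.
IOTDB_TYPE_FOR = {
    "boolean": "BOOLEAN",
    "tinyint": "INT32",
    "smallint": "INT32",
    "int": "INT32",
    "bigint": "INT64",
    "float": "FLOAT",
    "double": "DOUBLE",
    "string": "TEXT",
}


def check_schema(fields: dict[str, str], series_types: dict[str, str]) -> list[str]:
    """Return measurement fields whose declared SeaTunnel type does not match
    the IoTDB data type of the underlying time series.

    `fields` maps field name -> SeaTunnel type; `series_types` maps
    measurement name -> IoTDB type. Fields that are not measurements
    (such as `ts` or `device_name`) are skipped.
    """
    mismatches = []
    for name, seatunnel_type in fields.items():
        if name not in series_types:
            continue  # not a measurement column
        expected = IOTDB_TYPE_FOR.get(seatunnel_type.lower())
        if expected != series_types[name].upper():
            mismatches.append(name)
    return mismatches


# Usage with made-up series types: c_int is declared `int` but stored as INT64.
fields = {"ts": "timestamp", "device_name": "string", "temperature": "float", "c_int": "int"}
series = {"temperature": "FLOAT", "c_int": "INT64"}
print(check_schema(fields, series))  # ['c_int']
```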
Source Options
| Name | Type | Required | Default Value | Description | 
|---|---|---|---|---|
| node_urls | string | yes | - | IoTDB cluster address, in the format `"host1:port"` or `"host1:port,host2:port"` | 
| username | string | yes | - | IoTDB user name | 
| password | string | yes | - | IoTDB user password | 
| sql | string | yes | - | SQL statement to execute | 
| schema | config | yes | - | Schema of the data read from IoTDB | 
| fetch_size | int | no | - | Fetch size used by the IoTDB client when running the query | 
| lower_bound | long | no | - | Lower bound of the time column, used to split partitions | 
| upper_bound | long | no | - | Upper bound of the time column, used to split partitions | 
| num_partitions | int | no | - | Number of partitions to split the query into | 
| thrift_default_buffer_size | int | no | - | Default Thrift buffer size used by the IoTDB client | 
| thrift_max_frame_size | int | no | - | Maximum Thrift frame size used by the IoTDB client | 
| enable_cache_leader | boolean | no | - | Whether to enable leader caching in the IoTDB client | 
| version | string | no | - | SQL semantic version used by the client. Possible values: `V_0_12`, `V_0_13` | 
| common-options | | no | - | Source plugin common parameters; see Source Common Options for details | 
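A malformed `node_urls` value is easy to catch before a job is submitted. The helper below is hypothetical and is not part of SeaTunnel or IoTDB. It only encodes the documented `host:port[,host:port...]` shape and assumes plain host names or IPv4 addresses, so IPv6 literals are not handled.

```python
# Hypothetical validator for the documented node_urls format; not a SeaTunnel API.
def parse_node_urls(node_urls: str) -> list[tuple[str, int]]:
    """Parse "host1:port" or "host1:port,host2:port" into (host, port) pairs.

    Raises ValueError for entries that are not host:port with a valid port.
    Assumes host names or IPv4 addresses (no IPv6 literals).
    """
    nodes = []
    for entry in node_urls.split(","):
        entry = entry.strip()
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        port_num = int(port)
        if not 0 < port_num < 65536:
            raise ValueError(f"port out of range in {entry!r}")
        nodes.append((host, port_num))
    return nodes


print(parse_node_urls("localhost:6667"))         # [('localhost', 6667)]
print(parse_node_urls("host1:6667,host2:6668"))  # [('host1', 6667), ('host2', 6668)]
```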
split partitions
The query can be split into multiple partitions, using the time column as the split key.
num_partitions [int]
Number of partitions to split the query into.
upper_bound [long]
Upper bound of the time column (inclusive).
lower_bound [long]
Lower bound of the time column (inclusive).
     split the time range into num_partitions parts
     if num_partitions is 1, use the whole time range
     if num_partitions > (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
     
     e.g. lower_bound = 1, upper_bound = 10, num_partitions = 2
     sql = "select * from test where age > 0 and age < 10"
     
     split result
     split 1: select * from test  where (time >= 1 and time < 6)  and (  age > 0 and age < 10 )
     
     split 2: select * from test  where (time >= 6 and time < 11) and (  age > 0 and age < 10 )
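The splitting rule can be sketched in Python as follows. This is a minimal sketch, not SeaTunnel's actual split enumerator, and it rests on several assumptions:

- The bounds are inclusive, as the example implies: `upper_bound = 10` yields `time < 11`.
- The range is divided as evenly as possible, with any remainder going to the first partitions.
- The partition count is capped at `upper_bound - lower_bound` and is always at least one.
- The SQL is rewritten with simple regular expressions that find the first `WHERE` and a trailing `ALIGN BY` clause.

The function names are hypothetical. The generated SQL differs from the split result above only in whitespace.

```python
import re


def split_time_range(lower_bound: int, upper_bound: int, num_partitions: int) -> list[tuple[int, int]]:
    """Split the inclusive range [lower_bound, upper_bound] into half-open [start, end) ranges."""
    size = upper_bound - lower_bound + 1
    # Cap the partition count at upper_bound - lower_bound, but keep at least one.
    num_partitions = max(1, min(num_partitions, upper_bound - lower_bound))
    step, remainder = divmod(size, num_partitions)
    ranges = []
    start = lower_bound
    for i in range(num_partitions):
        # The first `remainder` partitions each absorb one extra time value.
        end = start + step + (1 if i < remainder else 0)
        ranges.append((start, end))
        start = end
    return ranges


def build_split_queries(sql: str, lower_bound: int, upper_bound: int, num_partitions: int) -> list[str]:
    """Rewrite `sql` once per time range by AND-ing a time predicate into its WHERE clause."""
    # Detach an optional trailing "align by ..." clause so the predicate lands before it.
    parts = re.split(r"\s+(?=align\s+by\b)", sql, maxsplit=1, flags=re.IGNORECASE)
    base = parts[0]
    align = " " + parts[1] if len(parts) > 1 else ""
    # Separate "select ... from ..." from the existing condition, if any.
    parts = re.split(r"\s+where\s+", base, maxsplit=1, flags=re.IGNORECASE)
    select_from = parts[0]
    condition = parts[1] if len(parts) > 1 else None
    queries = []
    for start, end in split_time_range(lower_bound, upper_bound, num_partitions):
        predicate = f"(time >= {start} and time < {end})"
        if condition is not None:
            predicate += f" and ({condition})"
        queries.append(f"{select_from} where {predicate}{align}")
    return queries


for query in build_split_queries("select * from test where age > 0 and age < 10", 1, 10, 2):
    print(query)
# select * from test where (time >= 1 and time < 6) and (age > 0 and age < 10)
# select * from test where (time >= 6 and time < 11) and (age > 0 and age < 10)
```

Each generated query covers a disjoint, half-open time slice. Together, the slices cover `[lower_bound, upper_bound]` exactly, so no row is read twice.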
common options
Source plugin common parameters; see Source Common Options for details.
Examples
```hocon
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  IoTDB {
    node_urls = "localhost:6667"
    username = "root"
    password = "root"
    sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
    schema {
      fields {
        ts = timestamp
        device_name = string
        temperature = float
        moisture = bigint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_string = string
        c_boolean = boolean
      }
    }
  }
}
sink {
  Console {
  }
}
```
The upstream IoTDB data looks like this:
```text
IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|                    Time|                  Device|   temperature|   moisture|   c_int|      c_bigint|   c_float| c_double| c_string| c_boolean|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|       1|   21474836470|      1.0f|     1.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|       2|   21474836470|      2.0f|     2.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|       3|   21474836470|      3.0f|     3.0d|      abc|      true|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
```
After loading, the data in SeaTunnelRow format looks like this:
| ts | device_name | temperature | moisture | c_int | c_bigint | c_float | c_double | c_string | c_boolean | 
|---|---|---|---|---|---|---|---|---|---|
| 1664035200001 | root.test_group.device_a | 36.1 | 100 | 1 | 21474836470 | 1.0f | 1.0d | abc | true | 
| 1664035200001 | root.test_group.device_b | 36.2 | 101 | 2 | 21474836470 | 2.0f | 2.0d | abc | true | 
| 1664035200001 | root.test_group.device_c | 36.3 | 102 | 3 | 21474836470 | 3.0f | 3.0d | abc | true | 
Changelog
2.2.0-beta 2022-09-26
- Add IoTDB Source Connector
2.3.0-beta 2022-10-20
- [Improve] Improve IoTDB Source Connector (2917)
  - Support extracting timestamp, device, measurement from SeaTunnelRow
  - Support TINYINT, SMALLINT
  - Support flushing cache to database before prepareCommit