IoTDB
IoTDB source connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Description
Used to read data from IoTDB.
Key features
- batch
- stream
- exactly-once
- column projection (IoTDB supports column projection through the SQL query)
- parallelism
- support user-defined split
Supported DataSource Info
| Datasource | Supported Versions | URL | 
|---|---|---|
| IoTDB | >= 2.0 | localhost:6667 | 
Data Type Mapping
| IoTDB Data Type | SeaTunnel Data Type | 
|---|---|
| BOOLEAN | BOOLEAN | 
| INT32 | TINYINT | 
| INT32 | SMALLINT | 
| INT32 | INT | 
| INT64 | BIGINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE | 
| TEXT | STRING | 
| STRING | STRING | 
| TIMESTAMP | BIGINT | 
| TIMESTAMP | TIMESTAMP | 
| BLOB | STRING | 
| DATE | DATE | 
Source Options
| Name | Type | Required | Default Value | Description | 
|---|---|---|---|---|
| node_urls | Array | Yes | - | IoTDB cluster address, the format is ["host1:port"] or ["host1:port","host2:port"] | 
| username | String | Yes | - | IoTDB username | 
| password | String | Yes | - | IoTDB user password | 
| sql_dialect | String | No | tree | The SQL dialect of IoTDB; available options are "tree" and "table" | 
| database | String | No | - | The database to use (only valid when sql_dialect is "table") | 
| sql | String | Yes | - | The sql statement to be executed | 
| schema | Config | Yes | - | The data schema | 
| fetch_size | Integer | No | - | The number of rows fetched from IoTDB per request when executing the query | 
| lower_bound | Long | No | - | The lower bound of the time range used to split the query | 
| upper_bound | Long | No | - | The upper bound of the time range used to split the query | 
| num_partitions | Integer | No | - | The number of partitions the time range is split into | 
| default_thrift_buffer_size | Integer | No | - | The default Thrift buffer size used by the IoTDB client | 
| max_thrift_frame_size | Integer | No | - | The maximum Thrift frame size used by the IoTDB client | 
| enable_cache_leader | Boolean | No | - | Whether the IoTDB client caches the leader node | 
| common-options | | No | - | Source plugin common parameters; please refer to Source Common Options for details | 
The time column can be used as the partition key to split a SQL query into parallel reads.
num_partitions [int]
The number of partitions the time range is split into.
upper_bound [long]
The upper bound of the time range.
lower_bound [long]
The lower bound of the time range.
     The time range is split into num_partitions parts.
     If num_partitions = 1, the whole time range is used.
     If num_partitions > (upper_bound - lower_bound), (upper_bound - lower_bound) is used as num_partitions.
     
     e.g. lower_bound = 1, upper_bound = 10, num_partitions = 2
     sql = "select * from test where age > 0 and age < 10"
     
     Split result:
     split 1: select * from test  where (time >= 1 and time < 6)  and (  age > 0 and age < 10 )
     split 2: select * from test  where (time >= 6 and time < 11) and (  age > 0 and age < 10 )
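As a rough illustration of how the three partition options fit into a job, the fragment below adds them to a tree-model source block. It is a sketch rather than a tested configuration: the bound values are arbitrary example numbers, assumed to be in the same unit as the IoTDB time column, and the query and schema are borrowed from Example 1 purely for illustration.

```hocon
source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    # The non-time condition is kept; the connector adds the time-range condition per split
    sql = "SELECT temperature, moisture FROM root.test_group.* WHERE moisture > 0 align by device"
    # Illustrative values only (assumed to match the unit of the time column)
    lower_bound = 1664035200000
    upper_bound = 1664121600000
    # Split the time range above into two parts
    num_partitions = 2
    schema {
      fields {
        ts = timestamp
        device_name = string
        temperature = float
        moisture = bigint
      }
    }
  }
}
```

With settings like these, the connector would be expected to issue two time-bounded queries, in the same manner as the split result shown above.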
Examples
Example 1: Read data from IoTDB-tree
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql = "SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device"
    schema {
      fields {
        ts = timestamp
        device_name = string
        temperature = float
        moisture = bigint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_string = string
        c_boolean = boolean
      }
    }
  }
}
sink {
  Console {
  }
}
The data format from upstream IoTDB is as follows:
IoTDB> SELECT temperature, moisture, c_int, c_bigint, c_float, c_double, c_string, c_boolean FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|                    Time|                  Device|   temperature|   moisture|   c_int|      c_bigint|   c_float| c_double| c_string| c_boolean|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|       1|   21474836470|      1.0f|     1.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|       2|   21474836470|      2.0f|     2.0d|      abc|      true|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|       3|   21474836470|      3.0f|     3.0d|      abc|      true|
+------------------------+------------------------+--------------+-----------+--------+--------------+----------+---------+---------+----------+
The data format loaded to SeaTunnelRow is as follows:
| ts | device_name | temperature | moisture | c_int | c_bigint | c_float | c_double | c_string | c_boolean | 
|---|---|---|---|---|---|---|---|---|---|
| 1664035200001 | root.test_group.device_a | 36.1 | 100 | 1 | 21474836470 | 1.0f | 1.0d | abc | true | 
| 1664035200001 | root.test_group.device_b | 36.2 | 101 | 2 | 21474836470 | 2.0f | 2.0d | abc | true | 
| 1664035200001 | root.test_group.device_c | 36.3 | 102 | 3 | 21474836470 | 3.0f | 3.0d | abc | true | 
Example 2: Read data from IoTDB-table
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    database = "test_database"
    sql = "SELECT time, sn, type, bidprice, bidsize, domain, buyno, askprice FROM test_table"
    schema {
      fields {
        ts = timestamp
        sn = string
        type = string
        bidprice = int
        bidsize = double
        domain = boolean
        buyno = bigint
        askprice = string
      }
    }
  }
}
sink {
  Console {
  }
}
If the database is specified in the SQL query, the database option is not required.
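For instance, the source block from Example 2 could be written without the database option by qualifying the table name in the query. This is a sketch only: the database.table form is assumed here from IoTDB's table-model SQL, and every other setting is copied from the example above.

```hocon
source {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    sql_dialect = "table"
    # No database option: the table name is qualified in the query instead (assumed syntax)
    sql = "SELECT time, sn, type, bidprice, bidsize, domain, buyno, askprice FROM test_database.test_table"
    schema {
      fields {
        ts = timestamp
        sn = string
        type = string
        bidprice = int
        bidsize = double
        domain = boolean
        buyno = bigint
        askprice = string
      }
    }
  }
}
```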
The data format from upstream IoTDB is as follows:
IoTDB> SELECT time, sn, type, bidprice, bidsize, domain, buyno, askprice FROM test_table
+-----------------------------+------+----+--------+------------------+------+-----+-----------+
|                         time|    sn|type|bidprice|           bidsize|domain|buyno|   askprice|
+-----------------------------+------+----+--------+------------------+------+-----+-----------+
|2025-07-30T17:52:34.851+08:00|0700HK|  L1|       9|10.323907796459721|  true|   10|-1064754527|
|2025-07-30T17:52:34.951+08:00|0700HK|  L1|      10| 9.844574317657585| false|    9|-1088662576|
|2025-07-30T17:52:35.051+08:00|0700HK|  L1|       9| 9.272974132434069|  true|    9|  402003616|
+-----------------------------+------+----+--------+------------------+------+-----+-----------+
The data format loaded to SeaTunnelRow is as follows:
| ts | sn | type | bidprice | bidsize | domain | buyno | askprice | 
|---|---|---|---|---|---|---|---|
| 2025-07-30T17:52:34.851 | 0700HK | L1 | 9 | 10.323907796459721 | true | 10 | -1064754527 | 
| 2025-07-30T17:52:34.951 | 0700HK | L1 | 10 | 9.844574317657585 | false | 9 | -1088662576 | 
| 2025-07-30T17:52:35.051 | 0700HK | L1 | 9 | 9.272974132434069 | true | 9 | 402003616 | 
Changelog
| Change | Commit | Version | 
|---|---|---|
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 | 
| [improve] iotdb options (#8965) | https://github.com/apache/seatunnel/commit/6e073935f4 | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 | 
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 | 
| [Doc] update iotdb document (#5404) | https://github.com/apache/seatunnel/commit/856aedb3c9 | 2.3.4 | 
| [Improve][Connector-V2] Remove scheduler in IoTDB sink (#5270) | https://github.com/apache/seatunnel/commit/299637868c | 2.3.4 | 
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf73 | 2.3.3 | 
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 | 
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 | 
| [Improve][SourceConnector] Unified schema parameter, update IoTDB sou… (#3896) | https://github.com/apache/seatunnel/commit/a0959c5fd1 | 2.3.1 | 
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 | 
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 | 
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 | 
| [Improve][Connector-V2][Iotdb] Unified exception for iotdb source & sink connector (#3557) | https://github.com/apache/seatunnel/commit/7353fed6d6 | 2.3.0 | 
| [Feature][Connector V2] expose configurable options in IoTDB (#3387) | https://github.com/apache/seatunnel/commit/06359ea76a | 2.3.0 | 
| [Improve][Connector-V2][IotDB]Add IotDB sink parameter check (#3412) | https://github.com/apache/seatunnel/commit/91240a3dcb | 2.3.0 | 
| [Bug][Connector-v2] Fix IoTDB connector sink NPE (#3080) | https://github.com/apache/seatunnel/commit/e5edf02433 | 2.3.0-beta | 
| [Imporve][Connector-V2] Imporve iotdb connector (#2917) | https://github.com/apache/seatunnel/commit/3da11ce19b | 2.3.0-beta | 
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta | 
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta | 
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta | 
| [Connectors-V2]Support IoTDB Source (#2431) | https://github.com/apache/seatunnel/commit/7b78d6c922 | 2.2.0-beta | 
| [Feature][Connector-V2] Support IoTDB sink (#2407) | https://github.com/apache/seatunnel/commit/c1bbbd59d5 | 2.2.0-beta |