Doris
Doris sink 连接器
支持的doris版本
- exactly-once & cdc 支持  Doris version is >= 1.1.x
- 支持数组数据类型 Doris version is >= 1.2.x
- 将支持Map数据类型 Doris version is 2.x
引擎支持
Spark
Flink
SeaTunnel Zeta
主要特性
描述
用于发送数据到doris. 同时支持流模式和批模式处理. Doris Sink连接器的内部实现是通过stream load批量缓存和导入的。
依赖
对于 Spark/Flink
- 你需要下载 jdbc driver jar package 并添加到目录
${SEATUNNEL_HOME}/plugins/.
对于 SeaTunnel Zeta
- 你需要下载 jdbc driver jar package 并添加到目录
${SEATUNNEL_HOME}/lib/.
Sink 选项
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| fenodes | String | Yes | - | Doris集群 fenodes 地址, 格式是"fe_ip:fe_http_port, ..." | 
| query-port | int | No | 9030 | DorisFenodes mysql协议查询端口 | 
| username | String | Yes | - | Doris用户名 | 
| password | String | Yes | - | Doris密码 | 
| database | String | Yes | - | Doris数据库名称 , 使用${database_name}表示上游数据库名称。 | 
| table | String | Yes | - | Doris表名,  使用${table_name}表示上游表名。 | 
| table.identifier | String | Yes | - | Doris表的名称,2.3.5 版本后将弃用,请使用database和table代替。 | 
| sink.label-prefix | String | Yes | - | stream load导入使用的标签前缀。 在2pc场景下,需要全局唯一性来保证SeaTunnel的EOS语义。 | 
| sink.enable-2pc | bool | No | false | 是否启用两阶段提交(2pc),默认为 false。 对于两阶段提交,请参考此处。 | 
| sink.enable-delete | bool | No | - | 是否启用删除。 该选项需要Doris表开启批量删除功能(0.15+版本默认开启),且仅支持Unique模型。 您可以在此link获得更多详细信息 | 
| sink.check-interval | int | No | 10000 | 加载过程中检查异常时间间隔。 | 
| sink.max-retries | int | No | 3 | 向数据库写入记录失败时的最大重试次数。 | 
| sink.buffer-size | int | No | 256 * 1024 | 用于缓存stream load数据的缓冲区大小。 | 
| sink.buffer-count | int | No | 3 | 用于缓存stream load数据的缓冲区计数。 | 
| doris.batch.size | int | No | 1024 | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。 | 
| needs_unsupported_type_casting | boolean | No | false | 是否启用不支持的类型转换,例如 Decimal64 到 Double。 | 
| case_sensitive | boolean | No | true | 是否保留表名和字段名的原始大小写。当设置为 false 时,表名和字段名将被转换为小写。 | 
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的 schema_save_mode | 
| data_save_mode | Enum | no | APPEND_DATA | 数据保存模式,请参考下面的 data_save_mode。 | 
| save_mode_create_template | string | no | see below | 见下文。 | 
| custom_sql | String | no | - | 当data_save_mode选择CUSTOM_PROCESSING时,需要填写CUSTOM_SQL参数。 该参数通常填写一条可以执行的SQL。 SQL将在同步任务之前执行。 | 
| doris.config | map | yes | - | 该选项用于支持自动生成sql时的insert、delete、update等操作,以及支持的格式。 | 
schema_save_mode [Enum]
在开启同步任务之前,针对现有的表结构选择不同的处理方案。
选项介绍:
RECREATE_SCHEMA :表不存在时创建,表保存时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST :表不存在时会创建,表存在时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :表不存在时会报错。
IGNORE :忽略对表的处理。
data_save_mode [Enum]
在开启同步任务之前,针对目标端已有的数据选择不同的处理方案。
选项介绍:
DROP_DATA: 保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户自定义处理。
ERROR_WHEN_DATA_EXISTS:有数据时报错。
save_mode_create_template
使用模板自动创建Doris表, 会根据上游数据类型和schema类型创建相应的建表语句, 默认模板可以根据情况进行修改。
默认模板:
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
 UNIQUE KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
 PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
)
如果模板中填写了自定义字段,例如添加 id 字段
CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
(   
    id,
    ${rowtype_fields}
) ENGINE = OLAP UNIQUE KEY (${rowtype_primary_key})
    COMMENT '${comment}'
    DISTRIBUTED BY HASH (${rowtype_primary_key})
    PROPERTIES
(
    "replication_num" = "1"
);
连接器会自动从上游获取对应类型完成填充, 并从"rowtype_fields"中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
可以使用以下占位符:
- database:用于获取上游schema中的数据库。
- table_name:用于获取上游schema中的表名。
- rowtype_fields:用于获取上游schema中的所有字段,自动映射到Doris的字段描述。
- rowtype_primary_key:用于获取上游模式中的主键(可能是列表)。
- rowtype_unique_key:用于获取上游模式中的唯一键(可能是列表)。
- comment:用于获取上游模式中的表注释。
数据类型映射
| Doris 数据类型 | SeaTunnel 数据类型 | 
|---|---|
| BOOLEAN | BOOLEAN | 
| TINYINT | TINYINT | 
| SMALLINT | SMALLINT TINYINT | 
| INT | INT SMALLINT TINYINT | 
| BIGINT | BIGINT INT SMALLINT TINYINT | 
| LARGEINT | BIGINT INT SMALLINT TINYINT | 
| FLOAT | FLOAT | 
| DOUBLE | DOUBLE FLOAT | 
| DECIMAL | DECIMAL DOUBLE FLOAT | 
| DATE | DATE | 
| DATETIME | TIMESTAMP | 
| CHAR | STRING | 
| VARCHAR | STRING | 
| STRING | STRING | 
| ARRAY | ARRAY | 
| MAP | MAP | 
| JSON | STRING | 
| HLL | 尚不支持 | 
| BITMAP | 尚不支持 | 
| QUANTILE_STATE | 尚不支持 | 
| STRUCT | 尚不支持 | 
支持的导入数据格式
支持的格式包括 CSV 和 JSON。
调优指南
适当增加sink.buffer-size和doris.batch.size的值可以提高写性能。
在流模式下,如果doris.batch.size和checkpoint.interval都配置为较大的值,最后到达的数据可能会有较大的延迟(延迟的时间就是检查点间隔的时间)。
这是因为最后到达的数据总量可能不会超过doris.batch.size指定的阈值。因此,在接收到数据的数据量没有超过该阈值之前只有检查点才会触发提交操作。因此,需要选择一个合适的检查点间隔。
此外,如果你通过sink.enable-2pc=true属性启用2pc。sink.buffer-size将会失去作用,只有检查点才能触发提交。
任务示例
简单示例
下面的例子描述了向Doris写入多种数据类型,用户需要在下游创建对应的表。
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
    }
}
sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
CDC(监听数据变更捕获)事件
本示例定义了一个SeaTunnel同步任务,通过FakeSource自动生成数据并发送给Doris Sink,FakeSource使用schema、score(int类型)模拟CDC数据,Doris需要创建一个名为test.e2e_table_sink的sink任务及其对应的表 。
env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}
source {
  FakeSource {
    schema = {
      fields {
        pk_id = bigint
        name = string
        score = int
        sex = boolean
        number = tinyint
        height = float
        sight = double
        create_time = date
        update_time = timestamp
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = INSERT
        fields = [3, "C", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_BEFORE
        fields = [1, "A", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = UPDATE_AFTER
        fields = [1, "A_1", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      },
      {
        kind = DELETE
        fields = [2, "B", 100, true, 1, 170.0, 4.3, "2020-02-02", "2020-02-02T02:02:02"]
      }
    ]
  }
}
sink {
  Doris {
    fenodes = "doris_cdc_e2e:8030"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    sink.label-prefix = "test-cdc"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}
使用JSON格式导入数据
sink {
    Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_json"
        doris.config = {
            format="json"
            read_json_by_line="true"
        }
    }
}
使用CSV格式导入数据
sink {
    Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "test"
        table = "e2e_table_sink"
        sink.enable-2pc = "true"
        sink.label-prefix = "test_csv"
        doris.config = {
          format = "csv"
          column_separator = ","
        }
    }
}
大小写敏感配置
sink {
    Doris {
        fenodes = "e2e_dorisdb:8030"
        username = root
        password = ""
        database = "Test_DB"  # 保留原始大小写
        table = "Test_Table"  # 保留原始大小写
        case_sensitive = true # 默认值,保留原始大小写
        sink.enable-2pc = "true"
        sink.label-prefix = "test_case_sensitive"
        doris.config = {
          format = "json"
          read_json_by_line = "true"
        }
    }
}
变更日志
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Feature][Core] Add plugin directory support for each connector (#9650) | https://github.com/apache/seatunnel/commit/4beb2b9336 | 2.3.12 | 
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 | 
| [Fix][Connector-V2] Fix misleading parameter name in DorisStreamLoad (#9685) | https://github.com/apache/seatunnel/commit/16618c8019 | 2.3.12 | 
| [improve]improve FE node failover logging for better observability (#9657) | https://github.com/apache/seatunnel/commit/ebc9ee3915 | 2.3.12 | 
| [Feature][Connector-doris] Adds case insensitivity feature (#9306) | https://github.com/apache/seatunnel/commit/9d1cffa5e1 | 2.3.11 | 
| [Feature][Transform] Support define sink column type (#9114) | https://github.com/apache/seatunnel/commit/ab7119e507 | 2.3.11 | 
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 | 
| [Improve][connector-doris] Improved doris source enumerator splits allocation algorithm for subtasks (#9108) | https://github.com/apache/seatunnel/commit/5f55e31c29 | 2.3.11 | 
| [Improve] doris options (#8745) | https://github.com/apache/seatunnel/commit/268d76cbf3 | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 | 
| [Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568) | https://github.com/apache/seatunnel/commit/c4cb1fc4a3 | 2.3.10 | 
| [Fix][Connector-V2] Fixed adding table comments (#8514) | https://github.com/apache/seatunnel/commit/edca75b0d6 | 2.3.10 | 
| [Fix][Doris] Fix catalog not closed (#8415) | https://github.com/apache/seatunnel/commit/2d1db66b9f | 2.3.9 | 
| [Feature]Connector-V2[Doris]Support sink ddl (#8250) | https://github.com/apache/seatunnel/commit/ecd8269f2e | 2.3.9 | 
| [Feature][Connector-V2]Support Doris Fe Node HA (#8311) | https://github.com/apache/seatunnel/commit/3e86102f47 | 2.3.9 | 
| [Feature][Core] Support read arrow data (#8137) | https://github.com/apache/seatunnel/commit/4710ea0f8d | 2.3.9 | 
| [Feature][Clickhouse] Support sink savemode (#8086) | https://github.com/apache/seatunnel/commit/e6f92fd79b | 2.3.9 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 | 
| [Feature][Doris] Support multi-table source read (#7895) | https://github.com/apache/seatunnel/commit/10c37acb34 | 2.3.9 | 
| [Improve][Connector-V2] Add doris/starrocks create table with comment (#7847) | https://github.com/apache/seatunnel/commit/207b8c16fd | 2.3.9 | 
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 | 
| [Fixbug] doris custom sql work (#7464) | https://github.com/apache/seatunnel/commit/5c6a7c6984 | 2.3.8 | 
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a1 | 2.3.8 | 
| [Improve][Connector-V2] Close all ResultSet after used (#7389) | https://github.com/apache/seatunnel/commit/853e973212 | 2.3.8 | 
| Revert "[Fix][Connector-V2] Fix doris primary key order and fields order are inconsistent (#7377)" (#7402) | https://github.com/apache/seatunnel/commit/bb72d91770 | 2.3.8 | 
| [Fix][Connector-V2] Fix doris primary key order and fields order are inconsistent (#7377) | https://github.com/apache/seatunnel/commit/464da8fb9b | 2.3.7 | 
| [Bugfix][Doris-connector] Fix Json serialization, null value causes data error problem | https://github.com/apache/seatunnel/commit/7b19df585f | 2.3.7 | 
| [Improve][Connector-V2] Improve doris error msg (#7343) | https://github.com/apache/seatunnel/commit/16950a67cd | 2.3.7 | 
| [Fix][Doris] Fix the abnormality of deleting data in CDC scenario. (#7315) | https://github.com/apache/seatunnel/commit/bb2c912404 | 2.3.7 | 
| fix [Bug] Unable to create a source for identifier 'Iceberg'. #7182 (#7279) | https://github.com/apache/seatunnel/commit/4897491708 | 2.3.7 | 
| [Fix][Connector-V2] Fix doris TRANSFER_ENCODING header error (#7267) | https://github.com/apache/seatunnel/commit/d886495584 | 2.3.6 | 
| [Improve][Doris Connector] Unified serialization method,Use RowToJsonConverter and TextSerializationSchema (#7229) | https://github.com/apache/seatunnel/commit/4b3af9bef4 | 2.3.6 | 
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 | 
| [Improve][Zeta] Move SaveMode behavior to master (#6843) | https://github.com/apache/seatunnel/commit/80cf91318d | 2.3.6 | 
| [bugFix][Connector-V2][Doris] The multi-FE configuration is supported (#6341) | https://github.com/apache/seatunnel/commit/b6d075194b | 2.3.6 | 
| [Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/5189991843 | 2.3.6 | 
| [Improve] Improve doris create table template default value (#6720) | https://github.com/apache/seatunnel/commit/bd64740314 | 2.3.6 | 
| [Bug Fix] Sink Doris error status(#6753) (#6755) | https://github.com/apache/seatunnel/commit/0ce2c0f220 | 2.3.6 | 
| [Improve] Improve doris stream load client side error message (#6688) | https://github.com/apache/seatunnel/commit/007a9940e3 | 2.3.6 | 
| [Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679) | https://github.com/apache/seatunnel/commit/88263cd69f | 2.3.6 | 
| [Fix][Connector-V2] Fixed doris/starrocks create table sql parse error (#6580) | https://github.com/apache/seatunnel/commit/f2ed1fbde0 | 2.3.5 | 
| [Fix][Connector-V2] Fix doris sink can not be closed when stream load not read any data (#6570) | https://github.com/apache/seatunnel/commit/341615f488 | 2.3.5 | 
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 | 
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce224 | 2.3.5 | 
| [Feature] Support nanosecond in Doris DateTimeV2 type (#6358) | https://github.com/apache/seatunnel/commit/76967066bf | 2.3.5 | 
| [Fix][Connector-V2] Fix doris source select fields loss primary key information (#6339) | https://github.com/apache/seatunnel/commit/78abe2f202 | 2.3.5 | 
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 | 
| [Fix] Fix doris stream load failed not reported error (#6315) | https://github.com/apache/seatunnel/commit/a09a5a2bb8 | 2.3.5 | 
| [Improve][Connector-V2] Doris stream load use FE instead of BE (#6235) | https://github.com/apache/seatunnel/commit/0a7acdce95 | 2.3.4 | 
| [Feature][Connector-V2][Doris] Add Doris ConnectorV2 Source (#6161) | https://github.com/apache/seatunnel/commit/fc2d80382a | 2.3.4 | 
| [Improve] Improve doris sink to random use be (#6132) | https://github.com/apache/seatunnel/commit/869417660e | 2.3.4 | 
| [Feature] Support SaveMode on Doris (#6085) | https://github.com/apache/seatunnel/commit/b2375fffe8 | 2.3.4 | 
| [Improve] Add batch flush in doris sink (#6024) | https://github.com/apache/seatunnel/commit/2c5b48e907 | 2.3.4 | 
| [Fix] Fix DorisCatalog not implement namemethod (#5988) | https://github.com/apache/seatunnel/commit/d4a323efef | 2.3.4 | 
| [Feature][Catalog] Doris Catalog (#5175) | https://github.com/apache/seatunnel/commit/1d3e335d8e | 2.3.4 | 
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 | 
| [Improve] Remove use SeaTunnelSink::getConsumedTypemethod and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 | 
| [Improve][Connector] Add field name to DataTypeConvertorto improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0d | 2.3.4 | 
| [Chore] Using try-with-resources to simplify the code. (#4995) | https://github.com/apache/seatunnel/commit/d0aff52425 | 2.3.4 | 
| [Fix] Fix RestService report NullPointerException (#5319) | https://github.com/apache/seatunnel/commit/5d4b319477 | 2.3.4 | 
| [feature][doris] Doris factory type (#5061) | https://github.com/apache/seatunnel/commit/d952cea43c | 2.3.3 | 
| [Bug][connector-v2][doris] add streamload Content-type for doris URLdecode error (#4880) | https://github.com/apache/seatunnel/commit/1b91816021 | 2.3.3 | 
| [Bug][Connector-V2][Doris] update last checkpoint id when doing snapshot (#4881) | https://github.com/apache/seatunnel/commit/0360e7e518 | 2.3.2 | 
| [Improve] Add a jobId to the doris label to distinguish between tasks (#4839) | https://github.com/apache/seatunnel/commit/6672e94077 | 2.3.2 | 
| [BUG][Doris] Add a jobId to the doris label to distinguish between tasks (#4853) | https://github.com/apache/seatunnel/commit/20ee2faecf | 2.3.2 | 
| [Improve][Connector-V2][Doris]Remove serialization code that is no longer used (#4313) | https://github.com/apache/seatunnel/commit/0c0e5f978e | 2.3.1 | 
| [Improve][Connector-V2][Doris] Refactor some Doris Sink code as well as support 2pc and cdc (#4235) | https://github.com/apache/seatunnel/commit/7c4005af85 | 2.3.1 | 
| [Hotfix][Connector][Doris] Fix Content Length header already present (#4277) | https://github.com/apache/seatunnel/commit/df82b77153 | 2.3.1 | 
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 | 
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 | 
| [Improve][Connector-V2][Doris] Change Doris Config Prefix (#3856) | https://github.com/apache/seatunnel/commit/16e39a506b | 2.3.1 | 
| [Feature][Connector-V2][Doris] Add Doris StreamLoad sink connector (#3631) | https://github.com/apache/seatunnel/commit/72158be395 | 2.3.0 |