Version: 2.3.10

StarRocks

StarRocks sink connector

Support These Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Sends data to StarRocks. Both streaming and batch modes are supported. Internally, the StarRocks sink connector caches rows and imports them into StarRocks in batches through Stream Load.

Using Dependency

  1. Ensure that the JDBC driver JAR has been placed in the ${SEATUNNEL_HOME}/plugins/ directory.

For SeaTunnel Zeta Engine

  1. Ensure that the JDBC driver JAR has been placed in the ${SEATUNNEL_HOME}/lib/ directory.

Sink Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| nodeUrls | list | yes | - | StarRocks cluster address, the format is ["fe_ip:fe_http_port", ...] |
| base-url | string | yes | - | The JDBC URL, like jdbc:mysql://localhost:9030/ or jdbc:mysql://localhost:9030 or jdbc:mysql://localhost:9030/db |
| username | string | yes | - | StarRocks user username |
| password | string | yes | - | StarRocks user password |
| database | string | yes | - | The name of the StarRocks database |
| table | string | no | - | The name of the StarRocks table. If not set, the name of the upstream table is used |
| labelPrefix | string | no | - | The prefix of the StarRocks Stream Load label |
| batch_max_rows | long | no | 1024 | For batch writing, buffered data is flushed into StarRocks when the number of buffered rows reaches batch_max_rows, the buffered size reaches batch_max_bytes, or the time reaches checkpoint.interval |
| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, buffered data is flushed into StarRocks when the number of buffered rows reaches batch_max_rows, the buffered size reaches batch_max_bytes, or the time reaches checkpoint.interval |
| max_retries | int | no | - | The number of retries when a flush fails |
| retry_backoff_multiplier_ms | int | no | - | Used as a multiplier for generating the next backoff delay |
| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to StarRocks |
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete. Only the PrimaryKey model is supported. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameters of the Stream Load data_desc |
| http_socket_timeout_ms | int | no | 180000 | HTTP socket timeout. The default is 3 minutes. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | Before the synchronization task starts, selects how an existing table structure on the target side is handled. |
| data_save_mode | Enum | no | APPEND_DATA | Before the synchronization task starts, selects how existing data on the target side is handled. |
| custom_sql | String | no | - | When data_save_mode is CUSTOM_PROCESSING, fill in the custom_sql parameter with an executable SQL statement. The SQL is executed before the synchronization task. |
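
As a rough illustration of how the batching, retry, and timeout options listed above fit together in one sink block, consider the sketch below. The host name fe_host, the table name, the label prefix, and every numeric value except the documented http_socket_timeout_ms default are placeholder assumptions, not recommended settings.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]                # hypothetical FE HTTP address
    base-url = "jdbc:mysql://fe_host:9030/"    # hypothetical FE JDBC address
    username = root
    password = ""
    database = "test"
    table = "example_sink_table"               # hypothetical table name
    labelPrefix = "seatunnel_example"          # hypothetical Stream Load label prefix

    # Flush when either threshold is reached, or when checkpoint.interval elapses
    batch_max_rows = 5000                      # illustrative value
    batch_max_bytes = 10485760                 # illustrative value, 10 * 1024 * 1024

    # Retry settings for failed flushes (illustrative values)
    max_retries = 3
    retry_backoff_multiplier_ms = 1000
    max_retry_backoff_ms = 10000

    http_socket_timeout_ms = 180000            # the documented default (3 minutes)
  }
}
```

Larger batch thresholds generally mean fewer Stream Load requests at the cost of more buffered memory per writer, so the values are worth tuning against the actual row size.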

save_mode_create_template

Templates are used to create StarRocks tables automatically. The connector generates the table creation statement from the upstream data types and schema type, and the default template can be modified as needed. This currently works only in multi-table mode.

Default template:

CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
${rowtype_primary_key},
${rowtype_fields}
) ENGINE=OLAP
PRIMARY KEY (${rowtype_primary_key})
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES (
"replication_num" = "1"
)

If a custom field is filled in the template, for example an added id field:

CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(
id,
${rowtype_fields}
) ENGINE = OLAP
COMMENT '${comment}'
DISTRIBUTED BY HASH (${rowtype_primary_key})
PROPERTIES
(
"replication_num" = "1"
);

The connector automatically obtains the corresponding type from the upstream schema to complete the definition, and removes the id field from rowtype_fields. This method can be used to customize field types and attributes.

You can use the following placeholders:

  • database: Used to get the database in the upstream schema
  • table_name: Used to get the table name in the upstream schema
  • rowtype_fields: Used to get all the fields in the upstream schema; they are automatically mapped to StarRocks field descriptions
  • rowtype_primary_key: Used to get the primary key in the upstream schema (may be a list)
  • rowtype_unique_key: Used to get the unique key in the upstream schema (may be a list)
  • comment: Used to get the table comment in the upstream schema
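
A customized template can be supplied through the save_mode_create_template option. The fragment below is a minimal sketch that reuses the default template shown earlier and only changes replication_num; the value "3", the host name fe_host, and the use of a triple-quoted HOCON string to hold the multi-line statement are assumptions made for illustration.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]                # hypothetical FE HTTP address
    base-url = "jdbc:mysql://fe_host:9030/"    # hypothetical FE JDBC address
    username = root
    password = ""
    database = "test"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"

    # Triple quotes keep the statement as one literal multi-line string,
    # so the placeholders reach the connector unchanged.
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
      ) ENGINE=OLAP
      PRIMARY KEY (${rowtype_primary_key})
      COMMENT '${comment}'
      DISTRIBUTED BY HASH (${rowtype_primary_key})
      PROPERTIES (
        "replication_num" = "3"
      )
    """
  }
}
```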

table [string]

The connector uses the database and this table name to generate SQL automatically, and writes the data received from upstream into that table.

This option is mutually exclusive with query and has a higher priority.

The table parameter can name a table that does not exist yet; that name is then used for the created table. It supports the variables ${table_name} and ${schema_name}. Replacement rules: ${schema_name} is replaced with the schema name passed to the target side, and ${table_name} is replaced with the table name passed to the target side.

For example:

  1. test_${schema_name}_${table_name}_test
  2. sink_sinktable
  3. ss_${table_name}
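
To make the replacement rules concrete, the fragment below sketches how such patterns would be expected to resolve. It assumes a hypothetical upstream table TESTSCHEMA.TABLE_1 (schema name TESTSCHEMA, table name TABLE_1). The resolved names in the comments are derived from the rules stated above, not from actual connector output, and fe_host is a placeholder host name.

```hocon
# Hypothetical upstream table: TESTSCHEMA.TABLE_1
#
#   table = "sink_sinktable"                     -> sink_sinktable (no variables, used as-is)
#   table = "ss_${table_name}"                   -> ss_TABLE_1
#   table = "test_${schema_name}_${table_name}"  -> test_TESTSCHEMA_TABLE_1
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]                # hypothetical FE HTTP address
    base-url = "jdbc:mysql://fe_host:9030/"    # hypothetical FE JDBC address
    username = root
    password = ""
    database = "test"
    table = "ss_${table_name}"                 # expected to become ss_TABLE_1 for the table above
  }
}
```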

schema_save_mode [Enum]

Before the synchronization task starts, select how an existing table structure on the target side is handled.
Options:
RECREATE_SCHEMA: Creates the table when it does not exist; drops and recreates it when it exists
CREATE_SCHEMA_WHEN_NOT_EXIST: Creates the table when it does not exist; skips creation when it exists
ERROR_WHEN_SCHEMA_NOT_EXIST: Reports an error when the table does not exist
IGNORE: Leaves the table untouched

data_save_mode [Enum]

Before the synchronization task starts, select how existing data on the target side is handled.
Options:
DROP_DATA: Preserves the database structure and deletes the data
APPEND_DATA: Preserves the database structure and preserves the data
CUSTOM_PROCESSING: User-defined processing
ERROR_WHEN_DATA_EXISTS: Reports an error when data exists

custom_sql [String]

When data_save_mode is CUSTOM_PROCESSING, fill in the custom_sql parameter with an executable SQL statement. The SQL is executed before the synchronization task.
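
The sketch below shows one way CUSTOM_PROCESSING and custom_sql might be combined. The DELETE statement, the c_date column in its WHERE clause, and the host name fe_host are hypothetical; any SQL statement that StarRocks can execute could take its place.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]                # hypothetical FE HTTP address
    base-url = "jdbc:mysql://fe_host:9030/"    # hypothetical FE JDBC address
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"

    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "CUSTOM_PROCESSING"

    # Executed once before synchronization starts (hypothetical cleanup statement)
    custom_sql = "DELETE FROM test.e2e_table_sink WHERE c_date < '2024-01-01'"
  }
}
```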

Data Type Mapping

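| StarRocks Data type | SeaTunnel Data type |
|---|---|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
| SMALLINT | SMALLINT |
| INT | INT |
| BIGINT | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DECIMAL | DECIMAL |
| DATE | STRING |
| TIME | STRING |
| DATETIME | STRING |
| STRING | STRING |
| ARRAY | STRING |
| MAP | STRING |
| BYTES | STRING |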

Supported import data formats

The supported formats include CSV and JSON

Task Example

Simple:

The following example writes multiple data types to StarRocks. Users need to create the corresponding table downstream.

env {
  parallelism = 1
  job.mode = "BATCH"
  checkpoint.interval = 10000
}

source {
  FakeSource {
    row.num = 10
    map.size = 10
    array.size = 10
    bytes.length = 10
    string.length = 10
    schema = {
      fields {
        c_map = "map<string, array<int>>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(16, 1)"
        c_null = "null"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}

Support write cdc changelog event(INSERT/UPDATE/DELETE)

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    ...

    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
    enable_upsert_delete = true
  }
}

Use JSON format to import data

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "JSON"
      strip_outer_array = true
    }
  }
}

Use CSV format to import data

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}

Use save_mode function

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "test"
    table = "test_${schema_name}_${table_name}"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
    batch_max_rows = 10
    starrocks.config = {
      format = "CSV"
      column_separator = "\\x01"
      row_delimiter = "\\x02"
    }
  }
}

Multiple table

example1

env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  Mysql-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/seatunnel"
    username = "root"
    password = "******"

    table-names = ["seatunnel.role","seatunnel.user","galileo.Bucket"]
  }
}

transform {
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "${database_name}_test"
    table = "${table_name}_test"
    ...

    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
    enable_upsert_delete = true
  }
}

example2

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Jdbc {
    driver = oracle.jdbc.driver.OracleDriver
    url = "jdbc:oracle:thin:@localhost:1521/XE"
    user = testUser
    password = testPassword

    table_list = [
      {
        table_path = "TESTSCHEMA.TABLE_1"
      },
      {
        table_path = "TESTSCHEMA.TABLE_2"
      }
    ]
  }
}

transform {
}

sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    base-url = "jdbc:mysql://e2e_starRocksdb:9030/"
    username = root
    password = ""
    database = "${schema_name}_test"
    table = "${table_name}_test"
    ...

    // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model.
    enable_upsert_delete = true
  }
}

Changelog

| Change | Commit | Version |
|---|---|---|
| [Fix][Connector-V2] Fix StarRocksCatalogTest#testCatalog() NPE (#8987) | https://github.com/apache/seatunnel/commit/53f0a9eb5 | 2.3.10 |
| [Improve][Connector-V2] Random pick the starrocks fe address which can be connected (#8898) | https://github.com/apache/seatunnel/commit/bef76078f | 2.3.10 |
| [Feature][Connector-v2] Support multi starrocks source (#8789) | https://github.com/apache/seatunnel/commit/26b5529aa | 2.3.10 |
| [Fix][Connector-V2] Fix possible data loss in scenarios of request_tablet_size is less than the number of BUCKETS (#8768) | https://github.com/apache/seatunnel/commit/3c6f21613 | 2.3.10 |
| [Fix][Connector-V2]Fix Descriptions for CUSTOM_SQL in Connector (#8778) | https://github.com/apache/seatunnel/commit/96b610eb7 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [improve] add StarRocks options (#8639) | https://github.com/apache/seatunnel/commit/da8d9cbd3 | 2.3.10 |
| [Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568) | https://github.com/apache/seatunnel/commit/c4cb1fc4a | 2.3.10 |
| [Fix][Connector-V2] Fixed adding table comments (#8514) | https://github.com/apache/seatunnel/commit/edca75b0d | 2.3.10 |
| [Feature][Connector-V2] Starrocks implements multi table sink (#8467) | https://github.com/apache/seatunnel/commit/55eebfa8a | 2.3.9 |
| [Improve][Connector-V2] Add pre-check starrocks version before exeucte alter table field name (#8237) | https://github.com/apache/seatunnel/commit/c24e3b12b | 2.3.9 |
| [Fix][Connector-starrocks] Fix drop column bug for starrocks (#8216) | https://github.com/apache/seatunnel/commit/082814da1 | 2.3.9 |
| [Feature][Core] Support read arrow data (#8137) | https://github.com/apache/seatunnel/commit/4710ea0f8 | 2.3.9 |
| [Feature][Clickhouse] Support sink savemode (#8086) | https://github.com/apache/seatunnel/commit/e6f92fd79 | 2.3.9 |
| [Feature][Connector-V2] StarRocks-sink support schema evolution (#8082) | https://github.com/apache/seatunnel/commit/d33b0da8a | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Improve][Connector-V2] Add doris/starrocks create table with comment (#7847) | https://github.com/apache/seatunnel/commit/207b8c16f | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 |
| [Improve][Connector-V2] Reuse connection in StarRocksCatalog (#7342) | https://github.com/apache/seatunnel/commit/8ee129d20 | 2.3.8 |
| [Improve][Connector-V2] Remove system table limit (#7391) | https://github.com/apache/seatunnel/commit/adf888e00 | 2.3.8 |
| [Improve][Connector-V2] Close all ResultSet after used (#7389) | https://github.com/apache/seatunnel/commit/853e97321 | 2.3.8 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [Fix][Connector-V2] Fix starrocks Content-Length header already present error (#7034) | https://github.com/apache/seatunnel/commit/a485a74ef | 2.3.6 |
| [Feature][Connector-V2]Support StarRocks Fe Node HA | https://github.com/apache/seatunnel/commit/9c36c4581 | 2.3.6 |
| [Fix][Connector-v2] Fix the sql statement error of create table for doris and starrocks (#6679) | https://github.com/apache/seatunnel/commit/88263cd69 | 2.3.6 |
| [Fix][StarRocks] Fix NPE when upstream catalogtable table path only have table name part (#6540) | https://github.com/apache/seatunnel/commit/5795b265c | 2.3.5 |
| [Fix][Connector-V2] Fixed doris/starrocks create table sql parse error (#6580) | https://github.com/apache/seatunnel/commit/f2ed1fbde | 2.3.5 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5 |
| [Improve][Connector-V2] Support TableSourceFactory on StarRocks (#6498) | https://github.com/apache/seatunnel/commit/aded56299 | 2.3.5 |
| [Improve] StarRocksSourceReader use the existing client (#6480) | https://github.com/apache/seatunnel/commit/1a02c571a | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5 |
| [Feature][Connector] add starrocks save_mode (#6029) | https://github.com/apache/seatunnel/commit/66b0f1e1d | 2.3.4 |
| [Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a | 2.3.4 |
| [Improve] StarRocks support create table template with unique key (#5905) | https://github.com/apache/seatunnel/commit/25b01125e | 2.3.4 |
| [Improve][StarRocksSink] add http socket timeout. (#5918) | https://github.com/apache/seatunnel/commit/febdb262b | 2.3.4 |
| [Improve] Support create varchar field type in StarRocks (#5911) | https://github.com/apache/seatunnel/commit/602589516 | 2.3.4 |
| [Improve]Change System.out.println to log output. (#5912) | https://github.com/apache/seatunnel/commit/bbedb07a9 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Improve][Connector] Add field name to DataTypeConvertor to improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0 | 2.3.4 |
| [feature][connector-jdbc]Add Save Mode function and Connector-JDBC (MySQL) connector has been realized (#5663) | https://github.com/apache/seatunnel/commit/eff17ccbe | 2.3.4 |
| [Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
| [Hotfix][Connector-V2][StarRocks] fix starrocks template sql parser #5071 (#5332) | https://github.com/apache/seatunnel/commit/23d79b0d1 | 2.3.4 |
| [Improve][Connector-V2] Remove scheduler in StarRocks sink (#5269) | https://github.com/apache/seatunnel/commit/cb7b79491 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
| [Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3 |
| Fix StarRocksJsonSerializer will transform array/map/row to string (#5281) | https://github.com/apache/seatunnel/commit/f94195377 | 2.3.3 |
| [Improve] Improve savemode api (#4767) | https://github.com/apache/seatunnel/commit/4acd370d4 | 2.3.3 |
| [Improve][Connector-V2] Improve StarRocks Auto Create Table To Support Use Primary Key Template In Field (#4487) | https://github.com/apache/seatunnel/commit/e601cd4c3 | 2.3.2 |
| Revert "[Improve][Catalog] refactor catalog (#4540)" (#4628) | https://github.com/apache/seatunnel/commit/2d1933195 | 2.3.2 |
| [hotfix][starrocks] fix error on get starrocks source typeInfo (#4619) | https://github.com/apache/seatunnel/commit/f7b094f9e | 2.3.2 |
| [Improve][Catalog] refactor catalog (#4540) | https://github.com/apache/seatunnel/commit/b0a701cb8 | 2.3.2 |
| [Improve][Connector-V2] Throw StarRocks Serialize Error To Client (#4484) | https://github.com/apache/seatunnel/commit/e2c107323 | 2.3.2 |
| [Improve][Connector-V2] Improve StarRocks Serialize Error Message (#4458) | https://github.com/apache/seatunnel/commit/465e75cbf | 2.3.2 |
| [Hotfix][Zeta] Adapt StarRocks With Multi-Table And Single-Table Mode (#4324) | https://github.com/apache/seatunnel/commit/c11c171d3 | 2.3.1 |
| [improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39 | 2.3.1 |
| [Improve][Zeta] Improve Client Job Info Message | https://github.com/apache/seatunnel/commit/56febf011 | 2.3.1 |
| [Fix][Connector-V2] Fix StarRocksSink Without Format Field In Header | https://github.com/apache/seatunnel/commit/463ae6437 | 2.3.1 |
| [Improve] Support StarRocksCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/d00ced6ec | 2.3.1 |
| [Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f | 2.3.1 |
| [Improve] Change StarRocks Sink Default Format To Json | https://github.com/apache/seatunnel/commit/870335783 | 2.3.1 |
| [Fix] Fix StarRocks Default Url Can't Use | https://github.com/apache/seatunnel/commit/67c45d353 | 2.3.1 |
| [hotfix] fixed schema options import error | https://github.com/apache/seatunnel/commit/656805f2d | 2.3.1 |
| [chore] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/291214ad6 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| [Fix] Fix StarRocks Default Url Can't Use (#4229) | https://github.com/apache/seatunnel/commit/ed74d1109 | 2.3.1 |
| [Bug] Remove StarRocks Auto Creat Table Default Value (#4220) | https://github.com/apache/seatunnel/commit/80b5cd40a | 2.3.1 |
| [Feature] Add SaveMode For StarRocks (#4217) | https://github.com/apache/seatunnel/commit/0674f10a5 | 2.3.1 |
| [Improve] Improve StarRocks Catalog Base Url (#4215) | https://github.com/apache/seatunnel/commit/6632a4047 | 2.3.1 |
| [Improve] Improve StarRocks Sink Config (#4212) | https://github.com/apache/seatunnel/commit/8d5712c1d | 2.3.1 |
| [Hotfix][Zeta] keep deleteCheckpoint method synchronized (#4209) | https://github.com/apache/seatunnel/commit/061f9b587 | 2.3.1 |
| [Improve] Improve StarRocks Auto Create Table (#4208) | https://github.com/apache/seatunnel/commit/bc9cd6bf6 | 2.3.1 |
| [hotfix][zeta] fix zeta multi-table parser error (#4193) | https://github.com/apache/seatunnel/commit/98f2ad0c1 | 2.3.1 |
| [feature][starrocks] add StarRocks factories (#4191) | https://github.com/apache/seatunnel/commit/c485d887e | 2.3.1 |
| [Feature] Change StarRocks CreatTable Template (#4184) | https://github.com/apache/seatunnel/commit/4cf07f3be | 2.3.1 |
| [Feature][Connector-V2] StarRocks source connector (#3679) | https://github.com/apache/seatunnel/commit/9681173b1 | 2.3.1 |
| [Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Feature][Connector-v2][StarRocks] Support write cdc changelog event(INSERT/UPDATE/DELETE) (#3865) | https://github.com/apache/seatunnel/commit/8e3d158c0 | 2.3.1 |
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
| [Improve][Connector-V2][StarRocks] Unified exception for StarRocks source and sink (#3593) | https://github.com/apache/seatunnel/commit/612d0297a | 2.3.0 |
| [Improve][Connector-V2][StarRocks] Delete the Mapper may not be used (#3579) | https://github.com/apache/seatunnel/commit/1e868ecf2 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Improve][Connector-V2][StarRocks]Add StarRocks connector option rules (#3402) | https://github.com/apache/seatunnel/commit/5d187f69b | 2.3.0 |
| [Bugfix][Connector-V2][StarRocks]Fix StarRocks StreamLoad retry bug and fix doc (#3406) | https://github.com/apache/seatunnel/commit/071f9aa05 | 2.3.0 |
| [Feature][Connector-V2] Starrocks sink connector (#3164) | https://github.com/apache/seatunnel/commit/3e6caf705 | 2.3.0 |