Version: 2.3.10

Elasticsearch

Description

Output data to Elasticsearch.

Key features

Tip: engine support

  • Supported Elasticsearch versions: >= 2.x and <= 8.x

Options

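| name | type | required | default value |
|---|---|---|---|
| hosts | array | yes | - |
| index | string | yes | - |
| schema_save_mode | string | yes | CREATE_SCHEMA_WHEN_NOT_EXIST |
| data_save_mode | string | yes | APPEND_DATA |
| index_type | string | no | - |
| primary_keys | list | no | - |
| key_delimiter | string | no | _ |
| username | string | no | - |
| password | string | no | - |
| max_retry_count | int | no | 3 |
| max_batch_size | int | no | 10 |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostname | boolean | no | true |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
| tls_truststore_password | string | no | - |
| common-options | | no | - |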

hosts [array]

Elasticsearch cluster HTTP addresses in host:port format, optionally prefixed with a scheme such as https://. Multiple hosts can be specified, for example ["host1:9200", "host2:9200"].
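A minimal sketch of a sink that lists two nodes of the same cluster. The host names and the index name seatunnel are placeholders, not values the connector requires.

```hocon
sink {
  Elasticsearch {
    # Two nodes of the same cluster (placeholder host names)
    hosts = ["host1:9200", "host2:9200"]
    # Hypothetical index name
    index = "seatunnel"
  }
}
```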

index [string]

Elasticsearch index name. The name can contain field-name variables, such as seatunnel_${age}; this requires schema_save_mode = "IGNORE", and the referenced field must exist in the SeaTunnel row. If the field does not exist, the value is treated as a literal index name.

index_type [string]

Elasticsearch index type. It is recommended not to set this option for Elasticsearch 6 and above.
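A sketch for a cluster older than Elasticsearch 6, where a mapping type still applies. The type name doc and the index name are hypothetical; on 6.x and later, leave index_type unset as recommended.

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name
    index = "seatunnel"
    # Only for Elasticsearch versions before 6; "doc" is a hypothetical type name
    index_type = "doc"
  }
}
```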

primary_keys [list]

Primary key fields used to generate the document _id. This option is required for CDC.

key_delimiter [string]

Delimiter used to join composite primary key values ("_" by default). For example, "$" produces a document _id such as "KEY1$KEY2$KEY3".
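A sketch of a composite key. The field names order_id and item_id and the index name orders are hypothetical. Assuming a row with order_id = 1001 and item_id = 7, the key values are joined with the delimiter in the order listed, giving a document _id of 1001$7, following the KEY1$KEY2$KEY3 pattern.

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name
    index = "orders"
    # Hypothetical key fields, joined in this order to build the document _id
    primary_keys = ["order_id", "item_id"]
    # order_id = 1001, item_id = 7  ->  _id "1001$7"
    key_delimiter = "$"
  }
}
```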

username [string]

Username for X-Pack (Elasticsearch security) authentication.

password [string]

Password for X-Pack (Elasticsearch security) authentication.

max_retry_count [int]

Maximum retry count for a single bulk request.

max_batch_size [int]

Maximum number of documents sent in one bulk request (batch).
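A sketch of raising the batch size and retry count for a higher-throughput job. The values 1000 and 5 are illustrative only, not tuned recommendations; larger batches reduce the number of bulk requests at the cost of more documents buffered per request.

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name
    index = "seatunnel"
    # Up to 1000 documents per bulk request (default 10)
    max_batch_size = 1000
    # Retry a failed bulk request up to 5 times (default 3)
    max_retry_count = 5
  }
}
```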

tls_verify_certificate [boolean]

Enable certificate validation for HTTPS endpoints.

tls_verify_hostname [boolean]

Enable hostname validation for HTTPS endpoints.

tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

tls_keystore_password [string]

The password for the specified key store.

tls_truststore_path [string]

The path to the PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

tls_truststore_password [string]

The password for the specified trust store.
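A sketch of validating the server certificate against a custom trust store. The path and password are placeholders written in the same style as the key store example; a JKS file is assumed here, although the option also accepts a PEM file.

```hocon
sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    # Hypothetical index name
    index = "seatunnel"
    # Placeholder trust store location and password
    tls_truststore_path = "${your elasticsearch home}/config/certs/truststore.jks"
    tls_truststore_password = "${your password}"
  }
}
```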

common options

Common sink plugin parameters; see Sink Common Options for details.

schema_save_mode

Determines how an existing schema (index) on the target side is handled before the synchronization job starts. Options:

  • RECREATE_SCHEMA: create the index if it does not exist; drop and recreate it if it exists.
  • CREATE_SCHEMA_WHEN_NOT_EXIST: create the index if it does not exist; skip creation if it exists.
  • ERROR_WHEN_SCHEMA_NOT_EXIST: report an error if the index does not exist.
  • IGNORE: do not process the index at all.
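A sketch that requires the target index to be created in advance, so the job fails instead of creating it. The index name is hypothetical.

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical, pre-created index
    index = "seatunnel"
    # Report an error before writing if the index does not exist
    schema_save_mode = "ERROR_WHEN_SCHEMA_NOT_EXIST"
  }
}
```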

data_save_mode

Determines how existing data on the target side is handled before the synchronization job starts. Options:

  • DROP_DATA: keep the index structure and delete existing data.
  • APPEND_DATA: keep the index structure and existing data.
  • ERROR_WHEN_DATA_EXISTS: report an error if data already exists.
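A sketch that keeps the existing index but deletes its documents before the job writes, for example when reloading a full snapshot. The index name is hypothetical; DROP_DATA removes existing documents, so apply it with care.

```hocon
sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name
    index = "seatunnel"
    # Create the index only if it is missing
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # Keep the index but delete its existing documents before writing
    data_save_mode = "DROP_DATA"
  }
}
```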

Examples

Simple

sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"
  }
}

Multi-table writing

sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
  }
}

CDC (Change Data Capture) event

sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-${age}"
    schema_save_mode = "IGNORE"
    # cdc required options
    primary_keys = ["key1", "key2", ...]
  }
}

CDC (Change Data Capture) event multi-table writing

sink {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "${table_name}"
    schema_save_mode = "IGNORE"
    primary_keys = ["${primary_key}"]
  }
}

SSL (disable certificate validation)

sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_verify_certificate = false
  }
}

SSL (Disable hostname validation)

sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_verify_hostname = false
  }
}

SSL (enable certificate validation)

sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
    tls_keystore_password = "${your password}"
  }
}

SAVE_MODE (configure save modes)

sink {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

Schema Evolution

CDC collection supports a limited number of schema changes. The currently supported schema changes include:

  • Adding columns.

Example

env {
  # You can set engine configuration here
  parallelism = 5
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second = 7000000
  read_limit.rows_per_second = 400
}

source {
  MySQL-CDC {
    server-id = 5652-5657
    username = "st_user_source"
    password = "mysqlpw"
    table-names = ["shop.products"]
    base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
    schema-changes.enabled = true
  }
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "schema_change_index"
    index_type = "_doc"
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

Changelog

| Change | Commit | Version |
|---|---|---|
| [Feature][elasticsearch-connector] support elasticsearch sql source (#8895) | https://github.com/apache/seatunnel/commit/814086279 | 2.3.10 |
| [Fix] Fix error log name for SourceSplitEnumerator implements class (#8817) | https://github.com/apache/seatunnel/commit/55ed90eca | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [improve] add Elasticsearch options (#8623) | https://github.com/apache/seatunnel/commit/d307ab44f | 2.3.10 |
| [Fix][connector-elasticsearch] support elasticsearch nest type && spark with Array<map> (#8492) | https://github.com/apache/seatunnel/commit/92d2a4a10 | 2.3.10 |
| Revert "[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)" (#8485) | https://github.com/apache/seatunnel/commit/c68944893 | 2.3.9 |
| [Feature][connector-elasticsearch] elasticsearch support nested type (#8462) | https://github.com/apache/seatunnel/commit/eaa15e4c8 | 2.3.9 |
| [Feature][Elasticsearch] Support sink ddl (#8412) | https://github.com/apache/seatunnel/commit/a4a38ccff | 2.3.9 |
| [hotfix][connector-elasticsearch-sink] Convert index to lowercase (#8429) | https://github.com/apache/seatunnel/commit/46fcb237c | 2.3.9 |
| [Improve][Elasticsearch] Truncate the exception message body for request errors (#8263) | https://github.com/apache/seatunnel/commit/b9d850e61 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Fix][Connector-V2] Fix known directory create and delete ignore issues (#7700) | https://github.com/apache/seatunnel/commit/e2fb67957 | 2.3.8 |
| [Feature][Elastic search] Support multi-table source feature (#7502) | https://github.com/apache/seatunnel/commit/29fbeb254 | 2.3.8 |
| [Hotfix][Connector-V2] Fix null not inserted in es (#7493) | https://github.com/apache/seatunnel/commit/a4ba6a171 | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124) | https://github.com/apache/seatunnel/commit/bb9fd516e | 2.3.6 |
| [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052) | https://github.com/apache/seatunnel/commit/45653e1d2 | 2.3.6 |
| [Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/518999184 | 2.3.6 |
| [Fix][Connector-V2] Remove Some Incorrect Comments and Properties in ElasticsearchCommitInfo | https://github.com/apache/seatunnel/commit/720298775 | 2.3.6 |
| [Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source empty,Support SourceConfig.SOURCE field empty. (#6425) | https://github.com/apache/seatunnel/commit/4e98eb863 | 2.3.6 |
| [Improve][Connector-V2] Add ElasticSearch type converter (#6546) | https://github.com/apache/seatunnel/commit/505c1252b | 2.3.5 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5 |
| [Improve] Implement ElasticSearch connector factory (#6181) | https://github.com/apache/seatunnel/commit/1fd854de6 | 2.3.4 |
| [Feature][Connector] add elasticsearch save_mode (#6046) | https://github.com/apache/seatunnel/commit/716a36ac3 | 2.3.4 |
| [Improve][Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED (#5978) | https://github.com/apache/seatunnel/commit/456cd1771 | 2.3.4 |
| [Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a | 2.3.4 |
| [BUG][Connector-V2] Fixed conversion exception of elasticsearch array format (#5825) | https://github.com/apache/seatunnel/commit/64f19f25d | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Improve][Connector] Add field name to DataTypeConvertor to improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
| [Chore] Update the es version in the docs. (#4499) | https://github.com/apache/seatunnel/commit/415150635 | 2.3.2 |
| [Improve][ElasticsearchSink]remove useless code. (#4500) | https://github.com/apache/seatunnel/commit/ef44c0d44 | 2.3.2 |
| [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233) | https://github.com/apache/seatunnel/commit/15530d278 | 2.3.2 |
| [Feature][Connector-V2][ES] Support dsl filter (#4130) | https://github.com/apache/seatunnel/commit/79ca87833 | 2.3.1 |
| [Bug][Connector-V2][ES]Fix es field type not support binary(#4240) (#4274) | https://github.com/apache/seatunnel/commit/84f10f201 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| Shade google common in hadoop (#4222) | https://github.com/apache/seatunnel/commit/537690507 | 2.3.1 |
| Set es text type to string (#4192) | https://github.com/apache/seatunnel/commit/473971b94 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
| Support ES catalog get field mapping (#4167) | https://github.com/apache/seatunnel/commit/72f241871 | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Bug][Connector-V2][ES]Fix es source no data (#4076) | https://github.com/apache/seatunnel/commit/a573b8dbe | 2.3.1 |
| Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 |
| Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Feature][Connector-V2][Elasticsearch] Support https protocol (#3997) | https://github.com/apache/seatunnel/commit/79b5cdd9c | 2.3.1 |
| [Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
| [hotfix][connector-v2][elasticsearch] Fix bulk refresh operation not locked (#3738) | https://github.com/apache/seatunnel/commit/b6cab90d2 | 2.3.0 |
| [feature][connector-v2][elasticsearch] Support write cdc changelog event in elasticsearch sink (#3673) | https://github.com/apache/seatunnel/commit/3ec47c684 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Improve][Connector-V2][ElasticSearch] Unified exception for ElasticSearch source & sink connector (#3569) | https://github.com/apache/seatunnel/commit/b73944d1d | 2.3.0 |
| [Improve][Connector-V2] Bad smell ToArrayCallWithZeroLengthArrayArgument: (#3577) | https://github.com/apache/seatunnel/commit/cc448d98c | 2.3.0 |
| [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148) | https://github.com/apache/seatunnel/commit/02ef38eb7 | 2.3.0 |
| [Connector-V2][E2E] Add missed ElasticSearch E2E module. (#3338) | https://github.com/apache/seatunnel/commit/b2dad4d47 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 |
| [Feature][Connector-V2][Elasticsearch] Support Elasticsearch source (#2821) | https://github.com/apache/seatunnel/commit/ded5481d9 | 2.3.0 |
| update (#3149) | https://github.com/apache/seatunnel/commit/59abe4ad6 | 2.3.0 |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta |
| [Connector-V2][ElasticSearch] Fix ElasticSearch Connector V2 Bug (#2817) | https://github.com/apache/seatunnel/commit/2fcbbf464 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
| [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330) | https://github.com/apache/seatunnel/commit/2a1fd5027 | 2.2.0-beta |