Version: 2.3.10

Elasticsearch

Elasticsearch source connector

Description

Used to read data from Elasticsearch.

Supports Elasticsearch versions >= 2.x and <= 8.x.

Key features

Options

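| name | type | required | default value | note |
|------|------|----------|---------------|------|
| hosts | array | yes | - | |
| username | string | no | - | |
| password | string | no | - | |
| index | string | no | - | Must be configured if index_list is not set |
| index_list | array | no | - | Used to define a multi-table task |
| source | array | no | - | |
| query | json | no | {"match_all": {}} | |
| search_type | string | no | dsl | Search method: sql or dsl |
| sql_query | string | no | - | SQL query, used when search_type is sql |
| scroll_time | string | no | 1m | |
| scroll_size | int | no | 100 | |
| tls_verify_certificate | boolean | no | true | |
| tls_verify_hostname | boolean | no | true | |
| array_column | map | no | - | |
| tls_keystore_path | string | no | - | |
| tls_keystore_password | string | no | - | |
| tls_truststore_path | string | no | - | |
| tls_truststore_password | string | no | - | |
| common-options | | no | - | |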

hosts [array]

Elasticsearch cluster HTTP address in the format host:port. Multiple hosts can be specified, for example ["host1:9200", "host2:9200"]. To connect over HTTPS, prefix each host with https://, for example ["https://host1:9200"].

username [string]

X-Pack username.

password [string]

X-Pack password.

index [string]

Elasticsearch index name. Supports * wildcard matching, for example seatunnel-*. Must be configured if index_list is not set.

source [array]

The fields of the index to read. You can read the document ID by including the _id field. If you write _id to another index, you need to specify an alias for _id, because Elasticsearch does not allow the _id metadata field inside a document body. If source is not configured, the fields are retrieved automatically from the index mapping.
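A minimal sketch of a single-index read that includes the document ID, assuming a hypothetical index my_index with fields name and age. Removing the source line should make the connector derive the field list from the index mapping instead, as described above.

```hocon
source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index and field names, for illustration only
    index = "my_index"
    # _id returns the Elasticsearch document ID alongside the regular fields;
    # omit this line to read every field found in the index mapping
    source = ["_id", "name", "age"]
  }
}
```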

array_column [map]

Fields of array type. Elasticsearch has no dedicated array type in its index mapping, so array fields must be declared explicitly, for example {c_array = "array<tinyint>"}.
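As a sketch, assuming a hypothetical index whose tags field holds strings and whose scores field holds floating-point numbers, each array field is listed in source and declared in array_column with its element type. The types use the same array<...> notation as the demos; array<double> is assumed to be accepted in the same way as array<string> and array<tinyint>.

```hocon
source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index and field names, for illustration only
    index = "my_index"
    source = ["_id", "tags", "scores"]
    # Declare every array field with its element type
    array_column = {
      tags = "array<string>"
      scores = "array<double>"
    }
  }
}
```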

query [json]

Elasticsearch query DSL that controls which documents are read. Defaults to {"match_all": {}}, which reads all documents.
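The value is given as a query object, in the same form as the range query in Demo 1, so compound queries are assumed to work as well. A sketch assuming hypothetical status and age fields, combining a term filter and a range filter in a bool query:

```hocon
source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name, for illustration only
    index = "my_index"
    # Read only active documents whose age is at least 18 (hypothetical fields)
    query = {"bool": {"filter": [{"term": {"status": "active"}}, {"range": {"age": {"gte": 18}}}]}}
  }
}
```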

scroll_time [string]

Amount of time Elasticsearch will keep the search context alive for scroll requests.

scroll_size [int]

Maximum number of hits to be returned with each Elasticsearch scroll request.
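A sketch of tuning both scroll options, assuming a hypothetical index my_index. A larger scroll_size means fewer round trips but larger batches held in memory. Because Elasticsearch renews the keep-alive on every scroll request, scroll_time only needs to cover the time taken to process one batch; the value uses Elasticsearch time units, like the default 1m.

```hocon
source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    # Hypothetical index name, for illustration only
    index = "my_index"
    # Keep the search context alive for 5 minutes between scroll requests
    scroll_time = "5m"
    # Return up to 1000 hits per scroll request instead of the default 100
    scroll_size = 1000
  }
}
```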

index_list [array]

index_list defines a multi-index (multi-table) synchronization task. It is an array in which each element contains the parameters of a single-table synchronization, such as index, query, source/schema, scroll_size, and scroll_time. It is recommended not to configure index_list and query at the same level. See Demo 2 (multi-table synchronization) below for a complete example.
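A sketch of per-entry settings, assuming hypothetical orders and customers indices: each element of index_list carries its own index, query, source, and scroll options. Options left out of an entry (such as query in the second one) are assumed to fall back to their defaults.

```hocon
source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index_list = [
      {
        # Hypothetical index and field names, for illustration only
        index = "orders"
        query = {"range": {"amount": {"gte": 100}}}
        scroll_size = 500
        scroll_time = "2m"
      }
      {
        # No query here: assumed to default to {"match_all": {}}
        index = "customers"
        source = ["_id", "name"]
      }
    ]
  }
}
```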

tls_verify_certificate [boolean]

Enables certificate validation for HTTPS endpoints.

tls_verify_hostname [boolean]

Enables hostname validation for HTTPS endpoints.

tls_keystore_path [string]

The path to the PEM or JKS key store. This file must be readable by the operating system user running SeaTunnel.

tls_keystore_password [string]

The password for the key store specified by tls_keystore_path.

tls_truststore_path [string]

The path to PEM or JKS trust store. This file must be readable by the operating system user running SeaTunnel.

tls_truststore_password [string]

The password for the trust store specified by tls_truststore_path.
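A sketch of validating the server certificate against a trust store, in the same style as the SSL demos. The path and password below are hypothetical placeholders; the file must be readable by the operating system user running SeaTunnel.

```hocon
source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"
    # Hypothetical trust store location and password placeholder
    tls_truststore_path = "/opt/seatunnel/certs/truststore.jks"
    tls_truststore_password = "${your truststore password}"
  }
}
```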

common options

Common parameters of source plugins. See Source Common Options for details.

Examples

Demo 1

This example reads documents from indices matching the seatunnel-* pattern, filtered by a range query on the firstPacket field. source limits the fields read to _id, name, age, tags, and phones, and array_column declares tags and phones as arrays.

source {
  Elasticsearch {
    hosts = ["localhost:9200"]
    index = "seatunnel-*"
    array_column = {tags = "array<string>", phones = "array<string>"}
    source = ["_id", "name", "age", "tags", "phones"]
    query = {"range": {"firstPacket": {"gte": 1669225429990, "lte": 1669225429990}}}
  }
}

Demo 2: Multi-table synchronization

This example reads different data from read_index1 and read_index2 and writes it to read_index1_copy and read_index2_copy respectively. For read_index1, source specifies the fields to read and array_column declares which fields are arrays. In the sink, the ${table_name} placeholder is replaced with each upstream table name.

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index_list = [
      {
        index = "read_index1"
        query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
        source = [
          c_map,
          c_array,
          c_string,
          c_boolean,
          c_tinyint,
          c_smallint,
          c_bigint,
          c_float,
          c_double,
          c_decimal,
          c_bytes,
          c_int,
          c_date,
          c_timestamp
        ]
        array_column = {
          c_array = "array<tinyint>"
        }
      }
      {
        index = "read_index2"
        query = {"match_all": {}}
        source = [
          c_int2,
          c_date2,
          c_null
        ]
      }
    ]
  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "${table_name}_copy"
    index_type = "st"
    "schema_save_mode" = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode" = "APPEND_DATA"
  }
}

Demo 3: SSL (disable certificate validation)

source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_verify_certificate = false
  }
}

Demo 4: SSL (disable hostname validation)

source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_verify_hostname = false
  }
}

Demo 5: SSL (enable certificate validation)

source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    username = "elastic"
    password = "elasticsearch"

    tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
    tls_keystore_password = "${your password}"
  }
}

Demo 6: SQL query

Note: SQL queries do not support map and array types.

source {
  Elasticsearch {
    hosts = ["https://elasticsearch:9200"]
    username = "elastic"
    password = "elasticsearch"
    tls_verify_certificate = false
    tls_verify_hostname = false
    index = "st_index_sql"
    sql_query = "select * from st_index_sql where c_int>=10 and c_int<=20"
    search_type = "sql"
  }
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Feature][elasticsearch-connector] support elasticsearch sql source (#8895) | https://github.com/apache/seatunnel/commit/814086279 | 2.3.10 |
| [Fix] Fix error log name for SourceSplitEnumerator implements class (#8817) | https://github.com/apache/seatunnel/commit/55ed90eca | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [improve] add Elasticsearch options (#8623) | https://github.com/apache/seatunnel/commit/d307ab44f | 2.3.10 |
| [Fix][connector-elasticsearch] support elasticsearch nest type && spark with Array<map> (#8492) | https://github.com/apache/seatunnel/commit/92d2a4a10 | 2.3.10 |
| Revert "[Feature][connector-elasticsearch] elasticsearch support nested type (#8462)" (#8485) | https://github.com/apache/seatunnel/commit/c68944893 | 2.3.9 |
| [Feature][connector-elasticsearch] elasticsearch support nested type (#8462) | https://github.com/apache/seatunnel/commit/eaa15e4c8 | 2.3.9 |
| [Feature][Elasticsearch] Support sink ddl (#8412) | https://github.com/apache/seatunnel/commit/a4a38ccff | 2.3.9 |
| [hotfix][connector-elasticsearch-sink] Convert index to lowercase (#8429) | https://github.com/apache/seatunnel/commit/46fcb237c | 2.3.9 |
| [Improve][Elasticsearch] Truncate the exception message body for request errors (#8263) | https://github.com/apache/seatunnel/commit/b9d850e61 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Fix][Connector-V2] Fix known directory create and delete ignore issues (#7700) | https://github.com/apache/seatunnel/commit/e2fb67957 | 2.3.8 |
| [Feature][Elastic search] Support multi-table source feature (#7502) | https://github.com/apache/seatunnel/commit/29fbeb254 | 2.3.8 |
| [Hotfix][Connector-V2] Fix null not inserted in es (#7493) | https://github.com/apache/seatunnel/commit/a4ba6a171 | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [Fix][Connector-V2][Elasticsearch]Fix sink configuration for DROP_DATA (#7124) | https://github.com/apache/seatunnel/commit/bb9fd516e | 2.3.6 |
| [Feature][Elasticsearch] Support multi-table sink write #7041 (#7052) | https://github.com/apache/seatunnel/commit/45653e1d2 | 2.3.6 |
| [Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/518999184 | 2.3.6 |
| [Fix][Connector-V2] Remove Some Incorrect Comments and Properties in ElasticsearchCommitInfo | https://github.com/apache/seatunnel/commit/720298775 | 2.3.6 |
| [Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source empty,Support SourceConfig.SOURCE field empty. (#6425) | https://github.com/apache/seatunnel/commit/4e98eb863 | 2.3.6 |
| [Improve][Connector-V2] Add ElasticSearch type converter (#6546) | https://github.com/apache/seatunnel/commit/505c1252b | 2.3.5 |
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a | 2.3.5 |
| [Improve] Add SaveMode log of process detail (#6375) | https://github.com/apache/seatunnel/commit/b0d70ce22 | 2.3.5 |
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc | 2.3.5 |
| [Improve] Implement ElasticSearch connector factory (#6181) | https://github.com/apache/seatunnel/commit/1fd854de6 | 2.3.4 |
| [Feature][Connector] add elasticsearch save_mode (#6046) | https://github.com/apache/seatunnel/commit/716a36ac3 | 2.3.4 |
| [Improve][Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED (#5978) | https://github.com/apache/seatunnel/commit/456cd1771 | 2.3.4 |
| [Feature] Add unsupported datatype check for all catalog (#5890) | https://github.com/apache/seatunnel/commit/b9791285a | 2.3.4 |
| [BUG][Connector-V2] Fixed conversion exception of elasticsearch array format (#5825) | https://github.com/apache/seatunnel/commit/64f19f25d | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Improve][Connector] Add field name to DataTypeConvertor to improve error message (#5782) | https://github.com/apache/seatunnel/commit/ab60790f0 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
| [Chore] Update the es version in the docs. (#4499) | https://github.com/apache/seatunnel/commit/415150635 | 2.3.2 |
| [Improve][ElasticsearchSink]remove useless code. (#4500) | https://github.com/apache/seatunnel/commit/ef44c0d44 | 2.3.2 |
| [Hotfix][Connector-V2][ES] Source deserializer error and inappropriate (#4233) | https://github.com/apache/seatunnel/commit/15530d278 | 2.3.2 |
| [Feature][Connector-V2][ES] Support dsl filter (#4130) | https://github.com/apache/seatunnel/commit/79ca87833 | 2.3.1 |
| [Bug][Connector-V2][ES]Fix es field type not support binary(#4240) (#4274) | https://github.com/apache/seatunnel/commit/84f10f201 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| Shade google common in hadoop (#4222) | https://github.com/apache/seatunnel/commit/537690507 | 2.3.1 |
| Set es text type to string (#4192) | https://github.com/apache/seatunnel/commit/473971b94 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
| Support ES catalog get field mapping (#4167) | https://github.com/apache/seatunnel/commit/72f241871 | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Bug][Connector-V2][ES]Fix es source no data (#4076) | https://github.com/apache/seatunnel/commit/a573b8dbe | 2.3.1 |
| Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 |
| Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Feature][Connector-V2][Elasticsearch] Support https protocol (#3997) | https://github.com/apache/seatunnel/commit/79b5cdd9c | 2.3.1 |
| [Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
| [hotfix][connector-v2][elasticsearch] Fix bulk refresh operation not locked (#3738) | https://github.com/apache/seatunnel/commit/b6cab90d2 | 2.3.0 |
| [feature][connector-v2][elasticsearch] Support write cdc changelog event in elasticsearch sink (#3673) | https://github.com/apache/seatunnel/commit/3ec47c684 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Improve][Connector-V2][ElasticSearch] Unified exception for ElasticSearch source & sink connector (#3569) | https://github.com/apache/seatunnel/commit/b73944d1d | 2.3.0 |
| [Improve][Connector-V2] Bad smell ToArrayCallWithZeroLengthArrayArgument: (#3577) | https://github.com/apache/seatunnel/commit/cc448d98c | 2.3.0 |
| [Improve][Connector-V2][ElasticSearch] Improve es bulk sink retriable mechanism (#3148) | https://github.com/apache/seatunnel/commit/02ef38eb7 | 2.3.0 |
| [Connector-V2][E2E] Add missed ElasticSearch E2E module. (#3338) | https://github.com/apache/seatunnel/commit/b2dad4d47 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 |
| [Feature][Connector-V2][Elasticsearch] Support Elasticsearch source (#2821) | https://github.com/apache/seatunnel/commit/ded5481d9 | 2.3.0 |
| update (#3149) | https://github.com/apache/seatunnel/commit/59abe4ad6 | 2.3.0 |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta |
| [Connector-V2][ElasticSearch] Fix ElasticSearch Connector V2 Bug (#2817) | https://github.com/apache/seatunnel/commit/2fcbbf464 | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
| [Feature][Connector-V2] new connecotor of Elasticsearch sink(#2326) (#2330) | https://github.com/apache/seatunnel/commit/2a1fd5027 | 2.2.0-beta |