Elasticsearch
描述
输出数据到 Elasticsearch
主要特性
引擎支持
- 支持
ElasticSearch 版本 >= 2.x 并且 <= 8.x
选项
名称 | 类型 | 是否必须 | 默认值 |
---|---|---|---|
hosts | array | 是 | - |
index | string | 是 | - |
schema_save_mode | string | 是 | CREATE_SCHEMA_WHEN_NOT_EXIST |
data_save_mode | string | 是 | APPEND_DATA |
index_type | string | 否 | |
primary_keys | list | 否 | |
key_delimiter | string | 否 | _ |
username | string | 否 | |
password | string | 否 | |
max_retry_count | int | 否 | 3 |
max_batch_size | int | 否 | 10 |
tls_verify_certificate | boolean | 否 | true |
tls_verify_hostnames | boolean | 否 | true |
tls_keystore_path | string | 否 | - |
tls_keystore_password | string | 否 | - |
tls_truststore_path | string | 否 | - |
tls_truststore_password | string | 否 | - |
common-options | 否 | - |
hosts [array]
Elasticsearch
集群http地址,格式为 host:port
,允许指定多个主机。例如 ["host1:9200", "host2:9200"]
index [string]
Elasticsearch
的 index
名称。索引支持包含字段名变量,例如 seatunnel_${age}
,并且该字段必须出现在 seatunnel Row 中。如果没有,我们将把它视为普通索引
index_type [string]
Elasticsearch
索引类型,elasticsearch 6及以上版本建议不要指定
primary_keys [list]
主键字段用于生成文档 _id
,这是 CDC 必需的选项。
key_delimiter [string]
设定复合键的分隔符(默认为 _
),例如,如果使用 $
作为分隔符,那么文档的 _id
将呈现为 KEY1$KEY2$KEY3
的格式
username [string]
x-pack 用户名
password [string]
x-pack 密码
max_retry_count [int]
批次批量请求最大尝试大小
max_batch_size [int]
批次批量文档最大大小
tls_verify_certificate [boolean]
为 HTTPS 端点启用证书验证
tls_verify_hostname [boolean]
为 HTTPS 端点启用主机名验证
tls_keystore_path [string]
指向 PEM 或 JKS 密钥存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
tls_keystore_password [string]
指定的密钥存储的密钥密码
tls_truststore_path [string]
指向 PEM 或 JKS 信任存储的路径。运行 SeaTunnel 的操作系统用户必须能够读取此文件
tls_truststore_password [string]
指定的信任存储的密钥密码
common options
Sink插件常用参数,请参考 Sink常用选项 了解详情
schema_save_mode
在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA
:当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST
:当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST
:当表不存在时将抛出错误
data_save_mode
在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案
选项介绍:
DROP_DATA
: 保留数据库结构,删除数据
APPEND_DATA
:保留数据库结构,保留数据
ERROR_WHEN_DATA_EXISTS
:当有数据时抛出错误
示例
简单示例
sink {
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
}
}
变更数据捕获 (Change data capture) 事件
sink {
Elasticsearch {
hosts = ["localhost:9200"]
index = "seatunnel-${age}"
# CDC required options
primary_keys = ["key1", "key2", ...]
}
}
SSL 禁用证书验证
sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_certificate = false
}
}
SSL 禁用主机名验证
sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
tls_verify_hostname = false
}
}
SSL 启用证书验证
通过设置 tls_keystore_path
与 tls_keystore_password
指定证书路径及密码
sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
tls_keystore_password = "${your password}"
}
}
配置表生成策略 (schema_save_mode)
通过设置 schema_save_mode
配置为 CREATE_SCHEMA_WHEN_NOT_EXIST
来支持不存在表时创建表
sink {
Elasticsearch {
hosts = ["https://localhost:9200"]
username = "elastic"
password = "elasticsearch"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}