跳到主要内容
版本:Next

Typesense

描述

将 SeaTunnel 数据写入 Typesense collection。该 connector 可以按配置创建目标 collection、 写入前清理已有文档,也可以用一个或多个主键字段生成 Typesense 文档 id

主要特性

选项

名称类型是否必须默认值
hostsarray-
collectionstring-
schema_save_modestringCREATE_SCHEMA_WHEN_NOT_EXIST
data_save_modestringAPPEND_DATA
primary_keysarray
key_delimiterstring_
api_keystring-
max_retry_countint3
max_batch_sizeint10
common-options-

hosts [array]

Typesense 的访问地址,格式为 host:port,例如:["typesense-01:8108"]

collection [string]

要写入的 collection 名,例如:seatunnel

primary_keys [array]

主键字段用于生成文档 id。配置多个字段时,connector 会用 key_delimiter 拼接这些字段值。

key_delimiter [string]

设定复合键的分隔符(默认为 _)。

api_key [string]

Typesense 安全认证的 api_key

max_retry_count [int]

单个批量请求的最大重试次数。

max_batch_size [int]

每批最多写入的文档数量。

common options

Sink 插件常用参数,请参考 Sink 常用选项 了解详情。

schema_save_mode

在启动同步任务之前,针对目标侧已有的表结构选择不同的处理方案
选项介绍:
RECREATE_SCHEMA :当表不存在时会创建,当表已存在时会删除并重建
CREATE_SCHEMA_WHEN_NOT_EXIST :当表不存在时会创建,当表已存在时则跳过创建
ERROR_WHEN_SCHEMA_NOT_EXIST :当表不存在时将抛出错误

data_save_mode

在启动同步任务之前,针对目标侧已存在的数据选择不同的处理方案
选项介绍:
DROP_DATA: 保留数据库结构,删除数据
APPEND_DATA:保留数据库结构,保留数据
ERROR_WHEN_DATA_EXISTS:当有数据时抛出错误

任务示例

使用主键写入文档

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 5
plugin_output = "typesense_test_table"
schema {
fields {
company_name = string
num = long
id = string
num_employees = int
flag = boolean
}
}
}
}

sink {
Typesense {
plugin_input = "typesense_test_table"
hosts = ["localhost:8108"]
collection = "typesense_test_collection"
api_key = "xyz"
primary_keys = ["num_employees", "num"]
key_delimiter = "="
max_retry_count = 3
max_batch_size = 10
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}

从 Typesense 读取并写入另一个 collection

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Typesense {
hosts = ["localhost:8108"]
collection = "typesense_source_collection"
api_key = "xyz"
query = "q=*&filter_by=c_row.c_int:>10"
plugin_output = "typesense_test_table"
schema = {
fields {
company_name_list = array<string>
company_name = string
num_employees = long
country = string
id = string
c_row = {
c_int = int
c_string = string
c_array_int = array<int>
}
}
}
}
}

sink {
Typesense {
plugin_input = "typesense_test_table"
hosts = ["localhost:8108"]
collection = "typesense_sink_collection"
api_key = "xyz"
primary_keys = ["num_employees", "id"]
key_delimiter = "="
max_retry_count = 3
max_batch_size = 10
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode = "APPEND_DATA"
}
}

变更日志