跳到主要内容
版本:Next

Lance

Lance sink 连接器

支持的引擎

Spark 3.4 及以上版本
Flink 暂不支持
SeaTunnel Zeta

主要特性

描述

Lance sink 用于把 SeaTunnel 数据写入 Lance 数据集。它可以根据上游 SeaTunnel 表结构创建 Lance 表,并按配置的 Lance 写入模式创建或追加数据。

当前连接器支持基于目录的 Lance namespace。

依赖

<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lance-core</artifactId>
<version>0.33.0</version>
</dependency>

<dependency>
<groupId>com.lancedb</groupId>
<artifactId>lance-namespace-core</artifactId>
<version>0.0.14</version>
</dependency>

Sink 配置项

名称类型是否必填默认值说明
dataset_pathstring/test.lanceLance 数据集路径。目录 namespace 下通常是本地数据路径。
namespace_typestringdirLance namespace 类型。当前仅支持 dir
namespace_idstring""Lance namespace ID。
namespace_idslist[]解析目标表 namespace 时使用的 namespace 路径片段。
root_namespace_pathstring/tmpLance namespace 的根路径。
tablestringtest目标 Lance 表名。设置后会覆盖上游表名。
lance.write.max-rows-per-fileint10单个 Lance 文件最多写入的行数。
lance.write.max-rows-per-groupint20单个 Lance row group 最多写入的行数。
lance.write.max-bytes-per-filelong20480单个 Lance 文件最多写入的字节数。
lance.write.modestringCREATELance 写入模式,会传给 Lance WriteParams.WriteMode
lance.write.enable.stable.row.idsbooleantrue写入 Lance 时是否启用稳定 row ID。
lance.write.storage.optionsmap{}传给 Lance 的额外存储参数。
multi_table_sink_replicaint1多表写入时的 sink 并行副本数。

dataset_path

Lance 数据的目录或数据集路径。使用本地目录模式时,请确保 SeaTunnel 运行环境有权限创建并写入该路径。

namespace_type

Lance namespace 类型。当前连接器支持 dir

table

目标 Lance 表名。不设置时,如果上游存在表名,连接器会使用上游表名;该配置本身的默认值是 test

lance.write.mode

控制 Lance 的写入方式。默认值是 CREATE。该值需要是 Lance WriteParams.WriteMode 支持的值。

lance.write.storage.options

以键值对形式传递额外的 Lance 存储参数。

示例:

lance.write.storage.options = {
key1 = "value1"
key2 = "value2"
}

数据类型映射

Lance 使用 Apache Arrow 类型系统。sink 会根据上游 SeaTunnel 表结构创建 Lance schema。

SeaTunnel 数据类型Lance / Arrow 数据类型
BOOLEANbool
TINYINTint32
SMALLINTint32
INTint32
BIGINTint32
FLOATfloat64
DOUBLEfloat64
DECIMALdecimal128
BYTESbinary
DATEdate32
TIMEtime32
TIMESTAMPtimestamp
STRINGutf8
ARRAYlist
MAPmap

任务示例

写入 FakeSource 数据到 Lance

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 100
schema = {
fields {
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
plugin_output = "fake"
}
}

sink {
Lance {
dataset_path = "/tmp/seatunnel_mnt/lanceTest/lance_sink_table"
namespace_type = "dir"
namespace_id = "root"
table = "lance_sink_table"
}
}

更新日志

Change Log
ChangeCommitVersion