Version: 2.3.10


Paimon source connector

Description

Used to read data from Apache Paimon.

Key Features

Options

Name | Type | Required | Default value
warehouse | String | Yes | -
catalog_type | String | No | filesystem
catalog_uri | String | No (required when catalog_type is hive) | -
database | String | Yes | -
table | String | Yes | -
hdfs_site_path | String | No | -
query | String | No | -
paimon.hadoop.conf | Map | No | -
paimon.hadoop.conf-path | String | No | -

warehouse [string]

Paimon warehouse path.

catalog_type [string]

Paimon catalog type; filesystem and hive are supported.

catalog_uri [string]

Paimon catalog uri; only required when catalog_type is hive.

database [string]

The database to read from.

table [string]

The table to read from.

hdfs_site_path [string]

Path to the hdfs-site.xml file.

query [string]

Filter condition for reading the table, for example: select * from st_test where id > 100. If not specified, all records are read.

Currently, where supports <, <=, >, >=, =, !=, or, and, is null, is not null, and between...and; other predicates are not supported yet.

Due to Paimon limitations, Having, Group By, and Order By are not supported yet; projection and limit will be supported in a future release.

Note: when the field after where is a string or boolean, its value must be wrapped in single quotes, otherwise an error is thrown, for example name='abc' or tag='true'.

The field data types currently supported in where are listed below (a combined example follows the list):

  • string
  • boolean
  • tinyint
  • smallint
  • int
  • bigint
  • float
  • double
  • date
  • timestamp
  • time
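
For instance, a minimal sketch of a query that combines several of the supported predicates; the column names are illustrative and not taken from this document:

source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    # Illustrative columns; between...and, is not null and != are all in the supported set above
    query = "select * from st_test where c_int between 100 and 200 and c_string is not null and c_string != 'abc'"
  }
}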

paimon.hadoop.conf [string]

Properties in hadoop conf.

paimon.hadoop.conf-path [string]

The path from which to load the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files.
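
For example, a minimal sketch that loads the Hadoop configuration from a directory instead of inlining properties in paimon.hadoop.conf; the directory path below is hypothetical:

source {
  Paimon {
    warehouse = "hdfs:///tmp/paimon"
    database = "default"
    table = "st_test"
    # Hypothetical directory containing core-site.xml, hdfs-site.xml and hive-site.xml
    paimon.hadoop.conf-path = "/opt/hadoop/conf"
  }
}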

Filesystems

The Paimon connector supports reading data from multiple file systems. Currently, hdfs and s3 are supported. When using the s3 file system, the fs.s3a.access-key, fs.s3a.secret-key, fs.s3a.endpoint, fs.s3a.path.style.access, and fs.s3a.aws.credentials.provider properties can be configured in paimon.hadoop.conf, and the warehouse path should start with s3a://.

Examples

Simple example

source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "default"
    table = "st_test"
  }
}

Filter example

source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    query = "select c_boolean, c_tinyint from st_test where c_boolean= 'true' and c_tinyint > 116 and c_smallint = 15987 or c_decimal='2924137191386439303744.39292213'"
  }
}

S3 example

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Paimon {
    warehouse = "s3a://test/"
    database = "seatunnel_namespace11"
    table = "st_test"
    paimon.hadoop.conf = {
      fs.s3a.access-key=G52pnxg67819khOZ9ezX
      fs.s3a.secret-key=SHJuAQqHsLrgZWikvMa3lJf5T0NfM5LMFliJh9HF
      fs.s3a.endpoint="http://minio4:9000"
      fs.s3a.path.style.access=true
      fs.s3a.aws.credentials.provider=org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider
    }
  }
}

sink {
  Console {}
}

Hadoop conf example

source {
  Paimon {
    catalog_name="seatunnel_test"
    warehouse="hdfs:///tmp/paimon"
    database="seatunnel_namespace1"
    table="st_test"
    query = "select * from st_test where pk_id is not null and pk_id < 3"
    paimon.hadoop.conf = {
      hadoop_user_name = "hdfs"
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}

Hive catalog example

source {
  Paimon {
    catalog_name="seatunnel_test"
    catalog_type="hive"
    catalog_uri="thrift://hadoop04:9083"
    warehouse="hdfs:///tmp/seatunnel"
    database="seatunnel_test"
    table="st_test3"
    paimon.hadoop.conf = {
      fs.defaultFS = "hdfs://nameservice1"
      dfs.nameservices = "nameservice1"
      dfs.ha.namenodes.nameservice1 = "nn1,nn2"
      dfs.namenode.rpc-address.nameservice1.nn1 = "hadoop03:8020"
      dfs.namenode.rpc-address.nameservice1.nn2 = "hadoop04:8020"
      dfs.client.failover.proxy.provider.nameservice1 = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
      dfs.client.use.datanode.hostname = "true"
    }
  }
}

Changelog

To read the changelog of a Paimon table, first set changelog-producer on the Paimon source table, then read it with a SeaTunnel streaming job.

Note

Currently, batch reads always read the latest snapshot. To read more complete changelog data, use streaming read, and start the streaming read before data is written to the Paimon table. To guarantee ordering, the parallelism of the streaming read job should be set to 1.
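
For reference, a minimal sketch of how changelog-producer could be set when the table is written by the SeaTunnel Paimon sink; it assumes the sink's paimon.table.write-props option is used to pass the Paimon table property, and 'input' is just one of Paimon's changelog-producer modes:

sink {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
    paimon.table.primary-keys = "c_tinyint"
    # Paimon table property; makes the table keep a changelog that streaming reads can consume
    paimon.table.write-props = {
      changelog-producer = "input"
    }
  }
}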

Streaming read example

env {
  parallelism = 1
  job.mode = "Streaming"
}

source {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test"
  }
}

sink {
  Paimon {
    warehouse = "/tmp/paimon"
    database = "full_type"
    table = "st_test_sink"
    paimon.table.primary-keys = "c_tinyint"
  }
}

Changelog

Change | Commit | Version
[Feature][Connector-V2] Support between predicate pushdown in paimon (#8962) | https://github.com/apache/seatunnel/commit/3b141cf62 | 2.3.10
[Feature][Connector-V2] Suppor Time type in paimon connector (#8880) | https://github.com/apache/seatunnel/commit/9f1e59009 | 2.3.10
[Feature][Paimon] Customize the hadoop user (#8888) | https://github.com/apache/seatunnel/commit/2657626f9 | 2.3.10
[Improve][Connector-v2][Paimon]PaimonCatalog close error message update (#8640) | https://github.com/apache/seatunnel/commit/48253da8d | 2.3.10
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10
[Improve][Connector-v2] Support checkpoint in batch mode for paimon sink (#8333) | https://github.com/apache/seatunnel/commit/f22d4ebd4 | 2.3.9
[Feature][Connector-v2] Support schema evolution for paimon sink (#8211) | https://github.com/apache/seatunnel/commit/57190e2a3 | 2.3.9
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9
[Feature][Connector-v2] Support S3 filesystem of paimon connector (#8036) | https://github.com/apache/seatunnel/commit/e2a477293 | 2.3.9
[Feature][transform] transform support explode (#7928) | https://github.com/apache/seatunnel/commit/132278c06 | 2.3.9
[Feature][Connector-V2] Piamon Sink supports changelog-procuder is lookup and full-compaction mode (#7834) | https://github.com/apache/seatunnel/commit/c0f27c2f7 | 2.3.9
[Fix][connector-v2]Fix Paimon table connector Error log information. (#7873) | https://github.com/apache/seatunnel/commit/a3b49e635 | 2.3.9
[Improve][Connector-v2] Use checkpointId as the commit's identifier instead of the hash for streaming write of paimon sink (#7835) | https://github.com/apache/seatunnel/commit/c7a384af2 | 2.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9
[Fix][Connecotr-V2] Fix paimon dynamic bucket tale in primary key is not first (#7728) | https://github.com/apache/seatunnel/commit/dc7f69553 | 2.3.8
[Improve][Connector-v2] Remove useless code and add changelog doc for paimon sink (#7748) | https://github.com/apache/seatunnel/commit/846d876dc | 2.3.8
[Hotfix][Connector-V2] Release resources even the task is crashed for paimon sink (#7726) | https://github.com/apache/seatunnel/commit/5ddf8d461 | 2.3.8
[Fix][Connector-V2] Fix paimon e2e error (#7721) | https://github.com/apache/seatunnel/commit/61d196436 | 2.3.8
[Feature][Connector-Paimon] Support dynamic bucket splitting improves Paimon writing efficiency (#7335) | https://github.com/apache/seatunnel/commit/bc0326cba | 2.3.8
[Feature][Connector-v2] Support streaming read for paimon (#7681) | https://github.com/apache/seatunnel/commit/4a2e27291 | 2.3.8
[Hotfix][Seatunnel-common] Fix the CommonError msg for paimon sink (#7591) | https://github.com/apache/seatunnel/commit/d1f5db925 | 2.3.8
[Feature][CONNECTORS-V2-Paimon] Paimon Sink supported truncate table (#7560) | https://github.com/apache/seatunnel/commit/4f3df2212 | 2.3.8
[Improve][Connector-v2] Improve the exception msg in case-sensitive case for paimon sink (#7549) | https://github.com/apache/seatunnel/commit/7d31e5668 | 2.3.8
[Hotfix][Connector-V2] Fixed lost data precision for decimal data types (#7527) | https://github.com/apache/seatunnel/commit/df210ea73 | 2.3.8
[Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a | 2.3.8
[Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7
The isNullable attribute is true when the primary key field in the Paimon table converts the Column object. #7231 (#7242) | https://github.com/apache/seatunnel/commit/b0fe432e9 | 2.3.6
[Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6
[Paimon]support projection for paimon source (#6343) | https://github.com/apache/seatunnel/commit/6c1577267 | 2.3.6
[Improve][Paimon] Add check for the base type between source and sink before write. (#6953) | https://github.com/apache/seatunnel/commit/d56d64fc0 | 2.3.6
[Improve][Connector-V2] Improve the paimon source (#6887) | https://github.com/apache/seatunnel/commit/658643ae5 | 2.3.6
[Hotfix][Connector-V2] Close the tableWrite when task is close (#6897) | https://github.com/apache/seatunnel/commit/23a744b9b | 2.3.6
[Fix][Connector-V2] Field information lost during Paimon DataType and SeaTunnel Column conversion (#6767) | https://github.com/apache/seatunnel/commit/6cf6e41da | 2.3.6
[Improve][Connector-V2] Support hive catalog for paimon sink (#6833) | https://github.com/apache/seatunnel/commit/4969c91dc | 2.3.6
[Hotfix][Connector-V2] Fix the batch write with paimon (#6865) | https://github.com/apache/seatunnel/commit/9ec971d94 | 2.3.6
[Feature][Doris] Add Doris type converter (#6354) | https://github.com/apache/seatunnel/commit/518999184 | 2.3.6
[Improve][Connector-V2] Support hadoop ha and kerberos for paimon sink (#6585) | https://github.com/apache/seatunnel/commit/20b62f3bf | 2.3.5
[Feature][Paimon] Support specify paimon table write properties, partition keys and primary keys (#6535) | https://github.com/apache/seatunnel/commit/2b1234c7a | 2.3.5
[Feature][Connector-V2] Support multi-table sink feature for paimon #5652 (#6449) | https://github.com/apache/seatunnel/commit/b0abbd2d8 | 2.3.5
[Feature][Connectors-v2-Paimon] Adaptation Paimon 0.6 Version (#6061) | https://github.com/apache/seatunnel/commit/b32df930e | 2.3.4
[Fix][Connectors-v2-Paimon] Flink table store failed to prepare commit (#6057) | https://github.com/apache/seatunnel/commit/c8dcefc3b | 2.3.4
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4
[Hotfix][Connector-V2][Paimon] Bump paimon-bundle version to 0.4.0-incubating (#5219) | https://github.com/apache/seatunnel/commit/2917542bf | 2.3.3
[Improve] Documentation and partial word optimization. (#4936) | https://github.com/apache/seatunnel/commit/6e8de0e2a | 2.3.3
[Connector-V2][Paimon] Introduce paimon connector (#4178) | https://github.com/apache/seatunnel/commit/da507bbe0 | 2.3.2