跳到主要内容
版本:Next

PostgreSql

JDBC PostgreSql 数据接收器

支持的引擎

Spark
Flink
SeaTunnel Zeta

描述

通过 JDBC 写入数据。支持批处理模式和流式模式,支持并发写入,支持精确一次语义(使用 XA 事务保证)。

使用依赖

  1. 您需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/plugins/ 中。

对于 SeaTunnel Zeta 引擎

  1. 您需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/lib/ 中。

主要特性

使用 XA 事务 来确保 精确一次。因此,仅对支持 XA 事务 的数据库支持 精确一次。您可以设置 is_exactly_once=true 来启用此功能。

支持的数据源信息

数据源支持的版本驱动URLMaven
PostgreSQL不同的依赖版本有不同的驱动类。org.postgresql.Driverjdbc:postgresql://localhost:5432/test下载
PostgreSQL如果您想在 PostgreSQL 中处理 GEOMETRY 类型。org.postgresql.Driverjdbc:postgresql://localhost:5432/test下载

数据库依赖

请下载与 'Maven' 对应的支持列表,并将其复制到 '$SEATUNNEL_HOME/plugins/jdbc/lib/' 工作目录中。
例如 PostgreSQL 数据源:cp postgresql-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/
如果您想在 PostgreSQL 中处理 GEOMETRY 类型,请将 postgresql-xxx.jarpostgis-jdbc-xxx.jar 添加到 $SEATUNNEL_HOME/plugins/jdbc/lib/ 中。

数据类型映射

PostgreSQL 数据类型SeaTunnel 数据类型
BOOL
BOOLEAN
_BOOL
ARRAY<BOOLEAN>
BYTEA
BYTES
_BYTEA
ARRAY<TINYINT>
INT2
SMALLSERIAL
INT4
SERIAL
INT
_INT2
_INT4
ARRAY<INT>
INT8
BIGSERIAL
BIGINT
_INT8
ARRAY<BIGINT>
FLOAT4
FLOAT
_FLOAT4
ARRAY<FLOAT>
FLOAT8
DOUBLE
_FLOAT8
ARRAY<DOUBLE>
NUMERIC(指定列的列大小>0)DECIMAL(指定列的列大小,获取指定列小数点右侧的数字位数)
NUMERIC(指定列的列大小<0)DECIMAL(38, 18)
BPCHAR
CHARACTER
VARCHAR
TEXT
GEOMETRY
GEOGRAPHY
JSON
JSONB
UUID
STRING
_BPCHAR
_CHARACTER
_VARCHAR
_TEXT
ARRAY<STRING>
TIMESTAMP
TIMESTAMP
TIME
TIME
DATE
DATE
其他数据类型目前不支持

选项

名称类型必填默认描述
urlString-JDBC 连接的 URL。参见示例:jdbc:postgresql://localhost:5432/test
如果您使用 json 或 jsonb 类型插入,请添加 jdbc url 字符串 stringtype=unspecified 选项。
driverString-用于连接远程数据源的 JDBC 类名,
如果使用 PostgreSQL,则该值为 org.postgresql.Driver
userString-连接实例的用户名。
passwordString-连接实例的密码。
queryString-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 的优先级更高。
databaseString-使用此 databasetable-name 自动生成 SQL,并接收上游输入数据写入数据库。
此选项与 query 互斥,并具有更高的优先级。
tableString-使用数据库和此表名自动生成 SQL,并接收上游输入数据写入数据库。
此选项与 query 互斥,并具有更高的优先级。表参数可以填写一个不想的表的名称,最终将作为创建表的表名,并支持变量(${table_name}${schema_name})。替换规则: ${schema_name} 将替换为传递给目标端的 SCHEMA 名称,${table_name} 将替换为传递给目标端的表名称。
primary_keysArray-此选项用于支持在自动生成 SQL 时进行 insertdeleteupdate 操作。
support_upsert_by_query_primary_key_existBooleanfalse选择使用 INSERT SQL,UPDATE SQL 根据查询主键存在来处理更新事件(INSERT,UPDATE_AFTER)。此配置仅在数据库不支持 upsert 语法时使用。注意:此方法性能较低。
connection_check_timeout_secInt30用于验证连接的数据库操作完成的等待时间(秒)。
max_retriesInt0提交失败的重试次数(executeBatch)。
batch_sizeInt1000对于批量写入,当缓冲记录的数量达到 batch_size 或时间达到 checkpoint.interval
时,数据将刷新到数据库。
is_exactly_onceBooleanfalse是否启用精确一次语义,将使用 XA 事务。如果启用,您需要
设置 xa_data_source_class_name
generate_sink_sqlBooleanfalse根据要写入的数据库表生成 SQL 语句。
xa_data_source_class_nameString-数据库驱动的 XA 数据源类名,例如,PostgreSQL 是 org.postgresql.xa.PGXADataSource,并
请参阅附录以获取其他数据源。
max_commit_attemptsInt3事务提交失败的重试次数。
transaction_timeout_secInt-1事务开启后的超时时间,默认值为 -1(永不超时)。注意设置超时可能会影响
精确一次语义。
auto_commitBooleantrue默认启用自动事务提交。
field_ideString-识别字段在从源到汇的同步时是否需要转换。ORIGINAL 表示无需转换;UPPERCASE 表示转换为大写;LOWERCASE 表示转换为小写。
propertiesMap-附加连接配置参数,当 properties 和 URL 具有相同参数时,优先级由
驱动的具体实现决定。例如,在 MySQL 中,properties 优先于 URL。
common-options-Sink 插件的公共参数,请参阅 Sink 公共选项 以获取详细信息。
schema_save_modeEnumCREATE_SCHEMA_WHEN_NOT_EXIST在同步任务开启之前,根据目标端现有表结构选择不同处理方案。
data_save_modeEnumAPPEND_DATA在同步任务开启之前,根据目标端现有数据选择不同处理方案。
custom_sqlString-data_save_mode 选择 CUSTOM_PROCESSING 时,您应该填写 CUSTOM_SQL 参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。
enable_upsertBooleantrue通过主键存在启用 upsert,如果任务没有重复数据,设置此参数为 false 可以加快数据导入。

table [字符串]

使用 database 和此 table-name 自动生成 SQL,并接收上游输入数据写入数据库。

此选项与 query 互斥,并具有更高的优先级。

表参数可以填写一个不想的表的名称,最终将作为创建表的表名,并支持变量(${table_name}${schema_name})。替换规则:${schema_name} 将替换为传递给目标端的 SCHEMA 名称,${table_name} 将替换为传递给目标端的表名称。

例如:

  1. ${schema_name}.${table_name} _test
  2. dbo.tt_${table_name} _sink
  3. public.sink_table

schema_save_mode [枚举]

在同步任务开启之前,根据目标端现有表结构选择不同处理方案。
选项介绍:
RECREATE_SCHEMA :当表不存在时将创建,保存时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST :当表不存在时创建,保存时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :当表不存在时报告错误。
IGNORE :忽略对表的处理。

data_save_mode [枚举]

在同步任务开启之前,根据目标端现有数据选择不同处理方案。
选项介绍:
DROP_DATA:保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户定义处理。
ERROR_WHEN_DATA_EXISTS:当存在数据时报告错误。

custom_sql [字符串]

data_save_mode 选择 CUSTOM_PROCESSING 时,您应该填写 CUSTOM_SQL 参数。此参数通常填入可以执行的 SQL。SQL 将在同步任务之前执行。

提示

如果未设置 partition_column,它将以单线程并发运行;如果设置了 partition_column,它将根据任务的并发性并行执行。

任务示例

简单示例:

此示例定义了一个 SeaTunnel 同步任务,通过 FakeSource 自动生成数据并将其发送到 JDBC Sink。FakeSource 生成总共 16 行数据(row.num=16),每行有两个字段,name(字符串类型)和 age(整数类型)。最终目标表 test_table 也将包含 16 行数据。在运行此作业之前,您需要在 PostgreSQL 中创建数据库 test 和表 test_table。如果您还未安装和部署 SeaTunnel,请按照 安装 SeaTunnel 中的说明进行安装和部署。然后按照 快速开始 SeaTunnel 引擎 中的说明运行此作业。

# Defining the runtime environment
env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source
}

transform {
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform-v2
}

sink {
jdbc {
# if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = root
password = 123456
query = "insert into test_table(name,age) values(?,?)"
}
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
}

生成 Sink SQL

此示例不需要编写复杂的 SQL 语句,您可以配置数据库名称和表名称,系统将自动为您生成添加语句。

sink {
Jdbc {
# if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
url = "jdbc:postgresql://localhost:5432/test"
driver = org.postgresql.Driver
user = root
password = 123456

generate_sink_sql = true
database = test
table = "public.test_table"
}
}

精确一次:

对于精确写入场景,我们保证精确一次。

sink {
jdbc {
# if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"

max_retries = 0
user = root
password = 123456
query = "insert into test_table(name,age) values(?,?)"

is_exactly_once = "true"

xa_data_source_class_name = "org.postgresql.xa.PGXADataSource"
}
}

CDC(变更数据捕获)事件

我们也支持 CDC 变更数据。在这种情况下,您需要配置数据库、表和主键。

sink {
jdbc {
# if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
url = "jdbc:postgresql://localhost:5432/test"
driver = "org.postgresql.Driver"
user = root
password = 123456

generate_sink_sql = true
# You need to configure both database and table
database = test
table = sink_table
primary_keys = ["id","name"]
field_ide = UPPERCASE
}
}

保存模式功能

sink {
Jdbc {
# if you would use json or jsonb type insert please add jdbc url stringtype=unspecified option
url = "jdbc:postgresql://localhost:5432/test"
driver = org.postgresql.Driver
user = root
password = 123456

generate_sink_sql = true
database = test
table = "public.test_table"
schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
data_save_mode="APPEND_DATA"
}
}