跳到主要内容
版本:Next

SQLServer

JDBC SQLServer Sink 连接器

支持的 SQL Server 版本

  • server:2008(或更高版本,仅供参考)

支持的引擎

Spark
Flink
SeaTunnel Zeta

描述

通过 JDBC 写入数据。支持批处理和流处理模式,支持并发写入,支持精确一次语义(使用 XA 事务保证)。

使用依赖

  1. 需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/plugins/ 中。

对于 SeaTunnel Zeta 引擎

  1. 需要确保 jdbc 驱动 jar 包 已放置在目录 ${SEATUNNEL_HOME}/lib/ 中。

主要特性

使用 Xa 事务 来保证 精确一次。因此仅支持支持 Xa 事务 的数据库。可以通过设置 is_exactly_once=true 来启用。

支持的数据源信息

数据源支持的版本驱动类名URL 格式Maven 依赖
SQL Server支持版本 >= 2008com.microsoft.sqlserver.jdbc.SQLServerDriverjdbc:sqlserver://localhost:1433下载

数据库依赖

请下载支持列表中对应的 'Maven' 依赖,并将其复制到 $SEATUNNEL_HOME/plugins/jdbc/lib/ 工作目录中。
例如 SQL Server 数据源:cp mssql-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/

数据类型映射

SQL Server 数据类型SeaTunnel 数据类型
BITBOOLEAN
TINYINT
SMALLINT
SHORT
INTEGERINT
BIGINTLONG
DECIMAL
NUMERIC
MONEY
SMALLMONEY
DECIMAL((获取指定列的列大小)+1,
(获取指定列的小数点右侧的位数)))
REALFLOAT
FLOATDOUBLE
CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT
STRING
DATELOCAL_DATE
TIMELOCAL_TIME
DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET
LOCAL_DATE_TIME
TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN
尚未支持

接收器选项

名称类型是否必填默认值描述
urlString-JDBC 连接的 URL。参考示例:jdbc:sqlserver://localhost:1433;databaseName=mydatabase
driverString-用于连接远程数据源的 JDBC 类名,如果使用 SQL Server,值为 com.microsoft.sqlserver.jdbc.SQLServerDriver
userString-连接实例的用户名
passwordString-连接实例的密码
queryString-使用此 SQL 将上游输入数据写入数据库。例如 INSERT ...query 优先级更高。
databaseString-使用此 databasetable-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,且优先级更高。
tableString-使用 database 和此 table-name 自动生成 SQL 并接收上游输入数据写入数据库。此选项与 query 互斥,且优先级更高。
primary_keysArray-此选项用于在自动生成 SQL 时支持 insertdeleteupdate 等操作。
support_upsert_by_query_primary_key_existBooleanfalse选择使用 INSERT SQL、UPDATE SQL 来处理更新事件(INSERT, UPDATE_AFTER),基于查询主键是否存在。此配置仅在数据库不支持 upsert 语法时使用。注意:此方法性能较低。
connection_check_timeout_secInt30用于验证连接完成的数据库操作的等待时间(秒)。
max_retriesInt0提交失败(executeBatch)的重试次数。
batch_sizeInt1000对于批量写入,当缓冲的记录数达到 batch_size 或时间达到 checkpoint.interval 时,数据将被刷新到数据库中。
is_exactly_onceBooleanfalse是否启用精确一次语义,将使用 Xa 事务。如果启用,需要设置 xa_data_source_class_name
generate_sink_sqlBooleanfalse根据要写入的数据库表生成 SQL 语句。
xa_data_source_class_nameString-数据库驱动的 XA 数据源类名,例如 SQL Server 为 com.microsoft.sqlserver.jdbc.SQLServerXADataSource,其他数据源请参考附录。
max_commit_attemptsInt3事务提交失败的重试次数。
transaction_timeout_secInt-1事务打开后的超时时间,默认为 -1(永不超时)。注意:设置超时可能会影响精确一次语义。
auto_commitBooleantrue默认启用自动事务提交。
common-options-接收器插件通用参数,详情请参考 Sink Common Options
enable_upsertBooleantrue通过主键存在启用 upsert。如果任务中没有键重复数据,将此参数设置为 false 可以加快数据导入速度。

提示

如果未设置 partition_column,将以单并发运行;如果设置了 partition_column,将根据任务的并发度并行执行。

任务示例

简单示例:

这是一个读取 SQL Server 数据并直接插入到另一个表的示例

env {
# 可以在此设置引擎配置
parallelism = 10
}

source {
# 这是一个示例源插件,**仅用于测试和演示功能**
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "select * from column_type_test.dbo.full_types_jdbc"
# 并行分片读取字段
partition_column = "id"
# 分片数量
partition_num = 10
}
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的源插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
}

transform {
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
}
# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的接收器插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
}

CDC(变更数据捕获)事件

我们也支持 CDC 变更数据。在这种情况下,需要配置 databasetableprimary_keys

Jdbc {
plugin_input = "customers"
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
generate_sink_sql = true
database = "column_type_test"
table = "dbo.full_types_sink"
batch_size = 100
primary_keys = ["id"]
}

精确一次接收器

事务性写入可能较慢,但数据更准确

Jdbc {
driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
user = SA
password = "Y.sa123456"
query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )"
is_exactly_once = "true"
xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource"
}

# 如果想了解更多关于如何配置 SeaTunnel 的信息,并查看完整的接收器插件列表,
# 请访问 https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc