# Jdbc

JDBC sink connector
## Description

Write data through JDBC.
## Engine Supported and plugin name
- Spark: Jdbc
- Flink: Jdbc
## Options

### Spark
name | type | required | default value |
---|---|---|---|
driver | string | yes | - |
url | string | yes | - |
user | string | yes | - |
password | string | yes | - |
dbTable | string | yes | - |
saveMode | string | no | update |
useSsl | string | no | false |
isolationLevel | string | no | READ_UNCOMMITTED |
customUpdateStmt | string | no | - |
duplicateIncs | string | no | - |
showSql | string | no | true |
### url [string]

The URL of the JDBC connection, e.g. `jdbc:mysql://localhost/dbName`.
### user [string]

Username.
### password [string]

User password.
### dbTable [string]

Sink table name; if the table does not exist, it will be created.
### saveMode [string]

Storage mode. Besides the basic modes, the extra mode `update` overwrites data in a specified way when an insert hits a key conflict. The basic modes currently supported are `overwrite`, `append`, `ignore` and `error`; for the specific meaning of each mode, see save-modes.
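For example, a minimal sketch of a Spark sink using the basic `append` mode; the connection values below are placeholders for illustration, not from the original document:

```
jdbc {
    driver = "com.mysql.cj.jdbc.Driver",
    saveMode = "append",                       # basic mode: append rows to the existing table
    url = "jdbc:mysql://localhost:3306/mydb",  # placeholder connection URL
    user = "userName",
    password = "***********",
    dbTable = "tableName"
}
```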
### useSsl [string]

Configure when `saveMode` is specified as `update`: whether to enable SSL. The default value is `false`.
### isolationLevel [string]

The transaction isolation level, which applies to the current connection. The default value is `READ_UNCOMMITTED`.
### customUpdateStmt [string]

Configure when `saveMode` is specified as `update`: the update statement template to use on key conflicts. If `customUpdateStmt` is empty, the SQL is auto-generated for all columns; otherwise the given SQL is used, following the MySQL `INSERT INTO table (...) VALUES (...) ON DUPLICATE KEY UPDATE ...` syntax, with placeholders or fixed values in the `VALUES` clause.

Tips: the table name in the SQL should be consistent with `dbTable`.
### duplicateIncs [string]

Configure when `saveMode` is specified as `update`: when the specified key conflicts, the value is updated to the existing value plus the incoming value.
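Putting the update-related options together, a hedged sketch might look like the following; the column list, the assumption that `duplicateIncs` takes a comma-separated list of column names, and all connection values are illustrative, not from the original document:

```
jdbc {
    driver = "com.mysql.cj.jdbc.Driver",
    saveMode = "update",
    useSsl = "false",
    isolationLevel = "READ_UNCOMMITTED",
    # assumed format: comma-separated columns whose existing values are incremented on key conflict
    duplicateIncs = "clickCount,orderCount",
    url = "jdbc:mysql://localhost:3306/mydb",  # placeholder connection URL
    user = "userName",
    password = "***********",
    dbTable = "tableName"
}
```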
### showSql [string]

Configure when `saveMode` is specified as `update`: whether to show the generated SQL.
### Flink

name | type | required | default value |
---|---|---|---|
driver | string | yes | - |
url | string | yes | - |
username | string | yes | - |
password | string | no | - |
query | string | yes | - |
batch_size | int | no | - |
source_table_name | string | yes | - |
common-options | string | no | - |
parallelism | int | no | - |
pre_sql | string | no | - |
post_sql | string | no | - |
ignore_post_sql_exceptions | boolean | no | - |
### driver [string]

Driver name, such as `com.mysql.cj.jdbc.Driver` for MySQL.

Warning: for license compliance, you have to provide the MySQL JDBC driver yourself, e.g. copy `mysql-connector-java-xxx.jar` to `$FLINK_HOME/lib` for Standalone mode.
### url [string]

The URL of the JDBC connection, such as `jdbc:mysql://localhost:3306/test`.
### username [string]

Username.
### password [string]

Password.
### query [string]

Insert statement.
### batch_size [int]

Number of writes per batch.
### parallelism [int]

The parallelism of an individual operator, for the JdbcSink.
### pre_sql [string]

SQL executed before the output starts.
### post_sql [string]

SQL executed after the output finishes; only supported for batch jobs.
### ignore_post_sql_exceptions [boolean]

Whether to ignore exceptions thrown by `post_sql`.
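A hedged sketch of a Flink sink combining these options; the target table, the pre/post statements and the batch size below are illustrative assumptions, not from the original document:

```
JdbcSink {
    source_table_name = fake
    driver = com.mysql.cj.jdbc.Driver
    url = "jdbc:mysql://localhost/test"
    username = root
    query = "insert into test(name,age) values(?,?)"
    batch_size = 100
    # illustrative: empty the target table before writing
    pre_sql = "truncate table test"
    # illustrative: rebuild an index after writing (batch jobs only)
    post_sql = "create index idx_test_name on test(name)"
    # don't fail the job if the post_sql statement throws
    ignore_post_sql_exceptions = true
}
```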
### common options [string]

Sink plugin common parameters, please refer to Sink Plugin for details.
## Examples

### Spark
```
jdbc {
    driver = "com.mysql.cj.jdbc.Driver",
    saveMode = "update",
    url = "jdbc:mysql://ip:3306/database",
    user = "userName",
    password = "***********",
    dbTable = "tableName",
    customUpdateStmt = "INSERT INTO table (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
}
```

Insert data through JDBC
```
jdbc {
    driver = "com.mysql.cj.jdbc.Driver",
    saveMode = "update",
    truncate = "true",
    url = "jdbc:mysql://ip:3306/database",
    user = "userName",
    password = "***********",
    dbTable = "tableName",
    customUpdateStmt = "INSERT INTO tableName (column1, column2, created, modified, yn) values(?, ?, now(), now(), 1) ON DUPLICATE KEY UPDATE column1 = IFNULL(VALUES (column1), column1), column2 = IFNULL(VALUES (column2), column2)"
    jdbc.connect_timeout = 10000
    jdbc.socket_timeout = 10000
}
```

Timeout config
### Flink

```
JdbcSink {
    source_table_name = fake
    driver = com.mysql.cj.jdbc.Driver
    url = "jdbc:mysql://localhost/test"
    username = root
    query = "insert into test(name,age) values(?,?)"
    batch_size = 2
}
```