SQL Server CDC
SQL Server CDC source connector
Supported SQL Server Versions
- server:2019 (or later versions, for information only)
Supported Engines
SeaTunnel Zeta
Flink
Key Features
Description
The SQL Server CDC connector reads snapshot data and incremental data from a SQL Server database. This document describes how to set up the SQL Server CDC connector to run SQL queries against SQL Server databases.
Supported DataSource Info
Datasource | Supported versions | Driver | Url | Maven |
---|---|---|---|---|
SqlServer | server:2019 (or later versions, for information only) | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433;databaseName=column_type_test | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc |
Using Dependency
Install JDBC Driver
For Spark/Flink Engine
- You need to ensure that the JDBC driver jar package has been placed in the directory ${SEATUNNEL_HOME}/plugins/.
For SeaTunnel Zeta Engine
- You need to ensure that the JDBC driver jar package has been placed in the directory ${SEATUNNEL_HOME}/lib/.
Data Type Mapping
SQL Server Data Type | SeaTunnel Data Type |
---|---|
CHAR VARCHAR NCHAR NVARCHAR TEXT NTEXT XML | STRING |
BINARY VARBINARY IMAGE | BYTES |
INTEGER INT | INT |
SMALLINT TINYINT | SMALLINT |
BIGINT | BIGINT |
FLOAT(1~24) REAL | FLOAT |
DOUBLE FLOAT(>24) | DOUBLE |
NUMERIC(p,s) DECIMAL(p,s) MONEY SMALLMONEY | DECIMAL(p, s) |
TIMESTAMP | BYTES |
DATE | DATE |
TIME(s) | TIME(s) |
DATETIME(s) DATETIME2(s) DATETIMEOFFSET(s) SMALLDATETIME | TIMESTAMP(s) |
BOOLEAN BIT | BOOLEAN |
Source Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
username | String | Yes | - | Name of the database user to use when connecting to the database server. |
password | String | Yes | - | Password to use when connecting to the database server. |
database-names | List | Yes | - | Names of the databases to monitor. |
table-names | List | Yes | - | Names of the tables to monitor. Each name combines the database name, schema name, and table name (databaseName.schemaName.tableName). |
table-names-config | List | No | - | Table config list. For example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}] |
base-url | String | Yes | - | The JDBC URL, which must include the database, for example "jdbc:sqlserver://localhost:1433;databaseName=test". |
startup.mode | Enum | No | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". |
startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds). Note: this option is required when the "startup.mode" option is set to 'timestamp'. |
startup.specific-offset.file | String | No | - | Start from the specified binlog file name. Note: this option is required when the "startup.mode" option is set to 'specific'. |
startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position. Note: this option is required when the "startup.mode" option is set to 'specific'. |
stop.mode | Enum | No | NEVER | Optional stop mode for SqlServer CDC consumer, valid enumerations are "never". |
stop.timestamp | Long | No | - | Stop at the specified epoch timestamp (in milliseconds). Note: this option is required when the "stop.mode" option is set to 'timestamp'. |
stop.specific-offset.file | String | No | - | Stop at the specified binlog file name. Note: this option is required when the "stop.mode" option is set to 'specific'. |
stop.specific-offset.pos | Long | No | - | Stop at the specified binlog file position. Note: this option is required when the "stop.mode" option is set to 'specific'. |
incremental.parallelism | Integer | No | 1 | The number of parallel readers in the incremental phase. |
snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of a table snapshot. Captured tables are divided into multiple splits when the table snapshot is read. |
snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size per poll when reading a table snapshot. |
server-time-zone | String | No | UTC | The session time zone of the database server. |
connect.timeout | Duration | No | 30s | The maximum time that the connector should wait after trying to connect to the database server before timing out. |
connect.max-retries | Integer | No | 3 | The maximum number of times the connector retries building a connection to the database server. |
connection.pool.size | Integer | No | 20 | The connection pool size. |
chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor, which is calculated as (MAX(id) - MIN(id) + 1) / row count and is used to determine whether the table data is evenly distributed. If the distribution factor is less than or equal to this upper bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0. |
chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor, which is calculated as (MAX(id) - MIN(id) + 1) / row count and is used to determine whether the table data is evenly distributed. If the distribution factor is greater than or equal to this lower bound, the table chunks are optimized for even distribution. Otherwise, the table is considered unevenly distributed, and the sampling-based sharding strategy is used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05. |
sample-sharding.threshold | int | No | 1000 | The threshold of the estimated shard count that triggers the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-bound and chunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy is used. This can help to handle large datasets more efficiently. The default value is 1000 shards. |
inverse-sampling.rate | int | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. |
exactly_once | Boolean | No | false | Enable exactly-once semantics. |
debezium.* | config | No | - | Passes Debezium properties through to the Debezium Embedded Engine, which is used to capture data changes from the SQL Server instance. See the Debezium SqlServer Connector properties for details. |
format | Enum | No | DEFAULT | Optional output format for SqlServer CDC, valid enumerations are "DEFAULT" and "COMPATIBLE_DEBEZIUM_JSON". |
common-options | | No | - | Source plugin common parameters, please refer to Source Common Options for details. |
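The split-related options above describe a small decision rule, and a worked example may make it easier to follow. The Python sketch below is only an illustration of the rule as the table words it, not the connector's actual implementation. The names `SplitSettings`, `choose_strategy`, and `approx_sample_count`, as well as the returned labels, are hypothetical. It also assumes that "chunk size" in the estimated shard count means snapshot.split.size, and that a 1/N sampling rate yields roughly row count / N sampled rows.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SplitSettings:
    """Defaults copied from the Source Options table."""
    split_size: int = 8096             # snapshot.split.size (assumed to be the "chunk size")
    factor_upper: float = 100.0        # chunk-key.even-distribution.factor.upper-bound
    factor_lower: float = 0.05         # chunk-key.even-distribution.factor.lower-bound
    sharding_threshold: int = 1000     # sample-sharding.threshold
    inverse_sampling_rate: int = 1000  # inverse-sampling.rate


def distribution_factor(min_id: int, max_id: int, row_count: int) -> float:
    """(MAX(id) - MIN(id) + 1) / row count, as given in the option descriptions."""
    if row_count <= 0:
        raise ValueError("row_count must be positive")
    return (max_id - min_id + 1) / row_count


def choose_strategy(min_id: int, max_id: int, row_count: int,
                    settings: Optional[SplitSettings] = None) -> str:
    """Return a hypothetical label for the strategy the table describes."""
    s = settings or SplitSettings()
    factor = distribution_factor(min_id, max_id, row_count)
    if s.factor_lower <= factor <= s.factor_upper:
        return "even"
    estimated_shards = row_count / s.split_size
    if estimated_shards > s.sharding_threshold:
        return "sampled"
    # The table does not describe this case, so the label is a placeholder.
    return "uneven-not-sampled"


def approx_sample_count(row_count: int,
                        settings: Optional[SplitSettings] = None) -> int:
    """Rough number of rows sampled at a 1/inverse-sampling.rate rate (assumption)."""
    s = settings or SplitSettings()
    return row_count // s.inverse_sampling_rate


print(choose_strategy(1, 1_000_000, 1_000_000))  # dense keys, factor 1.0
print(choose_strategy(1, 10**12, 10_000_000))    # sparse keys, about 1235 estimated shards
print(choose_strategy(1, 1_000_000, 1_000))      # sparse keys, fewer than 1 estimated shard
print(approx_sample_count(10_000_000))           # rows sampled at 1/1000
```

With the default settings, the three calls fall into the even, sampled, and undescribed cases respectively. Raising snapshot.split.size or sample-sharding.threshold would move the second table out of the sampled case.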
Enable Sql Server CDC
- Check whether the CDC Agent is enabled
EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';
If the result is running, the agent is enabled. Otherwise, you need to enable it manually.
- Enable the CDC Agent
/opt/mssql/bin/mssql-conf setup
- The result is as follows
1) Evaluation (free, no production use rights, 180-day limit)
2) Developer (free, no production use rights)
3) Express (free)
4) Web (PAID)
5) Standard (PAID)
6) Enterprise (PAID)
7) Enterprise Core (PAID)
8) I bought a license through a retail sales channel and have a product key to enter.
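- Enable CDC at the database level. Run the statements below to enable CDC for the database. Note that sys.sp_cdc_enable_db enables CDC for the database only; each table to be captured must also be enabled with sys.sp_cdc_enable_table before is_tracked_by_cdc reports 1 for it.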
USE TestDB; -- Replace with the actual database name
EXEC sys.sp_cdc_enable_db;
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- Replace 'table' with the name of the table you want to check
Task Example
Simple initial read
This streaming-mode CDC job first reads a snapshot of the table data and, after the snapshot has been read successfully, reads changes incrementally. The configuration below is for reference only.
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  # This is an example source plugin, only for testing and demonstrating the feature
  SqlServer-CDC {
    plugin_output = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode = "initial"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}
transform {
}
sink {
  console {
    plugin_input = "customers"
  }
}
Simple incremental read
This job performs an incremental read: it reads only the changed data and prints it to the console.
env {
  # You can set engine configuration here
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  # This is an example source plugin, only for testing and demonstrating the feature
  SqlServer-CDC {
    # Enable exactly-once semantics
    exactly_once = true
    plugin_output = "customers"
    username = "sa"
    password = "Y.sa123456"
    startup.mode = "latest"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.full_types"]
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
  }
}
transform {
}
sink {
  console {
    plugin_input = "customers"
  }
}
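The Source Options table states that startup.timestamp and stop.timestamp take an epoch timestamp in milliseconds. As a small aid for preparing such a value, the Python sketch below converts a timezone-aware datetime into epoch milliseconds. The helper name `to_epoch_millis` is hypothetical and not part of SeaTunnel. Whether a timestamp-based mode is accepted should be checked against the valid startup.mode values for your version.

```python
from datetime import datetime, timedelta, timezone

# Reference point for epoch timestamps.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole epoch milliseconds."""
    if moment.tzinfo is None:
        # Naive datetimes are rejected to avoid silently assuming a time zone.
        raise ValueError("expected a timezone-aware datetime")
    # Integer division of timedeltas avoids floating-point rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


start = datetime(2024, 1, 1, 0, 0, 0, tzinfo=timezone.utc)
print(to_epoch_millis(start))  # 1704067200000
```

The resulting integer is the kind of value the table describes for startup.timestamp.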
Support custom primary key for table
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}
source {
  SqlServer-CDC {
    base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test"
    username = "sa"
    password = "Y.sa123456"
    database-names = ["column_type_test"]
    table-names = ["column_type_test.dbo.simple_types", "column_type_test.dbo.full_types"]
    # Declare a custom primary key for one of the captured tables
    table-names-config = [
      {
        table = "column_type_test.dbo.full_types"
        primaryKeys = ["id"]
      }
    ]
  }
}
sink {
  console {
  }
}
Changelog
Change | Commit | Version |
---|---|---|
[Improve][CDC] Extract duplicate code (#8906) | https://github.com/apache/seatunnel/commit/b922bb90e6 | dev |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
[Improve][Connector-V2] Add pre-check for table enable cdc (#8152) | https://github.com/apache/seatunnel/commit/9a5da78176 | 2.3.9 |
[Improve][Connector-V2] Fix SqlServer cdc memory leak (#8083) | https://github.com/apache/seatunnel/commit/69cd4ae1a2 | 2.3.9 |
[Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840) | https://github.com/apache/seatunnel/commit/b6c6dc0438 | 2.3.9 |
[Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 |
[Feature][Connector-V2] SqlServer support user-defined type (#7706) | https://github.com/apache/seatunnel/commit/fb89033273 | 2.3.8 |
[Improve][Connector-V2] Optimize sqlserver package structure (#7715) | https://github.com/apache/seatunnel/commit/9720f118e5 | 2.3.8 |
[Hotfix][CDC] Fix package name spelling mistake (#7415) | https://github.com/apache/seatunnel/commit/469112fa64 | 2.3.8 |
[Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac953524 | 2.3.6 |
[Improve][CDC] Close idle subtasks gorup(reader/writer) in increment phase (#6526) | https://github.com/apache/seatunnel/commit/454c339b9c | 2.3.6 |
[Improve][JDBC Source] Fix Split can not be cancel (#6825) | https://github.com/apache/seatunnel/commit/ee3b7c3723 | 2.3.6 |
[Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684) | https://github.com/apache/seatunnel/commit/868ba4d7c7 | 2.3.6 |
[Improve] Improve read table schema in cdc connector (#6702) | https://github.com/apache/seatunnel/commit/a8c6cc6e0c | 2.3.6 |
[Improve][Jdbc] Add quote identifier for sql (#6669) | https://github.com/apache/seatunnel/commit/849d748d3d | 2.3.5 |
[Improve][CDC] Optimize split state memory allocation in increment phase (#6554) | https://github.com/apache/seatunnel/commit/fe33422161 | 2.3.5 |
[Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 |
[Improve][CDC-Connector]Fix CDC option rule. (#6454) | https://github.com/apache/seatunnel/commit/1ea27afa87 | 2.3.5 |
[Improve][CDC] Optimize memory allocation for snapshot split reading (#6281) | https://github.com/apache/seatunnel/commit/4856645837 | 2.3.5 |
[Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 |
[Improve] Support int identity type in sql server (#6186) | https://github.com/apache/seatunnel/commit/1a8da1c843 | 2.3.4 |
[Feature][CDC] Support custom table primary key (#6106) | https://github.com/apache/seatunnel/commit/1312a1dd27 | 2.3.4 |
[Feature][CDC] Support read no primary key table (#6098) | https://github.com/apache/seatunnel/commit/b42d78de3f | 2.3.4 |
[Hotfix][Jdbc] Fix jdbc setFetchSize error (#6005) | https://github.com/apache/seatunnel/commit/d41af8a6ed | 2.3.4 |
[Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784) | https://github.com/apache/seatunnel/commit/37fcff347e | 2.3.4 |
[Improve][CDC] Clean unused code (#5785) | https://github.com/apache/seatunnel/commit/b5a66d3dbe | 2.3.4 |
[Improve][Jdbc] Fix database identifier (#5756) | https://github.com/apache/seatunnel/commit/dbfc8a670a | 2.3.4 |
[improve][connector-v2][sqlserver-cdc]Unified sqlserver TypeUtils type conversion mode (#5668) | https://github.com/apache/seatunnel/commit/75b814bc3d | 2.3.4 |
[feature][connector-cdc-sqlserver] add dataType datetimeoffset (#5548) | https://github.com/apache/seatunnel/commit/0cf63eed6d | 2.3.4 |
[Improve] Remove catalog tag for config file (#5645) | https://github.com/apache/seatunnel/commit/dc509aa080 | 2.3.4 |
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f8 | 2.3.4 |
[Imporve][CDC Base] Add a fast sampling method that supports character types (#5179) | https://github.com/apache/seatunnel/commit/c0422dbfeb | 2.3.3 |
[improve][CDC Base] Add some split parameters to the optionRule (#5161) | https://github.com/apache/seatunnel/commit/94fd6755e6 | 2.3.3 |
[Feature][Connector-V2][CDC] Support string type shard fields. (#5147) | https://github.com/apache/seatunnel/commit/e1be9d7f8a | 2.3.3 |
[Feature][CDC] Support tables without primary keys (with unique keys) (#163) (#5150) | https://github.com/apache/seatunnel/commit/32b7f2b690 | 2.3.3 |
[Bugfix][zeta] Fix cdc connection does not close (#4922) | https://github.com/apache/seatunnel/commit/a2d2f2dda8 | 2.3.3 |
[Feature][CDC] Support disable/enable exactly once for INITIAL (#4921) | https://github.com/apache/seatunnel/commit/6d9a3e5957 | 2.3.3 |
[Improve][CDC]change driver scope to provider (#5002) | https://github.com/apache/seatunnel/commit/745c0b9e92 | 2.3.3 |
[Improve][CDC]Remove driver for cdc connector (#4952) | https://github.com/apache/seatunnel/commit/b65f40c3c9 | 2.3.3 |
[Bugfix][zeta] Fix the deadlock issue with JDBC driver loading (#4878) | https://github.com/apache/seatunnel/commit/c30a2a1b1c | 2.3.2 |
[improve][CDC base] Implement Sample-based Sharding Strategy with Configurable Sampling Rate (#4856) | https://github.com/apache/seatunnel/commit/d827c700f0 | 2.3.2 |
[Bugfix][CDC Base] Solving the ConcurrentModificationException caused by snapshotState being modified concurrently. (#4877) | https://github.com/apache/seatunnel/commit/9a2efa51c7 | 2.3.2 |
[Hotfix][CDC] Fix chunk start/end parameter type error (#4777) | https://github.com/apache/seatunnel/commit/c13c031995 | 2.3.2 |
[Feature][CDC][SqlServer] Support multi-table read (#4377) | https://github.com/apache/seatunnel/commit/c4e3f2dc03 | 2.3.2 |
[Improve][CDC] Optimize jdbc fetch-size options (#4352) | https://github.com/apache/seatunnel/commit/fbb60ce1be | 2.3.1 |
[Improve][CDC] Improve startup.mode/stop.mode options (#4360) | https://github.com/apache/seatunnel/commit/b71d8739d5 | 2.3.1 |
[Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f590498 | 2.3.1 |
Update CDC StartupMode and StopMode option to SingleChoiceOption (#4357) | https://github.com/apache/seatunnel/commit/f60ac1a5e9 | 2.3.1 |
[bugfix][cdc-base] Fix cdc base shutdown thread not cleared (#4327) | https://github.com/apache/seatunnel/commit/ac61409bd8 | 2.3.1 |
[improve][zeta] fix zeta bugs | https://github.com/apache/seatunnel/commit/3a82e8b39f | 2.3.1 |
[Improve] Support MySqlCatalog Use JDBC URL With Custom Suffix | https://github.com/apache/seatunnel/commit/210d0ff1f8 | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][cdc] support sharding-tables (#4207) | https://github.com/apache/seatunnel/commit/5c3f0c9b00 | 2.3.1 |
[Hotfix][CDC] Fix multiple-table data read (#4200) | https://github.com/apache/seatunnel/commit/7f5671d2ce | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Improve][Connector-V2][SQLServer-CDC] Add sqlserver cdc optionRule (#4019) | https://github.com/apache/seatunnel/commit/78df503392 | 2.3.1 |
[Improve][CDC][base] Guaranteed to be exactly-once in the process of switching from SnapshotTask to IncrementalTask (#3837) | https://github.com/apache/seatunnel/commit/8379aaf876 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
[Improve][CDC] Add mysql-cdc source factory (#3791) | https://github.com/apache/seatunnel/commit/356538de8a | 2.3.1 |
[feature][connector-v2] add sqlServer CDC (#3686) | https://github.com/apache/seatunnel/commit/0f0afb58af | 2.3.0 |