TiDB CDC
TiDB CDC source connector
Supported Engines
SeaTunnel Zeta
Flink
Key features
Description
The TiDB CDC connector reads both snapshot data and incremental data from a TiDB database. This document describes how to set up the TiDB CDC connector to take a snapshot of existing data and capture streaming change events in a TiDB database.
Supported DataSource Info
Datasource | Supported versions | Driver | URL | Maven |
---|---|---|---|---|
MySQL | | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
tikv-client-java | 3.2.0 | - | - | https://mvnrepository.com/artifact/org.tikv/tikv-client-java/3.2.0 |
Using Dependency
Install JDBC Driver
For Flink Engine
- Ensure that the JDBC driver jar and the tikv-client-java jar have been placed in the ${SEATUNNEL_HOME}/plugins/ directory.
For SeaTunnel Zeta Engine
- Ensure that the JDBC driver jar and the tikv-client-java jar have been placed in the ${SEATUNNEL_HOME}/lib/ directory.
Download the MySQL driver and tikv-client-java and put them in the ${SEATUNNEL_HOME}/lib/ directory. For example: cp mysql-connector-java-xxx.jar $SEATUNNEL_HOME/lib/
Data Type Mapping
MySQL Data Type | SeaTunnel Data Type |
---|---|
BIT(1) TINYINT(1) | BOOLEAN |
TINYINT | TINYINT |
TINYINT UNSIGNED SMALLINT | SMALLINT |
SMALLINT UNSIGNED MEDIUMINT MEDIUMINT UNSIGNED INT INTEGER YEAR | INT |
INT UNSIGNED INTEGER UNSIGNED BIGINT | BIGINT |
BIGINT UNSIGNED | DECIMAL(20,0) |
DECIMAL(p, s) DECIMAL(p, s) UNSIGNED NUMERIC(p, s) NUMERIC(p, s) UNSIGNED | DECIMAL(p,s) |
FLOAT FLOAT UNSIGNED | FLOAT |
DOUBLE DOUBLE UNSIGNED REAL REAL UNSIGNED | DOUBLE |
CHAR VARCHAR TINYTEXT MEDIUMTEXT TEXT LONGTEXT ENUM JSON | STRING |
DATE | DATE |
TIME(s) | TIME(s) |
DATETIME TIMESTAMP(s) | TIMESTAMP(s) |
BINARY VARBINARY BIT(p) TINYBLOB MEDIUMBLOB BLOB LONGBLOB GEOMETRY | BYTES |
Source Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
base-url | String | Yes | - | The URL of the JDBC connection. For example: jdbc:mysql://tidb0:4000/inventory. |
username | String | Yes | - | Name of the database user to use when connecting to the database server. |
password | String | Yes | - | Password to use when connecting to the database server. |
pd-addresses | String | Yes | - | PD (Placement Driver) address of the TiKV cluster. |
database-name | String | Yes | - | Name of the database to monitor. |
table-name | String | Yes | - | Name of the table to monitor. The table name needs to include the database name. |
startup.mode | Enum | No | INITIAL | Optional startup mode for the TiDB CDC consumer. Valid values are initial, earliest, latest, and specific. initial: synchronize historical data at startup, then synchronize incremental data. earliest: start from the earliest possible offset. latest: start from the latest offset. specific: start from user-supplied offsets. |
tikv.grpc.timeout_in_ms | Long | No | - | TiKV gRPC timeout in milliseconds. |
tikv.grpc.scan_timeout_in_ms | Long | No | - | TiKV gRPC scan timeout in milliseconds. |
tikv.batch_get_concurrency | Integer | No | - | TiKV gRPC batch get concurrency. |
tikv.batch_scan_concurrency | Integer | No | - | TiKV gRPC batch scan concurrency. |
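The optional settings in the table above can be combined with the required ones in a single source block. The fragment below is a minimal sketch: the host names, database, and table are placeholders, and the numeric TiKV values are illustrative assumptions rather than recommended defaults.

```hocon
source {
  TiDB-CDC {
    # Required options (placeholder values)
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    username = "root"
    password = ""
    pd-addresses = "pd0:2379"
    database-name = "inventory"
    table-name = "products"

    # Optional: skip the initial snapshot and read only new changes
    startup.mode = "latest"

    # Optional TiKV client tuning (illustrative values, not recommendations)
    tikv.grpc.timeout_in_ms = 20000
    tikv.grpc.scan_timeout_in_ms = 20000
    tikv.batch_get_concurrency = 4
    tikv.batch_scan_concurrency = 4
  }
}
```

Leaving startup.mode unset keeps the default behavior, which reads historical data first and then switches to incremental changes.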
Task Example
Simple
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  # Example source plugin configuration, for testing and demonstration only
  TiDB-CDC {
    result_table_name = "products_tidb_cdc"
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    tikv.grpc.timeout_in_ms = 20000
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

transform {
}

sink {
  jdbc {
    source_table_name = "products_tidb_cdc"
    url = "jdbc:mysql://tidb0:4000/inventory"
    driver = "com.mysql.cj.jdbc.Driver"
    user = "root"
    password = ""
    database = "inventory"
    table = "products_sink"
    generate_sink_sql = true
    primary_keys = ["id"]
  }
}
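To check that change events are arriving before wiring up a real target, the sink can be swapped for one that prints rows. The variant below is a sketch that assumes SeaTunnel's Console sink plugin is available in your installation; the connection values are the same placeholders used in the example above.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
}

source {
  TiDB-CDC {
    result_table_name = "products_tidb_cdc"
    base-url = "jdbc:mysql://tidb0:4000/inventory"
    pd-addresses = "pd0:2379"
    username = "root"
    password = ""
    database-name = "inventory"
    table-name = "products"
  }
}

sink {
  # Assumption: the Console sink is installed; it prints each row it receives
  Console {
    source_table_name = "products_tidb_cdc"
  }
}
```

Because the sink only reads from the table named in source_table_name, switching back to the JDBC sink later requires no change to the source block.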
Changelog
- Add TiDB CDC Source Connector