PostgreSQL CDC
PostgreSQL CDC source connector
Support Those Engines
SeaTunnel Zeta
Flink
Key features
Description
The Postgre CDC connector allows for reading snapshot data and incremental data from Postgre database. This document describes how to set up the Postgre CDC connector to run SQL queries against Postgre databases.
Supported DataSource Info
| Datasource | Supported versions | Driver | Url | Maven | 
|---|---|---|---|---|
| PostgreSQL | Different dependency version has different driver class. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download | 
| PostgreSQL | If you want to manipulate the GEOMETRY type in PostgreSQL. | org.postgresql.Driver | jdbc:postgresql://localhost:5432/test | Download | 
Using Dependency
Install Jdbc Driver
For Spark/Flink Engine
- You need to ensure that the jdbc driver jar package has been placed in directory
${SEATUNNEL_HOME}/plugins/.
For SeaTunnel Zeta Engine
- You need to ensure that the jdbc driver jar package has been placed in directory
${SEATUNNEL_HOME}/lib/.
Please download and put PostgreSQL driver in ${SEATUNNEL_HOME}/lib/ dir. For example: cp postgresql-xxx.jar $SEATUNNEL_HOME/lib/
Here are the steps to enable CDC (Change Data Capture) in PostgreSQL:
- Ensure the wal_level is set to logical: Modify the postgresql.conf configuration file by adding "wal_level = logical", restart the PostgreSQL server for the changes to take effect. Alternatively, you can use SQL commands to modify the configuration directly:
ALTER SYSTEM SET wal_level TO 'logical';
SELECT pg_reload_conf();
- Change the REPLICA policy of the specified table to FULL
ALTER TABLE your_table_name REPLICA IDENTITY FULL;
Data Type Mapping
| PostgreSQL Data type | SeaTunnel Data type | 
|---|---|
| BOOL | BOOLEAN | 
| _BOOL | ARRAY<BOOLEAN> | 
| BYTEA | BYTES | 
| _BYTEA | ARRAY<TINYINT> | 
| INT2 SMALLSERIAL INT4 SERIAL | INT | 
| _INT2 _INT4 | ARRAY<INT> | 
| INT8 BIGSERIAL | BIGINT | 
| _INT8 | ARRAY<BIGINT> | 
| FLOAT4 | FLOAT | 
| _FLOAT4 | ARRAY<FLOAT> | 
| FLOAT8 | DOUBLE | 
| _FLOAT8 | ARRAY<DOUBLE> | 
| NUMERIC(Get the designated column's specified column size>0) | DECIMAL(Get the designated column's specified column size,Gets the number of digits in the specified column to the right of the decimal point) | 
| NUMERIC(Get the designated column's specified column size<0) | DECIMAL(38, 18) | 
| BPCHAR CHARACTER VARCHAR TEXT GEOMETRY GEOGRAPHY JSON JSONB | STRING | 
| _BPCHAR _CHARACTER _VARCHAR _TEXT | ARRAY<STRING> | 
| TIMESTAMP | TIMESTAMP | 
| TIME | TIME | 
| DATE | DATE | 
| OTHER DATA TYPES | NOT SUPPORTED YET | 
Source Options
| Name | Type | Required | Default | Description | 
|---|---|---|---|---|
| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost:5432/postgres_cdc?loggerLevel=OFF. | 
| username | String | Yes | - | Name of the database to use when connecting to the database server. | 
| password | String | Yes | - | Password to use when connecting to the database server. | 
| database-names | List | No | - | Database name of the database to monitor. | 
| table-names | List | Yes | - | Table name of the database to monitor. The table name needs to include the database name, for example: database_name.table_name | 
| table-names-config | List | No | - | Table config list. for example: [{"table": "db1.schema1.table1","primaryKeys": ["key1"],"snapshotSplitColumn": "key2"}] | 
| startup.mode | Enum | No | INITIAL | Optional startup mode for PostgreSQL CDC consumer, valid enumerations are initial,earliestandlatest.initial: Synchronize historical data at startup, and then synchronize incremental data.earliest: Startup from the earliest offset possible.latest: Startup from the latest offset. | 
| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot of table. | 
| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size for per poll when read table snapshot. | 
| slot.name | String | No | - | The name of the PostgreSQL logical decoding slot that was created for streaming changes from a particular plug-in for a particular database/schema. The server uses this slot to stream events to the connector that you are configuring. Default is seatunnel. | 
| decoding.plugin.name | String | No | pgoutput | The name of the Postgres logical decoding plug-in installed on the server,Supported values are decoderbufs, wal2json, wal2json_rds, wal2json_streaming,wal2json_rds_streaming and pgoutput. | 
| server-time-zone | String | No | UTC | The session time zone in database server. If not set, then ZoneId.systemDefault() is used to determine the server time zone. | 
| connect.timeout.ms | Duration | No | 30000 | The maximum time that the connector should wait after trying to connect to the database server before timing out. | 
| connect.max-retries | Integer | No | 3 | The max retry times that the connector should retry to build database server connection. | 
| connection.pool.size | Integer | No | 20 | The jdbc connection pool size. | 
| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 100.0. | 
| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by sample-sharding.threshold. The default value is 0.05. | 
| sample-sharding.threshold | Integer | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by chunk-key.even-distribution.factor.upper-boundandchunk-key.even-distribution.factor.lower-bound, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. | 
| inverse-sampling.rate | Integer | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. | 
| exactly_once | Boolean | No | false | Enable exactly once semantic. | 
| format | Enum | No | DEFAULT | Optional output format for PostgreSQL CDC, valid enumerations are DEFAULT,COMPATIBLE_DEBEZIUM_JSON. | 
| debezium | Config | No | - | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from PostgreSQL server. | 
| common-options | no | - | Source plugin common parameters, please refer to Source Common Options for details | 
Task Example
Simple
Support multi-table reading
env {
  # You can set engine configuration here
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 5000
  read_limit.bytes_per_second=7000000
  read_limit.rows_per_second=400
}
source {
  Postgres-CDC {
    plugin_output = "customers_Postgre_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.postgres_cdc_table_1,postgres_cdc.inventory.postgres_cdc_table_2"]
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
  }
}
transform {
}
sink {
  jdbc {
    plugin_input = "customers_Postgre_cdc"
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    driver = "org.postgresql.Driver"
    username = "postgres"
    password = "postgres"
    generate_sink_sql = true
    # You need to configure both database and table
    database = postgres_cdc
    schema = "inventory"
    tablePrefix = "sink_"
    primary_keys = ["id"]
  }
}
Support custom primary key for table
source {
  Postgres-CDC {
    plugin_output = "customers_mysql_cdc"
    username = "postgres"
    password = "postgres"
    database-names = ["postgres_cdc"]
    schema-names = ["inventory"]
    table-names = ["postgres_cdc.inventory.full_types_no_primary_key"]
    url = "jdbc:postgresql://postgres_cdc_e2e:5432/postgres_cdc?loggerLevel=OFF"
    decoding.plugin.name = "decoderbufs"
    exactly_once = false
    table-names-config = [
      {
        table = "postgres_cdc.inventory.full_types_no_primary_key"
        primaryKeys = ["id"]
      }
    ]
  }
}
Changelog
Change Log
| Change | Commit | Version | 
|---|---|---|
| [Feature][Core] Add plugin directory support for each connector (#9650) | https://github.com/apache/seatunnel/commit/4beb2b9336 | 2.3.12 | 
| [improve] jdbc options (#9541) | https://github.com/apache/seatunnel/commit/d041e5fb32 | 2.3.12 | 
| [Feature][Connectors-v2] Optimize the size of CDC JAR Files (#9546) | https://github.com/apache/seatunnel/commit/1dd19c6823 | 2.3.12 | 
| [Fix][Connector-V2] Fix postgres cdc with debezium_json format can not parse number without scale (#9052) | https://github.com/apache/seatunnel/commit/29cf3a76c7 | 2.3.11 | 
| [Improve][CDC] Extract duplicate code (#8906) | https://github.com/apache/seatunnel/commit/b922bb90e6 | 2.3.10 | 
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 | 
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 | 
| [Improve][Connector-V2] Add pre-check for table enable cdc (#8152) | https://github.com/apache/seatunnel/commit/9a5da78176 | 2.3.9 | 
| [Feature][Connector-V2]Jdbc chunk split add snapshotSplitColumn config #7794 (#7840) | https://github.com/apache/seatunnel/commit/b6c6dc0438 | 2.3.9 | 
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281ed | 2.3.9 | 
| [Improve][PostgreSQL CDC]-PostgresSourceOptions description error (#7813) | https://github.com/apache/seatunnel/commit/57f47c2064 | 2.3.9 | 
| [Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac953524 | 2.3.6 | 
| [Improve][CDC] Close idle subtasks gorup(reader/writer) in increment phase (#6526) | https://github.com/apache/seatunnel/commit/454c339b9c | 2.3.6 | 
| [Improve][JDBC Source] Fix Split can not be cancel (#6825) | https://github.com/apache/seatunnel/commit/ee3b7c3723 | 2.3.6 | 
| [Hotfix][Postgres-CDC/OpenGauss-CDC] Fix read data missing when restore (#6785) | https://github.com/apache/seatunnel/commit/67c32607e7 | 2.3.6 | 
| [Hotfix][Jdbc/CDC] Fix postgresql uuid type in jdbc read (#6684) | https://github.com/apache/seatunnel/commit/868ba4d7c7 | 2.3.6 | 
| [Improve] Improve read table schema in cdc connector (#6702) | https://github.com/apache/seatunnel/commit/a8c6cc6e0c | 2.3.6 | 
| [Improve][Jdbc] Add quote identifier for sql (#6669) | https://github.com/apache/seatunnel/commit/849d748d3d | 2.3.5 | 
| [Fix][Connector-V2] Fix connector support SPI but without no args constructor (#6551) | https://github.com/apache/seatunnel/commit/5f3c9c36a5 | 2.3.5 | 
| [Improve][CDC-Connector]Fix CDC option rule. (#6454) | https://github.com/apache/seatunnel/commit/1ea27afa87 | 2.3.5 | 
| [Improve][CDC] Optimize memory allocation for snapshot split reading (#6281) | https://github.com/apache/seatunnel/commit/4856645837 | 2.3.5 | 
| [Improve][API] Unify type system api(data & type) (#5872) | https://github.com/apache/seatunnel/commit/b38c7edcc9 | 2.3.5 | 
| [Feature][Connector]update pgsql-cdc publication for add table (#6309) | https://github.com/apache/seatunnel/commit/2ad7d65236 | 2.3.5 | 
| [Improve][Postgres-CDC] Fix name typos (#6248) | https://github.com/apache/seatunnel/commit/2462f1c5f7 | 2.3.4 | 
| [Improve][Postgres-CDC] Update jdbc fetchsize (#6245) | https://github.com/apache/seatunnel/commit/c25beb9f8a | 2.3.4 | 
| [Feature][Connector-V2][Postgres-cdc]Support for Postgres cdc (#5986) | https://github.com/apache/seatunnel/commit/97438b9402 | 2.3.4 |