Multi-Table Synchronization Architecture

1. Overview

1.1 Problem Background

Database migration and CDC scenarios often require synchronizing hundreds of tables:

  • Resource Efficiency: How to avoid creating one job per table?
  • Consistent Snapshot: How to ensure all tables start from the same point in time?
  • Schema Routing: How to route data to the correct target tables?
  • Independent Schemas: How to handle different schemas per table?
  • Parallel Writing: How to maximize throughput for multiple tables?

1.2 Design Goals

SeaTunnel's multi-table synchronization aims to:

  1. Single Job, Multiple Tables: Synchronize hundreds of tables in one job
  2. Resource Efficiency: Share resources across tables
  3. Schema Independence: Each table maintains its own schema
  4. Dynamic Routing: Route records to the correct sink based on table identity
  5. Horizontal Scalability: Support replica writers for high throughput

1.3 Use Cases

Database Migration:

source {
  MySQL-CDC {
    # Capture all tables in database
    database-name = "my_db"
    table-name = ".*" # Regex: all tables
  }
}

sink {
  JDBC {
    # Write to PostgreSQL
    url = "jdbc:postgresql://..."
  }
}

Multi-Table CDC:

source {
  MySQL-CDC {
    table-name = "order_.*|user_.*|product_.*" # Multiple table patterns
  }
}

sink {
  Elasticsearch {
    # Different indices per table
  }
}

2. Core Abstractions

2.1 TablePath

Unique identifier for routing records to tables.

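public class TablePath implements Serializable {
    private final String databaseName;
    private final String schemaName; // null for databases without schemas
    private final String tableName;

    // Unique string representation; a null schemaName is skipped
    // so the result never contains the literal text "null"
    public String getFullName() {
        if (schemaName == null) {
            return String.join(".", databaseName, tableName);
        }
        return String.join(".", databaseName, schemaName, tableName);
    }
}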

Example:

TablePath orderTable = TablePath.of("my_db", "public", "orders");
TablePath userTable = TablePath.of("my_db", "public", "users");
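
Because a table path is used as a map key throughout the routing logic, two instances that name the same table need to compare equal. The following is a minimal sketch of that behavior, assuming a simplified stand-in class `SimpleTablePath` (hypothetical, not the SeaTunnel class) that also skips a missing schema when building the full name:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for TablePath; it only illustrates naming and equality.
final class SimpleTablePath {
    private final String databaseName;
    private final String schemaName; // may be null for databases without schemas
    private final String tableName;

    SimpleTablePath(String databaseName, String schemaName, String tableName) {
        this.databaseName = databaseName;
        this.schemaName = schemaName;
        this.tableName = Objects.requireNonNull(tableName, "tableName");
    }

    // Joins only the parts that are present, so a missing schema never prints "null".
    String getFullName() {
        List<String> parts = new ArrayList<>();
        if (databaseName != null) parts.add(databaseName);
        if (schemaName != null) parts.add(schemaName);
        parts.add(tableName);
        return String.join(".", parts);
    }

    // equals/hashCode make two paths for the same table interchangeable as map keys.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SimpleTablePath)) return false;
        SimpleTablePath other = (SimpleTablePath) o;
        return Objects.equals(databaseName, other.databaseName)
                && Objects.equals(schemaName, other.schemaName)
                && tableName.equals(other.tableName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(databaseName, schemaName, tableName);
    }
}
```

With value-based equality, a path rebuilt from a row's table id finds the same writer or schema entry as the path that was used when the entry was registered.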

2.2 SeaTunnelRow with TableId

Records carry table identity for routing.

public class SeaTunnelRow {
    private String tableId; // TablePath serialized; set by the source reader
    private final SeaTunnelRowKind rowKind; // INSERT, UPDATE, DELETE
    private final Object[] fields;

    public TablePath getTablePath() {
        return TablePath.deserialize(tableId);
    }
}

2.3 SinkIdentifier

Unique identifier for sink writers (table + replica index).

public class SinkIdentifier implements Serializable {
    private final TableIdentifier tableIdentifier;
    private final int index; // Replica index

    // For multi-table: one identifier per table per replica
    // Example: (orders, 0), (orders, 1), (users, 0), (users, 1)
}
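
The (table, replica) key space can be sketched as follows. `WriterKey` is a hypothetical, simplified stand-in for the identifier above that uses a plain table name; it is meant only to show how the keys are enumerated and why they need value-based equality to work as map keys:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

// Hypothetical key type mirroring the (table, replica index) idea.
final class WriterKey {
    final String table;
    final int replicaIndex;

    WriterKey(String table, int replicaIndex) {
        this.table = Objects.requireNonNull(table, "table");
        this.replicaIndex = replicaIndex;
    }

    // One key per table per replica, in table order and then replica order.
    static List<WriterKey> enumerate(List<String> tables, int replicaNum) {
        List<WriterKey> keys = new ArrayList<>();
        for (String table : tables) {
            for (int i = 0; i < replicaNum; i++) {
                keys.add(new WriterKey(table, i));
            }
        }
        return keys;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof WriterKey)) return false;
        WriterKey other = (WriterKey) o;
        return replicaIndex == other.replicaIndex && table.equals(other.table);
    }

    @Override
    public int hashCode() {
        return Objects.hash(table, replicaIndex);
    }

    @Override
    public String toString() {
        return "(" + table + ", " + replicaIndex + ")";
    }
}
```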

3. MultiTableSource Architecture

3.1 Structure

public class MultiTableSource<T, SplitT, StateT>
        implements SeaTunnelSource<T, SplitT, StateT> {

    // Underlying sources (one per table)
    private final Map<TablePath, SeaTunnelSource<T, SplitT, StateT>> sources;

    // Produced catalog tables
    private final List<CatalogTable> catalogTables;
}

3.2 Creation

// From configuration
MultiTableSource<SeaTunnelRow, ?, ?> multiSource =
        MultiTableSource.builder()
                .addSource(orderTablePath, orderSource)
                .addSource(userTablePath, userSource)
                .addSource(productTablePath, productSource)
                .build();

3.3 Enumerator: Unified Split Assignment

public class MultiTableSourceSplitEnumerator {
    private final Map<TablePath, SourceSplitEnumerator> enumerators;

    @Override
    public void handleSplitRequest(int subtaskId) {
        // Forward the request to every table enumerator; each one decides
        // whether it has a split to assign to this subtask
        for (Map.Entry<TablePath, SourceSplitEnumerator> entry : enumerators.entrySet()) {
            TablePath tablePath = entry.getKey();
            SourceSplitEnumerator enumerator = entry.getValue();

            // Request split from table enumerator
            enumerator.handleSplitRequest(subtaskId);
        }
    }

    @Override
    public void addReader(int subtaskId) {
        // Register reader with all table enumerators
        for (SourceSplitEnumerator enumerator : enumerators.values()) {
            enumerator.addReader(subtaskId);
        }
    }
}

3.4 Reader: Multi-Table Data Reading

public class MultiTableSourceReader {
    private final Map<TablePath, SourceReader> readers;
    private final Queue<TablePath> readOrder; // Round-robin queue

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) {
        if (readOrder.isEmpty()) {
            return;
        }

        // Round-robin read from tables
        TablePath currentTable = readOrder.poll();
        SourceReader reader = readers.get(currentTable);

        // Read from current table
        reader.pollNext(new Collector<SeaTunnelRow>() {
            @Override
            public void collect(SeaTunnelRow row) {
                // Tag row with table path
                row.setTableId(currentTable.serialize());
                output.collect(row);
            }
        });

        // Re-add to queue for next round
        readOrder.offer(currentTable);
    }

    @Override
    public void addSplits(List<SplitT> splits) {
        // Route splits to correct table readers
        for (SplitT split : splits) {
            TablePath tablePath = extractTablePath(split);
            SourceReader reader = readers.get(tablePath);
            reader.addSplits(Collections.singletonList(split));

            // Add table to read order if not present
            if (!readOrder.contains(tablePath)) {
                readOrder.offer(tablePath);
            }
        }
    }
}
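
The round-robin behavior can be illustrated in isolation. The sketch below is a simplified model under stated assumptions: `RoundRobinReaderSketch` is hypothetical, records are plain strings, and each table's pending data is an in-memory queue rather than a real reader. It shows that one poll serves one table, rotates the queue, and skips tables that currently have nothing to read:

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical model: every table has a queue of pending records, and each
// poll emits at most one record before rotating to the next table.
final class RoundRobinReaderSketch {
    private final Map<String, Queue<String>> pendingByTable = new LinkedHashMap<>();
    private final Queue<String> readOrder = new ArrayDeque<>();

    void addRecords(String table, List<String> records) {
        Queue<String> queue = pendingByTable.get(table);
        if (queue == null) {
            queue = new ArrayDeque<>();
            pendingByTable.put(table, queue);
            readOrder.offer(table); // first time this table is seen
        }
        queue.addAll(records);
    }

    // Returns "table:record", or null when no table has pending data.
    String pollNext() {
        for (int attempts = readOrder.size(); attempts > 0; attempts--) {
            String table = readOrder.poll();
            readOrder.offer(table); // re-queue for the next round
            String record = pendingByTable.get(table).poll();
            if (record != null) {
                return table + ":" + record; // tag the record with its table
            }
        }
        return null;
    }
}
```

Rotating before reading means a table with a large backlog cannot starve the others: it gets one turn per pass over the queue.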

4. MultiTableSink Architecture

4.1 Structure

public class MultiTableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
        implements SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> {

    // Underlying sinks (one per table)
    private final Map<TablePath, SeaTunnelSink> sinks;

    // Number of writer replicas per table
    private final int replicaNum;

    // Input catalog tables
    private final List<CatalogTable> catalogTables;
}

4.2 Writer: Multi-Table Writing with Replicas

public class MultiTableSinkWriter<IN, CommitInfoT, StateT>
        implements SinkWriter<IN, CommitInfoT, StateT> {

    // Writers per table (multiple replicas per table)
    private final Map<SinkIdentifier, SinkWriter<IN, CommitInfoT, StateT>> writers;

    // Replica count per table
    private final int replicaNum;

    // Context
    private final int writerIndex; // This writer's global index

    @Override
    public void write(IN element) throws IOException {
        SeaTunnelRow row = (SeaTunnelRow) element;

        // 1. Determine target table
        TablePath tablePath = row.getTablePath();

        // 2. Select replica for this table (load balancing)
        int replicaIndex = selectReplica(tablePath, row);

        // 3. Get writer for (table, replica)
        SinkIdentifier identifier = new SinkIdentifier(
                new TableIdentifier(tablePath),
                replicaIndex
        );

        SinkWriter<IN, CommitInfoT, StateT> writer = writers.get(identifier);

        // 4. Write to selected writer
        writer.write(element);
    }

    private int selectReplica(TablePath tablePath, SeaTunnelRow row) {
        // If a primary key is available, route stably by primary key hash.
        // Math.floorMod stays in [0, replicaNum) even for negative hash codes;
        // Math.abs(Integer.MIN_VALUE) is still negative.
        Optional<Object> primaryKey = extractPrimaryKeyIfPresent(row);
        if (primaryKey.isPresent()) {
            return Math.floorMod(primaryKey.get().hashCode(), replicaNum);
        }

        // Otherwise, distribute across replicas (no stable routing guarantee).
        // System.nanoTime() may be negative, so floorMod is used here as well.
        return (int) Math.floorMod(System.nanoTime(), (long) replicaNum);
    }

    @Override
    public Optional<CommitInfoT> prepareCommit(long checkpointId) throws IOException {
        // Collect commit info from all writers
        List<CommitInfoT> allCommitInfos = new ArrayList<>();

        for (SinkWriter<IN, CommitInfoT, StateT> writer : writers.values()) {
            Optional<CommitInfoT> commitInfo = writer.prepareCommit(checkpointId);
            commitInfo.ifPresent(allCommitInfos::add);
        }

        // Wrap in multi-table commit info
        return Optional.of((CommitInfoT) new MultiTableCommitInfo(allCommitInfos));
    }

    @Override
    public List<StateT> snapshotState(long checkpointId) throws IOException {
        // Snapshot all writers
        List<StateT> allStates = new ArrayList<>();

        for (Map.Entry<SinkIdentifier, SinkWriter<IN, CommitInfoT, StateT>> entry : writers.entrySet()) {
            List<StateT> states = entry.getValue().snapshotState(checkpointId);

            // Tag states with sink identifier for recovery
            for (StateT state : states) {
                allStates.add(wrapWithIdentifier(entry.getKey(), state));
            }
        }

        return allStates;
    }
}

4.3 Committer: Multi-Table Commit Coordination

public class MultiTableSinkCommitter<CommitInfoT>
        implements SinkCommitter<CommitInfoT> {

    // Committers per table
    private final Map<TablePath, SinkCommitter<CommitInfoT>> committers;

    @Override
    public List<CommitInfoT> commit(List<CommitInfoT> commitInfos) throws IOException {
        List<CommitInfoT> failed = new ArrayList<>();

        // Group commit infos by table
        Map<TablePath, List<CommitInfoT>> groupedInfos = groupByTable(commitInfos);

        // Commit per table
        for (Map.Entry<TablePath, List<CommitInfoT>> entry : groupedInfos.entrySet()) {
            TablePath tablePath = entry.getKey();
            List<CommitInfoT> tableCommitInfos = entry.getValue();

            SinkCommitter<CommitInfoT> committer = committers.get(tablePath);

            // Commit for this table
            List<CommitInfoT> tableFailed = committer.commit(tableCommitInfos);
            failed.addAll(tableFailed);
        }

        return failed;
    }

    private Map<TablePath, List<CommitInfoT>> groupByTable(List<CommitInfoT> commitInfos) {
        Map<TablePath, List<CommitInfoT>> grouped = new HashMap<>();

        for (CommitInfoT commitInfo : commitInfos) {
            TablePath tablePath = extractTablePath(commitInfo);
            grouped.computeIfAbsent(tablePath, k -> new ArrayList<>()).add(commitInfo);
        }

        return grouped;
    }
}
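
The group-then-commit pattern can be exercised on its own. The following sketch makes several simplifying assumptions: `TableCommit`, `CommitGroupingSketch`, and the nested `TableCommitter` interface are hypothetical, tables are identified by plain strings, and a commit for a table with no registered committer is reported as failed rather than causing an exception:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical commit record: a table name plus an opaque payload.
final class TableCommit {
    final String table;
    final String payload;

    TableCommit(String table, String payload) {
        this.table = table;
        this.payload = payload;
    }
}

final class CommitGroupingSketch {
    // Hypothetical per-table committer: returns the commits it could not apply.
    interface TableCommitter {
        List<TableCommit> commit(List<TableCommit> commits);
    }

    static Map<String, List<TableCommit>> groupByTable(List<TableCommit> commits) {
        Map<String, List<TableCommit>> grouped = new LinkedHashMap<>();
        for (TableCommit c : commits) {
            grouped.computeIfAbsent(c.table, k -> new ArrayList<>()).add(c);
        }
        return grouped;
    }

    // Commits table by table and collects failures so the caller can retry them.
    static List<TableCommit> commitAll(
            List<TableCommit> commits, Map<String, TableCommitter> committers) {
        List<TableCommit> failed = new ArrayList<>();
        for (Map.Entry<String, List<TableCommit>> e : groupByTable(commits).entrySet()) {
            TableCommitter committer = committers.get(e.getKey());
            if (committer == null) {
                failed.addAll(e.getValue()); // assumption: unknown table counts as failed
                continue;
            }
            failed.addAll(committer.commit(e.getValue()));
        }
        return failed;
    }
}
```

Note that a failure in one table does not stop commits for the others in this sketch; whether that is acceptable depends on the consistency guarantees the target system needs.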

5. Replica Mechanism

5.1 Why Replicas?

Problem: A single writer per table becomes a bottleneck for high-throughput tables.

Solution: Run multiple replica writers per table so that writes proceed in parallel.

Without Replicas:
orders table (1000 writes/sec) → [Single Writer] → Bottleneck

With Replicas (replicaNum=4):
orders table (1000 writes/sec) → [Writer 0] (250 writes/sec)
                               → [Writer 1] (250 writes/sec)
                               → [Writer 2] (250 writes/sec)
                               → [Writer 3] (250 writes/sec)

5.2 Replica Configuration

sink {
  JDBC {
    url = "..."

    # Multi-table configuration
    multi_table_sink_replica = 4 # replicas per table (applies to all tables)
  }
}

5.3 Replica Selection Strategies

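Hash-Based (when primary key is available):

// Ensures same primary key always goes to same replica (order preservation).
// Math.floorMod keeps the index non-negative; Math.abs(Integer.MIN_VALUE) is still negative.
int replica = Math.floorMod(primaryKey.hashCode(), replicaNum);

Random (when primary key is not available):

// Distributes load across replicas (no stable routing guarantee).
// System.nanoTime() may be negative, so floorMod is used here as well.
int replica = (int) Math.floorMod(System.nanoTime(), (long) replicaNum);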
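
Both strategies must return an index in the range [0, replicaNum). A minimal, self-contained sketch is shown below; `ReplicaSelectorSketch` is a hypothetical helper, and `ThreadLocalRandom` is swapped in for the timestamp-based choice as one straightforward way to pick a replica without a key:

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper illustrating the two selection strategies.
final class ReplicaSelectorSketch {
    // Stable routing: the same key always maps to the same replica.
    // Math.floorMod keeps the result in [0, replicaNum) even for negative
    // hash codes, including Integer.MIN_VALUE, where Math.abs stays negative.
    static int byKey(Object primaryKey, int replicaNum) {
        return Math.floorMod(primaryKey.hashCode(), replicaNum);
    }

    // No key available: spread load with no ordering guarantee.
    static int random(int replicaNum) {
        return ThreadLocalRandom.current().nextInt(replicaNum);
    }
}
```

The edge case matters in practice: a key whose hash code is `Integer.MIN_VALUE` would produce a negative index with `Math.abs(hash) % replicaNum`, and the writer lookup for that index would find nothing.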

6. Schema Management in Multi-Table

6.1 Independent Schemas

Each table maintains its own schema:

public class MultiTableSink {
    // Schema per table
    private final Map<TablePath, CatalogTable> catalogTables;

    public CatalogTable getCatalogTable(TablePath tablePath) {
        return catalogTables.get(tablePath);
    }
}

6.2 Schema Evolution Routing

public class MultiTableSinkWriter {
    public void handleSchemaChange(SchemaChangeEvent event) {
        // Route schema change to correct table writer
        TablePath tablePath = event.getTableId().toTablePath();

        // Apply to all replicas of this table
        for (int i = 0; i < replicaNum; i++) {
            SinkIdentifier identifier = new SinkIdentifier(
                    new TableIdentifier(tablePath),
                    i
            );

            SinkWriter writer = writers.get(identifier);
            writer.applySchemaChange(event);
        }
    }
}
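
The key property of this routing is that a schema change reaches every replica of the affected table and no writer of any other table. A small sketch under simplifying assumptions (`SchemaFanOutSketch` and `RecordingWriter` are hypothetical, schema changes are plain strings, and the "table#replica" key is an illustrative stand-in for the identifier type):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SchemaFanOutSketch {
    // Hypothetical writer that only records the schema changes it was given.
    static final class RecordingWriter {
        final List<String> applied = new ArrayList<>();

        void applySchemaChange(String change) {
            applied.add(change);
        }
    }

    private final Map<String, RecordingWriter> writers = new LinkedHashMap<>();
    private final int replicaNum;

    SchemaFanOutSketch(List<String> tables, int replicaNum) {
        this.replicaNum = replicaNum;
        for (String table : tables) {
            for (int i = 0; i < replicaNum; i++) {
                writers.put(table + "#" + i, new RecordingWriter());
            }
        }
    }

    // Applies the change to every replica of the given table only.
    void handleSchemaChange(String table, String change) {
        for (int i = 0; i < replicaNum; i++) {
            RecordingWriter writer = writers.get(table + "#" + i);
            if (writer == null) {
                throw new IllegalStateException("No writer for " + table + " replica " + i);
            }
            writer.applySchemaChange(change);
        }
    }

    RecordingWriter writer(String table, int replica) {
        return writers.get(table + "#" + replica);
    }
}
```

Failing fast on a missing writer, as this sketch does, surfaces a routing mismatch immediately instead of as a later null dereference.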

7. Data Flow Example

7.1 Full Pipeline

┌──────────────────────────────────────────────────────────────┐
│                       MySQL CDC Source                       │
│  • Captures changes from 100 tables                          │
│  • Tags each row with TablePath                              │
└──────────────────────────┬───────────────────────────────────┘
                           │
                           ▼
        ┌─────────────────────────────────────┐
        │  SeaTunnelRow (with TablePath)      │
        │  tableId: "my_db.public.orders"     │
        │  fields: [1, "order-001", 99.99]    │
        └─────────────────────────────────────┘
                           │
                           ▼
┌──────────────────────────────────────────────────────────────┐
│                     MultiTableSinkWriter                     │
│  • Extracts TablePath from row                               │
│  • Selects replica (hash or random)                          │
│  • Routes to correct writer                                  │
└──────────────────────────┬───────────────────────────────────┘
                           │
        ┌──────────────────┼──────────────────┐
        ▼                  ▼                  ▼
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│   orders     │   │   users      │   │   products   │
│   Writer 0   │   │   Writer 0   │   │   Writer 0   │
│   Writer 1   │   │   Writer 1   │   │   Writer 1   │
│   Writer 2   │   │              │   │              │
│   Writer 3   │   │              │   │              │
└──────────────┘   └──────────────┘   └──────────────┘
        │                  │                  │
        ▼                  ▼                  ▼
┌──────────────┐   ┌──────────────┐   ┌──────────────┐
│  PostgreSQL  │   │  PostgreSQL  │   │  PostgreSQL  │
│   orders     │   │   users      │   │   products   │
└──────────────┘   └──────────────┘   └──────────────┘

7.2 Write Flow

7.3 Checkpoint Flow

8. Performance Optimization

8.1 Replica Sizing

Rule of Thumb:

replicaNum = ceil(Table Write Rate / Single Writer Throughput)

Example:
orders: 10,000 writes/sec
Single writer: 2,500 writes/sec
replicaNum = ceil(10,000 / 2,500) = 4
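
The rule of thumb translates directly into integer arithmetic. The helper below is a hypothetical sketch (`ReplicaSizingSketch` is not part of SeaTunnel); it computes the ceiling without floating point and never returns fewer than one replica:

```java
// Hypothetical helper for the sizing rule of thumb.
final class ReplicaSizingSketch {
    // ceil(tableWriteRate / singleWriterThroughput), with a floor of 1 replica.
    static int replicasFor(long tableWriteRate, long singleWriterThroughput) {
        if (singleWriterThroughput <= 0) {
            throw new IllegalArgumentException("singleWriterThroughput must be positive");
        }
        if (tableWriteRate <= 0) {
            return 1; // an idle table still needs one writer
        }
        long replicas = (tableWriteRate + singleWriterThroughput - 1) / singleWriterThroughput;
        return (int) Math.min(Integer.MAX_VALUE, replicas);
    }
}
```

For the example above, `replicasFor(10_000, 2_500)` yields 4. Because multi_table_sink_replica applies to all tables, the value would in practice be driven by the busiest table.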

8.2 Table-Specific Replicas

// Future enhancement: different replicas per table
Map<TablePath, Integer> replicaConfig = Map.of(
        TablePath.of("orders"), 4, // High-throughput table
        TablePath.of("users"), 2,  // Medium-throughput
        TablePath.of("config"), 1  // Low-throughput
);

8.3 Batch Writing

public class MultiTableSinkWriter {
    private final Map<SinkIdentifier, List<SeaTunnelRow>> buffers;
    private static final int BATCH_SIZE = 1000;

    @Override
    public void write(SeaTunnelRow row) {
        SinkIdentifier identifier = selectWriter(row);

        List<SeaTunnelRow> buffer = buffers.computeIfAbsent(
                identifier,
                k -> new ArrayList<>()
        );

        buffer.add(row);

        if (buffer.size() >= BATCH_SIZE) {
            flushBuffer(identifier, buffer);
        }
    }
}
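
A size-triggered flush alone leaves partially filled buffers behind, so a buffering writer also needs a flush at checkpoint time. The sketch below illustrates both paths under simplifying assumptions: `BatchBufferSketch` is hypothetical, rows and writer keys are plain strings, and flushed batches are collected in a list purely so the behavior can be observed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical per-writer buffering model.
final class BatchBufferSketch {
    private final int batchSize;
    private final Map<String, List<String>> buffers = new HashMap<>();
    // Stand-in for the real sink; records every flushed batch.
    final List<List<String>> flushed = new ArrayList<>();

    BatchBufferSketch(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    void write(String writerKey, String row) {
        List<String> buffer = buffers.computeIfAbsent(writerKey, k -> new ArrayList<>());
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush(buffer);
        }
    }

    // Intended to run at checkpoint time so no buffered rows are left behind.
    void flushAll() {
        for (List<String> buffer : buffers.values()) {
            if (!buffer.isEmpty()) {
                flush(buffer);
            }
        }
    }

    private void flush(List<String> buffer) {
        flushed.add(new ArrayList<>(buffer)); // copy the batch, then reuse the buffer
        buffer.clear();
    }
}
```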

9. Monitoring and Observability

9.1 Key Metrics

Per-Table Metrics:

  • table.{tableName}.records_written: Records written per table
  • table.{tableName}.bytes_written: Bytes written per table
  • table.{tableName}.write_latency: Write latency per table

Per-Replica Metrics:

  • table.{tableName}.replica.{index}.records: Records per replica
  • table.{tableName}.replica.{index}.utilization: Replica utilization

Global Metrics:

  • multitable.tables.total: Total number of tables
  • multitable.writers.total: Total number of writers (tables × replicas)
  • multitable.throughput: Aggregate throughput

9.2 Monitoring Dashboard

Multi-Table Job: mysql-to-postgres

Tables: 100
Writers: 250 (avg 2.5 replicas per table)
Throughput: 50,000 records/sec

Top Tables by Throughput:
1. orders: 15,000 rec/sec (4 replicas)
2. events: 10,000 rec/sec (4 replicas)
3. users: 5,000 rec/sec (2 replicas)
...

Replica Distribution:
orders:
Replica 0: 3,750 rec/sec (25%)
Replica 1: 3,800 rec/sec (25.3%)
Replica 2: 3,700 rec/sec (24.7%)
Replica 3: 3,750 rec/sec (25%)

10. Best Practices

10.1 Table Selection

Table include/exclude patterns are connector-specific. Please refer to the specific Source connector documentation for the supported option keys and formats.

10.2 Replica Configuration

Start Conservative:

sink {
  JDBC {
    # Start with 1 replica, increase if bottleneck
    multi_table_sink_replica = 1
  }
}

Monitor and Tune:

# Check whether a single replica is the bottleneck
# If write latency is high → increase replicas
multi_table_sink_replica = 2 # Double capacity

10.3 Schema Management

Pre-create Target Tables:

-- Better: pre-create all target tables
CREATE TABLE orders (...);
CREATE TABLE users (...);
CREATE TABLE products (...);

Enable Auto-Create (Carefully):

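sink {
  JDBC {
    # Auto-create missing tables. Option names are connector-specific;
    # the JDBC sink documents schema_save_mode for this purpose.
    generate_sink_sql = true
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
  }
}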

10.4 Error Handling

Error tolerance and retry policies are typically connector-specific. Avoid relying on undocumented multi-table.* option keys unless they are defined by the connector you use.

11. Limitations and Considerations

11.1 Current Limitations

Shared Parallelism:

  • All tables share the same parallelism
  • Cannot set different parallelism per table

Fixed Replicas:

  • Same replica count for all tables
  • High-throughput and low-throughput tables treated equally

Memory Overhead:

  • Each writer maintains a separate buffer
  • 100 tables × 4 replicas = 400 writers in memory
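
The memory overhead can be estimated with simple arithmetic. The sketch below assumes, for illustration only, that each writer may hold up to one full batch of rows in its buffer; `WriterFootprintSketch` is a hypothetical helper:

```java
// Hypothetical helper for estimating writer count and buffered rows.
final class WriterFootprintSketch {
    // Total writer instances: every table gets the same number of replicas.
    static long totalWriters(int tables, int replicasPerTable) {
        return (long) tables * replicasPerTable;
    }

    // Upper bound on rows held in memory if every writer's buffer is full
    // (assumes one buffer of at most batchSize rows per writer).
    static long maxBufferedRows(int tables, int replicasPerTable, int batchSize) {
        return totalWriters(tables, replicasPerTable) * batchSize;
    }
}
```

Under that assumption, 100 tables with 4 replicas and a batch size of 1000 give an upper bound of 400,000 buffered rows; halving the batch size to 500 halves the bound to 200,000.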

11.2 Workarounds

High-Throughput Tables:

# Option 1: Separate job for hot tables
job-1 { source { table-name = "orders" } } # Dedicated job

job-2 { source { table-name = "user_.*|product_.*" } } # Rest

Memory Optimization:

# Reduce buffer size per writer
sink {
  JDBC {
    batch_size = 500 # Smaller batches
  }
}

12. Future Enhancements

12.1 Dynamic Replicas

The current multi_table_sink_replica option applies to all tables and does not support per-table overrides. Per-table replica counts would require additional connector or framework capabilities.

12.2 Adaptive Replicas

// Auto-adjust replicas based on throughput
if (table.getWriteRate() > threshold) {
    increaseReplicas(table);
} else if (table.getWriteRate() < lowThreshold) {
    decreaseReplicas(table);
}
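
One way such an adjustment could be made safe is to change the count one step at a time and clamp it to configured bounds. This is a sketch of a possible design, not an existing feature; `AdaptiveReplicaSketch` and all of its parameters are hypothetical:

```java
// Hypothetical decision function for a future adaptive-replica feature.
final class AdaptiveReplicaSketch {
    // Returns the next replica count: one step up above highThreshold,
    // one step down below lowThreshold, otherwise unchanged.
    // The result is always clamped to [minReplicas, maxReplicas].
    static int nextReplicaCount(
            int current,
            double writeRate,
            double highThreshold,
            double lowThreshold,
            int minReplicas,
            int maxReplicas) {
        if (lowThreshold > highThreshold || minReplicas > maxReplicas) {
            throw new IllegalArgumentException("inconsistent thresholds or bounds");
        }
        int next = current;
        if (writeRate > highThreshold) {
            next = current + 1;
        } else if (writeRate < lowThreshold) {
            next = current - 1;
        }
        return Math.max(minReplicas, Math.min(maxReplicas, next));
    }
}
```

Keeping a gap between the two thresholds avoids oscillation when the write rate hovers around a single cut-off value.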

14. References

Key Source Files

Example Implementations

  • MySQL CDC Source: seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/
  • JDBC Sink: seatunnel-connectors-v2/connector-jdbc/