
Exactly-Once Semantics

1. Overview

1.1 Problem Background

Distributed data processing must choose, and then enforce, a delivery guarantee:

  • At-Most-Once: Records may be lost (unacceptable for critical data)
  • At-Least-Once: Records may be duplicated (causes counting errors, double charges)
  • Exactly-Once: Each record processed exactly once (ideal but complex)

Real-World Impact:

| Scenario | Outcome | Result |
|----------|---------|--------|
| At-least-once | Transaction $100 processed twice | User is charged $200 |
| Exactly-once | Transaction $100 processed once | User is charged $100 |

1.2 Design Goals

SeaTunnel's exactly-once semantics aims to:

  1. Verifiable End-to-End Consistency: By combining checkpoint boundaries with transactional or idempotent sink commits, avoid data loss and duplication under the documented failure model
  2. Transparent Implementation: The framework handles the complexity; users need only minimal configuration
  3. Performance Efficiency: Minimize overhead while maintaining the guarantee
  4. Failure Resilience: Maintain the guarantee across task, worker, and master failures
  5. Broad Applicability: Support transactional sinks, and provide practical semantics (e.g., idempotent writes or at-least-once) for non-transactional sinks

1.3 Consistency Levels

| Level | Guarantee | Use Cases | Implementation |
|-------|-----------|-----------|----------------|
| At-Most-Once | No duplicates, may lose | Non-critical logs | No retry |
| At-Least-Once | No loss, may duplicate | Idempotent processing | Retry without transaction |
| Exactly-Once | No loss, no duplicates | Financial, billing, audit | Checkpoint + 2PC |
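To make the table concrete, here is a small single-process simulation of the $100 charge from the impact table. All names (`DeliveryDemo`, `FlakySink`, `Fault`) are hypothetical, not SeaTunnel APIs. The sink fails once, either before applying the write or by losing the acknowledgement after applying it. The at-most-once sender never retries, the at-least-once sender retries blindly, and the exactly-once variant retries with an idempotency key. That key is one simple way to get an exactly-once *effect*; SeaTunnel's transactional path uses checkpoint + 2PC instead.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical simulation of the three delivery levels; not a SeaTunnel API.
class DeliveryDemo {
    enum Level { AT_MOST_ONCE, AT_LEAST_ONCE, EXACTLY_ONCE }
    enum Fault { LOST_BEFORE_APPLY, ACK_LOST_AFTER_APPLY }

    // A sink that fails exactly once, in the given way.
    static class FlakySink {
        final List<String> applied = new ArrayList<>();
        private final Set<String> seenIds = new HashSet<>();
        private Fault pendingFault;

        FlakySink(Fault fault) { this.pendingFault = fault; }

        /** Returns true if the sender receives an acknowledgement. */
        boolean write(String id, String value, boolean dedupe) {
            Fault fault = pendingFault;
            pendingFault = null;                 // the fault does not repeat
            if (fault == Fault.LOST_BEFORE_APPLY) {
                return false;                    // nothing applied, no ack
            }
            if (!dedupe || seenIds.add(id)) {
                applied.add(value);              // skipped only for a repeated id when deduping
            }
            return fault == null;                // ACK_LOST_AFTER_APPLY: applied, but no ack
        }
    }

    /** Sends one record under the given level; returns what the sink applied. */
    static List<String> deliver(Level level, Fault fault, String id, String value) {
        FlakySink sink = new FlakySink(fault);
        boolean dedupe = level == Level.EXACTLY_ONCE;
        boolean acked = sink.write(id, value, dedupe);
        if (!acked && level != Level.AT_MOST_ONCE) {
            sink.write(id, value, dedupe);       // single retry
        }
        return sink.applied;
    }
}
```

At-most-once loses the charge when the write is dropped, at-least-once applies it twice when only the ack is lost, and the deduplicating retry applies it once in both cases.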

2. Theoretical Foundation

2.1 Chandy-Lamport Algorithm

Concept: Distributed snapshot without stopping the entire system.

Mechanism:

  1. Coordinator injects barriers (markers) into data streams
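  2. Upon receiving a barrier (on every input, for an operator with several inputs), each operator:
    • Snapshots its local state
    • Forwards the barrier downstream
  3. Once every operator has snapshotted its state, the individual snapshots together form a consistent global snapshot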

Key Property: Snapshot represents a consistent cut across distributed system state.
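The step "upon receiving a barrier" hides one detail that matters for exactly-once: an operator with several input channels must wait until the barrier for checkpoint N has arrived on every channel before it snapshots (barrier alignment). Otherwise records that follow the barrier on a fast channel would leak into snapshot N. The sketch below is a minimal single-threaded model of that rule; `AligningOperator` and its methods are hypothetical, it handles one checkpoint in flight, and its only state is a record count.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical aligned-barrier operator whose state is a record count.
class AligningOperator {
    private final int numChannels;
    private long count;                                          // operator state
    private final Set<Integer> barrierSeen = new HashSet<>();
    private final Map<Integer, Deque<String>> blocked = new HashMap<>();
    final Map<Long, Long> snapshots = new HashMap<>();            // checkpointId -> state
    final List<Long> forwardedBarriers = new ArrayList<>();

    AligningOperator(int numChannels) { this.numChannels = numChannels; }

    void onRecord(int channel, String record) {
        if (barrierSeen.contains(channel)) {
            // This channel is already past the barrier: hold its records back.
            blocked.computeIfAbsent(channel, c -> new ArrayDeque<>()).add(record);
        } else {
            count++;
        }
    }

    void onBarrier(int channel, long checkpointId) {
        barrierSeen.add(channel);
        if (barrierSeen.size() == numChannels) {                  // aligned on all inputs
            snapshots.put(checkpointId, count);                   // 1. snapshot local state
            forwardedBarriers.add(checkpointId);                  // 2. forward barrier downstream
            barrierSeen.clear();
            for (Deque<String> queue : blocked.values()) {        // 3. process held-back records
                count += queue.size();
            }
            blocked.clear();
        }
    }

    long count() { return count; }
}
```

In a run where channel 0 delivers `a`, its barrier, then `b`, and channel 1 delivers `c` and its barrier, snapshot 1 contains two records (`a`, `c`). Record `b` is counted only after the snapshot.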

2.2 Two-Phase Commit Protocol

Concept: Atomic commitment across distributed participants.

Phases:

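  1. Prepare Phase: Every participant makes its pending changes durable without making them externally visible, then votes to commit or abort
  2. Commit Phase: The coordinator commits only if every participant voted to commit (otherwise it aborts), and all participants execute that decision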

In SeaTunnel:

  • Prepare: SinkWriter.prepareCommit(checkpointId) during checkpoint
  • Commit: SinkCommitter.commit() after checkpoint completes
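A compact sketch of the protocol's decision rule, assuming in-memory participants. The `TwoPhaseCommit` class and its `Participant` interface are hypothetical. The coordinator commits only if every participant votes yes in the prepare phase; otherwise it aborts all of them. In SeaTunnel's mapping above, a successful `prepareCommit(checkpointId)` plays the role of the "yes" vote.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical two-phase commit coordinator over in-memory participants.
class TwoPhaseCommit {
    interface Participant {
        boolean prepare();   // phase 1: make changes durable but invisible, then vote
        void commit();       // phase 2: make prepared changes visible
        void abort();        // phase 2: discard prepared changes
    }

    static class InMemoryParticipant implements Participant {
        final List<String> visible = new ArrayList<>();
        private final List<String> pending = new ArrayList<>();
        private final boolean voteYes;

        InMemoryParticipant(boolean voteYes, String... writes) {
            this.voteYes = voteYes;
            pending.addAll(List.of(writes));
        }

        public boolean prepare() { return voteYes; }
        public void commit() { visible.addAll(pending); pending.clear(); }
        public void abort() { pending.clear(); }
    }

    /** Returns true if the transaction committed on all participants. */
    static boolean run(List<? extends Participant> participants) {
        boolean allPrepared = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allPrepared = false;
                break;
            }
        }
        for (Participant p : participants) {
            if (allPrepared) {
                p.commit();
            } else {
                p.abort();
            }
        }
        return allPrepared;
    }
}
```

A single "no" vote is enough to abort, so no participant ever exposes changes that another participant discarded.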

3. Architecture for Exactly-Once

3.1 End-to-End Pipeline
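The end-to-end flow can be illustrated with a single-process simulation under simplifying assumptions: one source partition, one sink, a sink "transaction" that is just a staged list, checkpoint completion and commit collapsed into synchronous steps, and one crash injected between checkpoints. Every class and method here is hypothetical. Because the source offset and the sink's staged writes are both tied to the same checkpoint boundary, the replay after the crash neither loses nor duplicates records.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-process model: source offset + checkpoint + 2PC sink.
class EndToEndSimulation {
    final List<Integer> committed = new ArrayList<>();  // externally visible output
    private int checkpointedOffset = 0;                // source offset in last completed checkpoint
    private boolean crashInjected = false;

    /**
     * Processes input, checkpointing every {@code interval} records. Crashes once
     * just before reading index {@code crashAt} (-1 for no crash), then restarts.
     */
    void run(List<Integer> input, int interval, int crashAt) {
        while (!attempt(input, interval, crashAt)) {
            // Restart: in-memory offset and open transaction are lost;
            // the next attempt restores from checkpointedOffset.
        }
    }

    /** Returns true if the input was fully processed, false on a crash. */
    private boolean attempt(List<Integer> input, int interval, int crashAt) {
        int offset = checkpointedOffset;               // source restores its position
        List<Integer> openTxn = new ArrayList<>();     // sink writes, not yet visible
        while (offset < input.size()) {
            if (!crashInjected && offset == crashAt) {
                crashInjected = true;
                return false;                          // openTxn is discarded, never visible
            }
            openTxn.add(input.get(offset));
            offset++;
            if (openTxn.size() == interval || offset == input.size()) {
                List<Integer> prepared = new ArrayList<>(openTxn);  // prepareCommit
                openTxn.clear();
                checkpointedOffset = offset;           // checkpoint completes
                committed.addAll(prepared);            // commit after completion
            }
        }
        return true;
    }
}
```

With ten records, a checkpoint every three records, and a crash before record 7, record 6 is processed twice but committed once, and the output is exactly `0..9`.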

3.2 Key Components

Source Offset Management:

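// SeaTunnel API imports (Collector, SeaTunnelRow) omitted
import java.time.Duration;
import java.util.*;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceReader {
    private KafkaConsumer<byte[], byte[]> consumer;
    // Next offset to read per partition (Kafka convention: last processed offset + 1)
    private final Map<TopicPartition, Long> currentOffsets = new HashMap<>();
    // Offsets frozen at each checkpoint, committed once that checkpoint completes
    private final TreeMap<Long, Map<TopicPartition, Long>> pendingOffsets = new TreeMap<>();

    @Override
    public void pollNext(Collector<SeaTunnelRow> output) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            // Process record
            output.collect(convert(record));

            // Track the position to resume from
            currentOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                record.offset() + 1
            );
        }
    }

    @Override
    public List<KafkaSourceState> snapshotState(long checkpointId) {
        // Copy the offsets: currentOffsets keeps advancing after this call
        Map<TopicPartition, Long> snapshot = new HashMap<>(currentOffsets);
        pendingOffsets.put(checkpointId, snapshot);
        return Collections.singletonList(new KafkaSourceState(snapshot));
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        Map<TopicPartition, Long> snapshot = pendingOffsets.get(checkpointId);
        if (snapshot == null) {
            return;
        }
        // Commit the offsets captured by this checkpoint, not the latest position
        // (idempotent). Recovery restores from the checkpointed state; the Kafka
        // commit mainly exposes progress, e.g. for consumer-lag monitoring.
        Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
        snapshot.forEach((tp, offset) -> toCommit.put(tp, new OffsetAndMetadata(offset)));
        consumer.commitSync(toCommit);
        pendingOffsets.headMap(checkpointId, true).clear();
    }
}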
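The offset bookkeeping in the reader above can be exercised without a Kafka broker. In this sketch the `OffsetTracker` class is hypothetical and partitions are plain strings. It keeps one offset snapshot per checkpoint. When a checkpoint completes, it returns exactly the offsets captured at that checkpoint rather than the reader's latest position, which may already include records that belong to the next checkpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, Kafka-free model of per-checkpoint offset tracking.
class OffsetTracker {
    private final Map<String, Long> nextOffsets = new HashMap<>();
    private final NavigableMap<Long, Map<String, Long>> pending = new TreeMap<>();

    /** Per record: the next offset to read is the record offset + 1. */
    void onRecord(String partition, long offset) {
        nextOffsets.put(partition, offset + 1);
    }

    /** At snapshotState: freeze a copy of the current positions. */
    Map<String, Long> snapshot(long checkpointId) {
        Map<String, Long> copy = new HashMap<>(nextOffsets);
        pending.put(checkpointId, copy);
        return copy;
    }

    /** At notifyCheckpointComplete: offsets safe to commit (empty if unknown). */
    Map<String, Long> complete(long checkpointId) {
        Map<String, Long> offsets = pending.getOrDefault(checkpointId, Map.of());
        // Snapshots up to and including this checkpoint are now superseded.
        pending.headMap(checkpointId, true).clear();
        return offsets;
    }
}
```

If records at offsets 0 and 1 are read, checkpoint 1 is taken, and offset 2 is read before checkpoint 1 completes, the committed position is 2, so record 2 will still be re-read after a restart.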

Sink Two-Phase Commit:

// Simplified sketch: checked SQLException/XAException handling is omitted
public class JdbcExactlyOnceSinkWriter {
    private XAConnection xaConnection;
    private XAResource xaResource;   // xaConnection.getXAResource()
    private Statement statement;     // created from xaConnection.getConnection()
    private Xid currentXid;

    @Override
    public void write(SeaTunnelRow element) {
        if (currentXid == null) {
            // Start a new XA transaction branch for this checkpoint
            currentXid = generateXid();
            xaResource.start(currentXid, XAResource.TMNOFLAGS);
        }

        // Execute INSERT inside the XA branch (not visible until commit)
        statement.executeUpdate(toSQL(element));
    }

    @Override
    public Optional<XidInfo> prepareCommit(long checkpointId) {
        if (currentXid == null) {
            return Optional.empty();
        }

        Xid xid = currentXid;
        currentXid = null;

        // PHASE 1: end and prepare the branch (durable, not yet visible)
        xaResource.end(xid, XAResource.TMSUCCESS);
        if (xaResource.prepare(xid) == XAResource.XA_RDONLY) {
            // Read-only branch: the database has already completed it
            return Optional.empty();
        }

        // Return XID for the committer
        return Optional.of(new XidInfo(xid));
    }
}

public class JdbcSinkCommitter {
    private XAResource xaResource;   // from the committer's own XAConnection

    @Override
    public List<XidInfo> commit(List<XidInfo> commitInfos) {
        List<XidInfo> failed = new ArrayList<>();

        for (XidInfo xidInfo : commitInfos) {
            try {
                // PHASE 2: commit (changes become visible)
                xaResource.commit(xidInfo.getXid(), false);
            } catch (XAException e) {
                if (e.errorCode == XAException.XAER_NOTA) {
                    // Unknown XID: assumed committed by an earlier attempt (idempotent)
                    LOG.info("XID already committed: {}", xidInfo);
                } else {
                    // Reported back to the framework as failed
                    failed.add(xidInfo);
                }
            }
        }

        return failed;
    }
}

4. Implementation Patterns

4.1 Transactional Sinks (XA)

Supported Systems: MySQL, PostgreSQL, Oracle, SQL Server

Implementation:

public class JdbcExactlyOnceSink implements SeaTunnelSink<...> {
    // Shared by createWriter() and createCommitter(), so it must be a field
    private final XADataSource xaDataSource = createXADataSource();

    @Override
    public SinkWriter<...> createWriter(Context context) {
        // Writers run their inserts inside XA transactions
        return new JdbcExactlyOnceSinkWriter(xaDataSource);
    }

    @Override
    public Optional<SinkCommitter<XidInfo>> createCommitter() {
        return Optional.of(new JdbcSinkCommitter(xaDataSource));
    }
}

Pros:

  • Strong consistency guarantee
  • Automatic rollback on failure

Cons:

  • Requires database XA support
  • Higher latency (2PC overhead)
  • Lock contention: locks acquired by writes are held through the prepare phase until commit, i.e. for roughly a checkpoint interval

4.2 Idempotent Sinks (Upsert)

Supported Systems: Key-value stores, Elasticsearch (with doc ID)

Implementation:

public class ElasticsearchSinkWriter {
    @Override
    public void write(SeaTunnelRow element) {
        // Use a deterministic document ID derived from the primary key
        String docId = extractPrimaryKey(element);

        IndexRequest request = new IndexRequest("my_index")
            .id(docId)                                     // Idempotent key: replays overwrite
            .source(toJson(element), XContentType.JSON);

        bulkProcessor.add(request);
    }

    @Override
    public Optional<CommitInfo> prepareCommit(long checkpointId) {
        // Flush buffered requests. The checkpoint must not complete before they are
        // acknowledged (e.g. a synchronous flush), otherwise records can be lost.
        bulkProcessor.flush();

        // No explicit commit needed (operations are idempotent)
        return Optional.empty();
    }
}

Key: Same primary key → same document → idempotent updates
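The idempotency argument can be checked with an in-memory stand-in for the index. `UpsertStore` is hypothetical, and a `HashMap` plays the role of documents keyed by ID. Replaying a batch after recovery overwrites the same documents instead of adding new ones.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for an index that upserts by document ID.
class UpsertStore {
    private final Map<String, String> documents = new HashMap<>();

    /** Index-by-ID semantics: writing an existing ID overwrites, never appends. */
    void write(String docId, String json) {
        documents.put(docId, json);
    }

    int size() { return documents.size(); }

    String get(String docId) { return documents.get(docId); }
}
```

The replay is harmless only if it produces the same payload for the same ID. If the transformation is non-deterministic, the document ends up with whatever the last replayed write contained, which is why both a deterministic key and deterministic processing are needed.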

Pros:

  • No transaction overhead
  • Lower latency

Cons:

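  • Requires a unique, deterministic key
  • Cannot make changes to multiple records atomic
  • Readers may briefly see stale values while replayed records are re-applied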

4.3 Log-Based Sinks (Kafka)

Implementation:

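import java.util.*;
import org.apache.kafka.clients.producer.*;

public class KafkaSinkWriter {
    private final Properties baseProps;   // bootstrap.servers, key/value serializers, ...
    private KafkaProducer<byte[], byte[]> producer;
    private String transactionalId;

    public KafkaSinkWriter(Properties baseProps) {
        this.baseProps = baseProps;
    }

    @Override
    public void write(SeaTunnelRow element) {
        if (producer == null) {
            // One transactional producer per checkpoint, so a new transaction can
            // start while the previous one still waits for its commit
            transactionalId = generateTransactionalId();
            Properties props = new Properties();
            props.putAll(baseProps);
            props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
            props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            // transaction.timeout.ms must exceed the checkpoint interval,
            // or the broker aborts the open transaction before it is committed
            producer = new KafkaProducer<>(props);
            producer.initTransactions();
            producer.beginTransaction();
        }

        producer.send(convert(element));
    }

    @Override
    public Optional<KafkaCommitInfo> prepareCommit(long checkpointId) {
        if (producer == null) {
            return Optional.empty();
        }
        // PHASE 1: flush so every record is acknowledged by the brokers,
        // but leave the transaction open (uncommitted)
        producer.flush();

        KafkaCommitInfo info = new KafkaCommitInfo(transactionalId);
        producer = null;   // the open transaction is handed to the committer
        return Optional.of(info);
    }
}

public class KafkaSinkCommitter {
    @Override
    public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
        for (KafkaCommitInfo info : commitInfos) {
            // resumeProducer(...) is a placeholder: the committer must re-attach to the
            // writer's still-open transaction (same transactional.id, producer id and
            // epoch). Calling initTransactions() on a fresh producer would abort it.
            KafkaProducer<byte[], byte[]> producer = resumeProducer(info);

            // PHASE 2: commit; records become visible to read_committed consumers
            producer.commitTransaction();
            producer.close();
        }
        // The next transaction is started lazily by the writer's next write()
        return Collections.emptyList();
    }
}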
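Kafka transactions hide uncommitted records only from consumers configured with `isolation.level=read_committed`. With the default `read_uncommitted`, consumers also see records from open and aborted transactions. The toy log below (hypothetical `TxnLog`, no Kafka dependency) models that visibility rule. It ignores ordering details such as the last stable offset, which in real Kafka also delays committed records that follow a still-open transaction.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of a partition log with transactional visibility.
class TxnLog {
    enum Status { OPEN, COMMITTED, ABORTED }

    private final List<String[]> entries = new ArrayList<>();   // {transactionalId, value}
    private final Map<String, Status> status = new HashMap<>();

    void begin(String txn) { status.put(txn, Status.OPEN); }
    void append(String txn, String value) { entries.add(new String[] {txn, value}); }
    void commit(String txn) { status.put(txn, Status.COMMITTED); }
    void abort(String txn) { status.put(txn, Status.ABORTED); }

    /** read_committed: only values whose transaction committed. */
    List<String> readCommitted() {
        List<String> out = new ArrayList<>();
        for (String[] e : entries) {
            if (status.get(e[0]) == Status.COMMITTED) {
                out.add(e[1]);
            }
        }
        return out;
    }

    /** read_uncommitted: every appended value, whatever its transaction's fate. */
    List<String> readUncommitted() {
        List<String> out = new ArrayList<>();
        for (String[] e : entries) {
            out.add(e[1]);
        }
        return out;
    }
}
```

A downstream job that reads the sink's topic without `read_committed` can therefore observe records from a checkpoint that was later aborted, which breaks end-to-end exactly-once even though the sink itself behaved correctly.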

4.4 File Sinks (Atomic Rename)

Implementation:

public class FileSinkWriter {
    private String tempFilePath;    // in-progress file for the current checkpoint
    private String finalFilePath;
    private OutputStream outputStream;

    @Override
    public void write(SeaTunnelRow element) throws IOException {
        // Write to the temporary file (not visible under the final path)
        byte[] bytes = serialize(element);
        outputStream.write(bytes);
    }

    @Override
    public Optional<FileCommitInfo> prepareCommit(long checkpointId) throws IOException {
        // PHASE 1: flush and close the temp file (no rename yet)
        outputStream.close();
        // The next checkpoint writes to a new temp file (opening it is omitted here)
        return Optional.of(new FileCommitInfo(tempFilePath, finalFilePath));
    }
}

public class FileSinkCommitter {
    private FileSystem fileSystem;   // Hadoop FileSystem

    @Override
    public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) throws IOException {
        List<FileCommitInfo> failed = new ArrayList<>();

        for (FileCommitInfo info : commitInfos) {
            Path temp = new Path(info.getTempFilePath());
            Path target = new Path(info.getFinalFilePath());

            // Retry after a lost ack: temp already moved and target present -> done
            if (!fileSystem.exists(temp) && fileSystem.exists(target)) {
                continue;
            }

            // PHASE 2: atomic rename (file becomes visible)
            if (!fileSystem.rename(temp, target)) {
                failed.add(info);
            }
        }

        return failed;
    }
}

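Key: An atomic rename makes the file either fully visible or not visible at all. This holds on HDFS and local POSIX file systems; on object stores such as S3, a "rename" is typically a copy followed by a delete and is not atomic.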
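On a local file system the same pattern can be written with `java.nio.file`. This is a sketch with hypothetical class, method, and file names (the `.inprogress` suffix is an illustrative convention). `Files.move(..., StandardCopyOption.ATOMIC_MOVE)` throws `AtomicMoveNotSupportedException` instead of silently falling back to a copy when an atomic move is impossible. The commit treats "temp file gone, final file present" as already committed, so a retried commit is idempotent.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical temp-file-then-rename committer for a local file system.
class AtomicFileCommit {
    /** Phase 1: write data to a hidden temp file next to the final path. */
    static Path prepare(Path finalPath, String data) {
        Path temp = finalPath.resolveSibling("." + finalPath.getFileName() + ".inprogress");
        try {
            Files.writeString(temp, data, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return temp;
    }

    /** Phase 2: publish atomically; safe to call again after a lost acknowledgement. */
    static void commit(Path temp, Path finalPath) {
        try {
            if (Files.notExists(temp) && Files.exists(finalPath)) {
                return; // already committed by an earlier attempt
            }
            Files.move(temp, finalPath, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Keeping the temp file in the same directory as the target keeps both on the same file store, which is a precondition for the move to be atomic.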

5. Failure Scenarios and Recovery

5.1 Task Failure Before Checkpoint

| Time | Event |
|------|-------|
| t0 | Checkpoint N completes |
| t1 | Records [1000-2000] are processed |
| t2 | Task fails before the next successful checkpoint |
| t3 | Job restores from checkpoint N |
| t4 | Records [1000-2000] are reprocessed |

Result:

  • No data loss because the records are replayed after recovery.
  • No duplication because nothing was committed before the failure.

5.2 Task Failure After prepareCommit

| Time | Event |
|------|-------|
| t0 | Checkpoint N is in progress |
| t1 | SinkWriter.prepareCommit(...) prepares XID-123 |
| t2 | Task fails before the commit step |
| t3 | Job restores from checkpoint N-1 |
| t4 | Records are reprocessed |
| t5 | A new prepareCommit(...) prepares XID-124 |
| t6 | The committer commits XID-124 |

Result:

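  • XID-123 never becomes externally visible, because it is not part of a completed checkpoint. It does remain prepared in the database (possibly holding locks) until recovery rolls it back.
  • XID-124 is the only committed transaction covering these records.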
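Scenario 5.2 relies on recovery cleaning up XID-123, which is still prepared inside the database. One way to do this with XA is to list in-doubt transactions on restart (`XAResource.recover`) and resolve each against the restored checkpoint. The sketch below models only that decision rule in memory. `InDoubtResolver` and its inputs are hypothetical, and real XIDs would need to encode enough information (for example a job ID and checkpoint ID) to be matched this way.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical resolution of in-doubt XA transactions after a restore.
class InDoubtResolver {
    enum Action { COMMIT, ROLLBACK }

    /**
     * @param inDoubt      prepared XIDs reported by the database (e.g. via XAResource.recover)
     * @param checkpointed XIDs recorded as commit infos in the restored, completed checkpoint
     */
    static Map<String, Action> resolve(List<String> inDoubt, Set<String> checkpointed) {
        Map<String, Action> plan = new TreeMap<>();
        for (String xid : inDoubt) {
            // In a completed checkpoint: it must become visible, so finish the commit.
            // Otherwise its records will be replayed from the source: discard it.
            plan.put(xid, checkpointed.contains(xid) ? Action.COMMIT : Action.ROLLBACK);
        }
        return plan;
    }
}
```

For scenario 5.2, XID-123 (prepared for the unfinished checkpoint N) is rolled back, while an XID recorded in checkpoint N-1 whose commit had not yet run would be committed.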

5.3 Committer Failure During Commit

| Time | Event |
|------|-------|
| t0 | Checkpoint N completes |
| t1 | The committer starts committing XID-100, XID-101, XID-102 |
| t2 | XID-100 commits successfully |
| t3 | The committer fails before XID-101 and XID-102 finish |
| t4 | A new committer retries the whole batch |
| t5 | XID-100 is seen as already committed and treated idempotently |
| t6 | XID-101 commits successfully |
| t7 | XID-102 commits successfully |

Result:

  • Every prepared transaction is eventually committed.
  • Idempotent commit logic prevents duplicates during retry.
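The retry in scenario 5.3 is safe only because committing an already-committed XID is a no-op. The following in-memory model is a sketch: `IdempotentCommitDemo` and `FakeDatabase` are hypothetical, and `commit` returning `false` stands in for the "unknown XID" case that XA drivers report as `XAException.XAER_NOTA`. The first committer crashes right after XID-100 becomes durable; the replacement retries the whole batch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical resource manager and committer illustrating idempotent retries.
class IdempotentCommitDemo {
    static class FakeDatabase {
        private final Set<String> prepared = new HashSet<>();
        final List<String> committed = new ArrayList<>();
        private int commitsUntilCrash;   // committer "crashes" after this many commits; 0 = never

        FakeDatabase(int commitsUntilCrash) { this.commitsUntilCrash = commitsUntilCrash; }

        void prepare(String xid) { prepared.add(xid); }

        /** Returns false for an unknown XID (the XAER_NOTA analogue). */
        boolean commit(String xid) {
            if (!prepared.remove(xid)) {
                return false;
            }
            committed.add(xid);
            if (--commitsUntilCrash == 0) {
                // The commit is durable, but the committer dies before moving on
                throw new IllegalStateException("committer crashed after " + xid);
            }
            return true;
        }
    }

    /** Commits a batch; returns how many XIDs were skipped as already committed. */
    static int commitAll(FakeDatabase db, List<String> xids) {
        int skipped = 0;
        for (String xid : xids) {
            if (!db.commit(xid)) {
                skipped++;   // idempotent: treat an unknown XID as already committed
            }
        }
        return skipped;
    }
}
```

The retried batch skips XID-100 and commits the other two, so each prepared transaction is committed exactly once.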

5.4 Network Partition

| Time | Event |
|------|-------|
| t0 | SinkWriter prepares XID-200 |
| t1 | The checkpoint completes |
| t2 | The committer sends commit(XID-200) |
| t3 | A network partition causes the ACK to be lost |
| t4 | The committer retries commit(XID-200) |
| t5 | The sink reports that XID-200 is already committed |

Result:

  • The external system still sees exactly one committed transaction.
  • Idempotency absorbs the duplicate commit attempt safely.

6. Idempotency Requirements

6.1 Why Idempotency Matters

Problem: Network failures, retries, and failover can cause duplicate commit attempts.

Solution: Committer operations must be idempotent.

// ❌ BAD: Non-idempotent (calling twice inserts twice)
void commit(CommitInfo info) {
    statement.execute("INSERT INTO target_table VALUES (1, 'data')");
}

// ✅ GOOD: Idempotent (calling twice has same effect as once)
void commit(CommitInfo info) {
    // MySQL syntax; PostgreSQL would use INSERT ... ON CONFLICT (id) DO UPDATE
    statement.execute(
        "INSERT INTO target_table VALUES (1, 'data') " +
        "ON DUPLICATE KEY UPDATE data = VALUES(data)"
    );
}

6.2 Implementing Idempotency

Strategy 1: Check-then-Execute

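public List<XidInfo> commit(List<XidInfo> commitInfos) throws XAException {
    for (XidInfo xidInfo : commitInfos) {
        // Check if already committed
        if (isCommitted(xidInfo)) {
            LOG.info("XID already committed: {}", xidInfo);
            continue; // Idempotent
        }

        // Commit and record. The two steps are not atomic: a crash in between
        // leaves an unrecorded commit, so also tolerate XAER_NOTA (Strategy 3).
        xaResource.commit(xidInfo.getXid(), false);
        recordCommit(xidInfo);
    }
    return Collections.emptyList();
}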

Strategy 2: Database-Level Idempotency

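-- Unique constraint ensures idempotency
CREATE TABLE commits (
    xid VARCHAR(255) PRIMARY KEY,
    committed_at TIMESTAMP
);

-- Idempotent insert, written in the same transaction as the data
-- (MySQL syntax; PostgreSQL: INSERT ... ON CONFLICT DO NOTHING)
INSERT IGNORE INTO commits (xid, committed_at)
VALUES ('XID-123', NOW());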

Strategy 3: Natural Idempotency (XA)

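try {
    xaResource.commit(xid, false);
} catch (XAException e) {
    if (e.errorCode == XAException.XAER_NOTA) {
        // XID unknown to the database: treated as already committed.
        // It can also mean rolled back, so apply this only to XIDs from completed checkpoints.
        return; // Idempotent
    }
    throw e;
}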

7. Performance Considerations

7.1 Checkpoint Interval Trade-offs

| Checkpoint interval | Strengths | Trade-offs |
|---------------------|-----------|------------|
| Short (10-30s) | Faster recovery and less reprocessing | Higher snapshot overhead and more commit operations |
| Long (5-10min) | Lower steady-state overhead and fewer commits | Slower recovery and more reprocessing after failure |

Recommendation: 60-120 seconds for most workloads
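A back-of-the-envelope estimate can help pick an interval. Assume a steady ingest rate r and failures that strike at a uniformly random moment. Under those assumptions, the work redone after a failure is on average about r × interval / 2 records, and at most about r × (interval + checkpoint duration) records. The hypothetical `ReplayEstimate` helper below is plain arithmetic on those two formulas.

```java
// Hypothetical back-of-the-envelope estimate of replay volume after a failure.
class ReplayEstimate {
    /** Average records replayed, assuming failures at a uniformly random time. */
    static long averageReplay(long recordsPerSecond, long intervalSeconds) {
        return recordsPerSecond * intervalSeconds / 2;
    }

    /** Rough upper bound: failure just before the next checkpoint completes. */
    static long worstCaseReplay(long recordsPerSecond, long intervalSeconds, long checkpointSeconds) {
        return recordsPerSecond * (intervalSeconds + checkpointSeconds);
    }
}
```

At 10,000 records/s, a 60 s interval replays about 300,000 records on average (at most about 650,000 with a 5 s checkpoint), while a 10 min interval replays about 3,000,000.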

7.2 Batch Size Optimization

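public class OptimizedSinkWriter {
    private static final int BATCH_SIZE = 1000;
    private final List<SeaTunnelRow> buffer = new ArrayList<>();
    private PreparedStatement preparedStatement;   // e.g. INSERT ... VALUES (?, ?)

    @Override
    public void write(SeaTunnelRow element) {
        buffer.add(element);
        if (buffer.size() >= BATCH_SIZE) {
            flushBuffer();
        }
    }

    @Override
    public Optional<XidInfo> prepareCommit(long checkpointId) {
        // Flush the partial batch first: its source offsets are already part of
        // this checkpoint, so rows left in the buffer would be lost on failure
        flushBuffer();
        return prepareTransaction(checkpointId);   // hypothetical: XA end + prepare, as in 4.1
    }

    private void flushBuffer() {
        for (SeaTunnelRow row : buffer) {
            bindParameters(preparedStatement, row);   // hypothetical helper
            preparedStatement.addBatch();
        }
        preparedStatement.executeBatch();   // one round trip amortizes per-statement overhead
        buffer.clear();
    }
}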

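Impact (rough estimate): batches of 1000 rows can give ~10x throughput compared with row-by-row writes; the actual gain depends on the database, driver, and network latency.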

7.3 Async Checkpoint

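public List<StateT> snapshotState(long checkpointId) {
    // Quick: copy the state snapshot (in-memory)
    StateT snapshot = state.copy();

    // Async: serialize and upload while processing continues
    CompletableFuture<Void> upload = CompletableFuture.runAsync(() -> {
        byte[] serialized = serialize(snapshot);
        checkpointStorage.upload(checkpointId, serialized);
    });
    // The checkpoint must not be acknowledged before the upload completes
    pendingUploads.put(checkpointId, upload);   // hypothetical map, awaited before ack

    return Collections.singletonList(snapshot);
}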

Impact: Data processing continues while the snapshot uploads; the checkpoint is acknowledged only once the upload has completed.
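The asynchronous part is safe only if the acknowledgement waits for the upload. A fire-and-forget upload could let the coordinator declare checkpoint N complete, and sinks commit, while its state is not yet durable. A minimal `CompletableFuture` sketch of that ordering follows; `AsyncSnapshotter`, the in-memory `storage` map standing in for checkpoint storage, and the `acknowledged` list standing in for the acknowledgement to the coordinator are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical async snapshot: copy synchronously, upload in the background,
// acknowledge the checkpoint only after the upload has completed.
class AsyncSnapshotter {
    final Map<Long, String> storage = new ConcurrentHashMap<>();   // stands in for checkpoint storage
    final List<Long> acknowledged = new CopyOnWriteArrayList<>();

    CompletableFuture<Void> snapshot(long checkpointId, List<String> liveState) {
        // Synchronous part: a cheap copy, so processing may keep mutating liveState.
        List<String> copy = List.copyOf(liveState);
        return CompletableFuture
            .runAsync(() -> storage.put(checkpointId, String.join(",", copy)))  // serialize + upload
            .thenRun(() -> acknowledged.add(checkpointId));                     // ack once durable
    }
}
```

Records processed after the copy (for example, a third element appended to the live state) do not appear in the uploaded snapshot, which is what keeps the snapshot aligned with the barrier.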

8. Configuration

8.1 Enable Exactly-Once

env {
  # Checkpoint configuration
  checkpoint.interval = 60000   # 60 seconds
  checkpoint.timeout = 600000   # 10 minutes

  # Exactly-once mode (vs at-least-once)
  # This is implicit when using transactional sinks
}

8.2 Source Configuration

Kafka:

source {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "my_topic"

    # Kafka consumer offset commit
    commit_on_checkpoint = true   # Commit offsets after checkpoint
  }
}

JDBC:

source {
  JDBC {
    url = "jdbc:mysql://..."

    # Query-based source (idempotent reprocessing)
    query = "SELECT * FROM table WHERE id >= ? AND id < ?"
  }
}

8.3 Sink Configuration

JDBC (XA):

sink {
  JDBC {
    url = "jdbc:mysql://..."

    # Enable XA transactions
    xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
    is_exactly_once = true
  }
}

Kafka (Transactions):

sink {
  Kafka {
    bootstrap.servers = "localhost:9092"
    topic = "output_topic"

    # Kafka transactions
    transaction.id = "seatunnel-kafka-sink"
    enable.idempotence = true
  }
}

9. Testing Exactly-Once

9.1 Functional Test

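@Test
public void testExactlyOnce() {
    // insertRecords, coordinator, task, countRecordsInSink are test-harness placeholders;
    // triggerCheckpoint() is assumed to wait for completion and return the checkpoint ID

    // 1. Insert 1000 records and complete a checkpoint
    insertRecords(1000);
    long checkpointId = coordinator.triggerCheckpoint();

    // 2. Insert 1000 more records, then fail before the next checkpoint
    insertRecords(1000);
    task.fail();

    // 3. Restore: the second 1000 records are reprocessed, then committed
    task.restore(checkpointId);
    coordinator.triggerCheckpoint();

    // 4. Verify: exactly 2000 records (no loss, no duplicates)
    assertEquals(2000, countRecordsInSink());
}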

9.2 Chaos Testing

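@Test
public void testExactlyOnceUnderChaos() {
    // ChaosMonkey is a hypothetical fault-injection helper
    ChaosMonkey chaos = new ChaosMonkey()
        .killTaskRandomly(0.1)            // probability
        .injectNetworkDelay(5000)         // max delay in ms
        .pauseCheckpointRandomly(0.05);   // probability

    // Run for 10 minutes with chaos
    runJobWithChaos(Duration.ofMinutes(10), chaos);

    // Verify: input count == output count, and no key appears twice
    // (a loss and a duplicate would otherwise cancel out in the counts)
    assertEquals(countSource(), countSink());
    assertEquals(countSink(), countDistinctKeysInSink());
}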

9.3 Monitoring Verification

| Metric | Example value | Expectation |
|--------|---------------|-------------|
| source.records_read | 1,000,000 | Should match committed sink records |
| sink.records_written | 1,000,000 | Should match source reads after retries settle |
| sink.records_committed | 1,000,000 | Should match source reads for exactly-once verification |

When all three counters converge, the end-to-end path is behaving as expected.

10. Best Practices

10.1 Choose Appropriate Sink

Use Transactional Sinks (XA) for:

  • Financial transactions
  • Billing systems
  • Audit logs
  • Critical data

Use Idempotent Sinks for:

  • High-throughput scenarios
  • Eventual consistency acceptable
  • No transaction support

10.2 Handle Poisoned Records

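@Override
public void write(SeaTunnelRow element) {
    try {
        statement.executeUpdate(toSQL(element));
    } catch (SQLException e) {
        // Log poisoned record
        LOG.error("Failed to write record: {}", element, e);

        // Send to dead letter queue (not transactional: a replay may resend it)
        deadLetterQueue.send(element);

        // Don't fail the entire checkpoint. Caveat: some databases (e.g. PostgreSQL)
        // abort the whole open transaction after a failed statement, so validate
        // records before writing when the sink is transactional.
    }
}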

10.3 Monitor Checkpoint Health

Key Metrics:

  • checkpoint.duration: Should be < 10% of interval
  • checkpoint.failure_rate: Should be < 1%
  • checkpoint.size: Monitor growth over time

Recommended alerts:

  • Alert when checkpoint.duration > 300s
  • Alert when checkpoint.failure_rate > 5%
  • Alert when no checkpoint completes within 2x the configured interval
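The alert rules translate directly into a check that a monitoring job could run. The `CheckpointAlerts` class and its inputs are hypothetical; the thresholds are the ones listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical evaluation of the recommended checkpoint alerts.
class CheckpointAlerts {
    static List<String> evaluate(long lastDurationSeconds,
                                 double failureRate,
                                 long secondsSinceLastCompleted,
                                 long intervalSeconds) {
        List<String> alerts = new ArrayList<>();
        if (lastDurationSeconds > 300) {
            alerts.add("checkpoint.duration > 300s");
        }
        if (failureRate > 0.05) {
            alerts.add("checkpoint.failure_rate > 5%");
        }
        if (secondsSinceLastCompleted > 2 * intervalSeconds) {
            alerts.add("no checkpoint completed within 2x the configured interval");
        }
        return alerts;
    }
}
```

With a 60 s interval, a healthy job (5 s checkpoints, no failures, last completion 70 s ago) raises no alert, while one with 400 s checkpoints, a 10% failure rate, and no completion for 130 s raises all three.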
