Exactly-Once Semantics
1. Overview
1.1 Problem Background
Distributed data processing faces fundamental delivery guarantees challenges:
- At-Most-Once: Records may be lost (unacceptable for critical data)
- At-Least-Once: Records may be duplicated (causes counting errors, double charges)
- Exactly-Once: Each record processed exactly once (ideal but complex)
Real-World Impact:
| Scenario | Outcome | Result |
|----------|---------|--------|
| At-least-once | Transaction $100 processed twice | User is charged $200 |
| Exactly-once | Transaction $100 processed once | User is charged $100 |
1.2 Design Goals
SeaTunnel's exactly-once semantics aims to:
- Verifiable End-to-End Consistency: With checkpoint boundaries + sink transactional/idempotent commits, avoid data loss/duplication under the documented failure model
- Transparent Implementation: Framework handles complexity, users configure minimally
- Performance Efficiency: Minimize overhead while maintaining guarantee
- Failure Resilience: Maintain guarantee across task/worker/master failures
- Broad Applicability: Support transactional sinks and also provide practical semantics for non-transactional sinks (e.g., idempotent writes / at-least-once)
1.3 Consistency Levels
| Level | Guarantee | Use Cases | Implementation |
|---|---|---|---|
| At-Most-Once | No duplicates, may lose | Non-critical logs | No retry |
| At-Least-Once | No loss, may duplicate | Idempotent processing | Retry without transaction |
| Exactly-Once | No loss, no duplicates | Financial, billing, audit | Checkpoint + 2PC |
2. Theoretical Foundation
2.1 Chandy-Lamport Algorithm
Concept: Distributed snapshot without stopping the entire system.
Mechanism:
- Coordinator injects barriers (markers) into data streams
- Upon receiving barrier, each operator:
- Snapshots its local state
- Forwards barrier downstream
- When all operators snapshot, we have a consistent global snapshot
Key Property: Snapshot represents a consistent cut across distributed system state.
2.2 Two-Phase Commit Protocol
Concept: Atomic commitment across distributed participants.
Phases:
- Prepare Phase: All participants prepare (avoid making changes externally visible)
- Commit Phase: Coordinator decides commit/abort, all participants execute
In SeaTunnel:
- Prepare:
SinkWriter.prepareCommit(checkpointId)during checkpoint - Commit:
SinkCommitter.commit()after checkpoint completes
3. Architecture for Exactly-Once
3.1 End-to-End Pipeline
3.2 Key Components
Source Offset Management:
public class KafkaSourceReader {
private Map<TopicPartition, Long> currentOffsets;
@Override
public void pollNext(Collector<SeaTunnelRow> output) {
ConsumerRecords<K, V> records = consumer.poll(timeout);
for (ConsumerRecord<K, V> record : records) {
// Process record
output.collect(convert(record));
// Track offset
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
record.offset()
);
}
}
@Override
public List<KafkaSourceState> snapshotState(long checkpointId) {
// Snapshot offsets (will be committed after checkpoint completes)
return Collections.singletonList(new KafkaSourceState(currentOffsets));
}
@Override
public void notifyCheckpointComplete(long checkpointId) {
// Commit offsets to Kafka (idempotent)
consumer.commitSync(currentOffsets);
}
}
Sink Two-Phase Commit:
public class JdbcExactlyOnceSinkWriter {
private XAConnection xaConnection;
private Xid currentXid;
@Override
public void write(SeaTunnelRow element) {
if (currentXid == null) {
// Start XA transaction
currentXid = generateXid();
xaConnection.getXAResource().start(currentXid, XAResource.TMNOFLAGS);
}
// Execute INSERT (buffered in XA transaction)
statement.executeUpdate(toSQL(element));
}
@Override
public Optional<XidInfo> prepareCommit(long checkpointId) {
if (currentXid == null) {
return Optional.empty();
}
// PHASE 1: Prepare (no side effects)
xaConnection.getXAResource().end(currentXid, XAResource.TMSUCCESS);
xaConnection.getXAResource().prepare(currentXid);
// Return XID for committer
XidInfo xidInfo = new XidInfo(currentXid);
currentXid = null;
return Optional.of(xidInfo);
}
}
public class JdbcSinkCommitter {
@Override
public List<XidInfo> commit(List<XidInfo> commitInfos) {
List<XidInfo> failed = new ArrayList<>();
for (XidInfo xidInfo : commitInfos) {
try {
// PHASE 2: Commit (side effects now visible)
xaConnection.getXAResource().commit(xidInfo.getXid(), false);
} catch (XAException e) {
if (e.errorCode == XAException.XAER_NOTA) {
// Already committed (idempotent)
LOG.info("XID already committed: {}", xidInfo);
} else {
failed.add(xidInfo);
}
}
}
return failed;
}
}
4. Implementation Patterns
4.1 Transactional Sinks (XA)
Supported Systems: MySQL, PostgreSQL, Oracle, SQL Server
Implementation:
public class JdbcExactlyOnceSink implements SeaTunnelSink<...> {
@Override
public SinkWriter<...> createWriter(Context context) {
// Enable XA transactions
XADataSource xaDataSource = createXADataSource();
return new JdbcExactlyOnceSinkWriter(xaDataSource);
}
@Override
public Optional<SinkCommitter<XidInfo>> createCommitter() {
return Optional.of(new JdbcSinkCommitter(xaDataSource));
}
}
Pros:
- Strong consistency guarantee
- Automatic rollback on failure
Cons:
- Requires database XA support
- Higher latency (2PC overhead)
- Lock contention during prepare phase
4.2 Idempotent Sinks (Upsert)
Supported Systems: Key-value stores, Elasticsearch (with doc ID)
Implementation:
public class ElasticsearchSinkWriter {
@Override
public void write(SeaTunnelRow element) {
// Use deterministic document ID
String docId = extractPrimaryKey(element);
IndexRequest request = new IndexRequest("my_index")
.id(docId) // Idempotent key
.source(toJson(element));
bulkProcessor.add(request);
}
@Override
public Optional<CommitInfo> prepareCommit(long checkpointId) {
// Flush bulk processor
bulkProcessor.flush();
// No explicit commit needed (operations are idempotent)
return Optional.empty();
}
}
Key: Same primary key → same document → idempotent updates
Pros:
- No transaction overhead
- Lower latency
Cons:
- Requires unique key
- Cannot handle complex transactions
4.3 Log-Based Sinks (Kafka)
Implementation:
public class KafkaSinkWriter {
private KafkaProducer<K, V> producer;
private String transactionId;
public KafkaSinkWriter() {
// Enable Kafka transactions
Properties props = new Properties();
props.put("transactional.id", generateTransactionalId());
props.put("enable.idempotence", "true");
producer = new KafkaProducer<>(props);
producer.initTransactions();
}
@Override
public void write(SeaTunnelRow element) {
if (!transactionStarted) {
producer.beginTransaction();
transactionStarted = true;
}
ProducerRecord<K, V> record = convert(element);
producer.send(record);
}
@Override
public Optional<KafkaCommitInfo> prepareCommit(long checkpointId) {
// PHASE 1: Prepare (flush, but don't commit)
producer.flush();
// Return transaction info
return Optional.of(new KafkaCommitInfo(transactionId));
}
}
public class KafkaSinkCommitter {
@Override
public List<KafkaCommitInfo> commit(List<KafkaCommitInfo> commitInfos) {
for (KafkaCommitInfo info : commitInfos) {
// PHASE 2: Commit transaction
producer.commitTransaction();
// Start new transaction for next checkpoint
producer.beginTransaction();
}
return Collections.emptyList();
}
}
4.4 File Sinks (Atomic Rename)
Implementation:
public class FileSinkWriter {
private String tempFilePath;
private String finalFilePath;
private OutputStream outputStream;
@Override
public void write(SeaTunnelRow element) {
// Write to temporary file
byte[] bytes = serialize(element);
outputStream.write(bytes);
}
@Override
public Optional<FileCommitInfo> prepareCommit(long checkpointId) {
// PHASE 1: Close temp file (no rename yet)
outputStream.close();
return Optional.of(new FileCommitInfo(tempFilePath, finalFilePath));
}
}
public class FileSinkCommitter {
@Override
public List<FileCommitInfo> commit(List<FileCommitInfo> commitInfos) {
List<FileCommitInfo> failed = new ArrayList<>();
for (FileCommitInfo info : commitInfos) {
// PHASE 2: Atomic rename (file becomes visible)
boolean success = fileSystem.rename(
new Path(info.getTempFilePath()),
new Path(info.getFinalFilePath())
);
if (!success) {
failed.add(info);
}
}
return failed;
}
}
Key: Atomic rename ensures file is either fully visible or not visible.
5. Failure Scenarios and Recovery
5.1 Task Failure Before Checkpoint
| Time | Event |
|---|---|
t0 | Checkpoint N completes |
t1 | Records [1000-2000] are processed |
t2 | Task fails before the next successful checkpoint |
t3 | Job restores from checkpoint N |
t4 | Records [1000-2000] are reprocessed |
Result:
- No data loss because the records are replayed after recovery.
- No duplication because nothing was committed before the failure.
5.2 Task Failure After prepareCommit
| Time | Event |
|---|---|
t0 | Checkpoint N is in progress |
t1 | SinkWriter.prepareCommit(...) prepares XID-123 |
t2 | Task fails before the commit step |
t3 | Job restores from checkpoint N-1 |
t4 | Records are reprocessed |
t5 | A new prepareCommit(...) prepares XID-124 |
t6 | The committer commits XID-124 |
Result:
XID-123never becomes externally visible.XID-124is the only committed transaction for this checkpoint boundary.
5.3 Committer Failure During Commit
| Time | Event |
|---|---|
t0 | Checkpoint N completes |
t1 | The committer starts committing XID-100, XID-101, XID-102 |
t2 | XID-100 commits successfully |
t3 | The committer fails before XID-101 and XID-102 finish |
t4 | A new committer retries the whole batch |
t5 | XID-100 is seen as already committed and treated idempotently |
t6 | XID-101 commits successfully |
t7 | XID-102 commits successfully |
Result:
- Every prepared transaction is eventually committed.
- Idempotent commit logic prevents duplicates during retry.
5.4 Network Partition
| Time | Event |
|---|---|
t0 | SinkWriter prepares XID-200 |
t1 | The checkpoint completes |
t2 | The committer sends commit(XID-200) |
t3 | A network partition causes the ACK to be lost |
t4 | The committer retries commit(XID-200) |
t5 | The sink reports that XID-200 is already committed |
Result:
- The external system still sees exactly one committed transaction.
- Idempotency absorbs the duplicate commit attempt safely.
6. Idempotency Requirements
6.1 Why Idempotency Matters
Problem: Network failures, retries, and failover can cause duplicate commit attempts.
Solution: Committer operations must be idempotent.
// ❌ BAD: Non-idempotent (calling twice inserts twice)
void commit(CommitInfo info) {
statement.execute("INSERT INTO table VALUES (1, 'data')");
}
// ✅ GOOD: Idempotent (calling twice has same effect as once)
void commit(CommitInfo info) {
statement.execute(
"INSERT INTO table VALUES (1, 'data') " +
"ON DUPLICATE KEY UPDATE data = VALUES(data)"
);
}
6.2 Implementing Idempotency
Strategy 1: Check-then-Execute
public List<XidInfo> commit(List<XidInfo> commitInfos) {
for (XidInfo xid : commitInfos) {
// Check if already committed
if (isCommitted(xid)) {
LOG.info("XID already committed: {}", xid);
continue; // Idempotent
}
// Commit and record
xaResource.commit(xid, false);
recordCommit(xid);
}
}
Strategy 2: Database-Level Idempotency
-- Unique constraint ensures idempotency
CREATE TABLE commits (
xid VARCHAR(255) PRIMARY KEY,
committed_at TIMESTAMP
);
-- Idempotent insert
INSERT IGNORE INTO commits (xid, committed_at)
VALUES ('XID-123', NOW());
Strategy 3: Natural Idempotency (XA)
try {
xaResource.commit(xid, false);
} catch (XAException e) {
if (e.errorCode == XAException.XAER_NOTA) {
// Transaction not found = already committed
return; // Idempotent
}
throw e;
}
7. Performance Considerations
7.1 Checkpoint Interval Trade-offs
| Checkpoint interval | Strengths | Trade-offs |
|---|---|---|
Short (10-30s) | Faster recovery and less reprocessing | Higher snapshot overhead and more commit operations |
Long (5-10min) | Lower steady-state overhead and fewer commits | Slower recovery and more reprocessing after failure |
Recommendation: 60-120 seconds for most workloads
7.2 Batch Size Optimization
public class OptimizedSinkWriter {
private static final int BATCH_SIZE = 1000;
private List<SeaTunnelRow> buffer = new ArrayList<>();
@Override
public void write(SeaTunnelRow element) {
buffer.add(element);
if (buffer.size() >= BATCH_SIZE) {
// Batch insert (amortize overhead)
statement.executeBatch();
buffer.clear();
}
}
}
Impact: 1000x batch → ~10x throughput improvement
7.3 Async Checkpoint
public List<StateT> snapshotState(long checkpointId) {
// Quick: Copy state snapshot (in-memory)
StateSnapshot snapshot = state.copy();
// Async: Serialize and upload
CompletableFuture.runAsync(() -> {
byte[] serialized = serialize(snapshot);
checkpointStorage.upload(checkpointId, serialized);
});
return snapshot;
}
Impact: Data processing continues while snapshot uploads
8. Configuration
8.1 Enable Exactly-Once
env {
# Checkpoint configuration
checkpoint.interval = 60000 # 60 seconds
checkpoint.timeout = 600000 # 10 minutes
# Exactly-once mode (vs at-least-once)
# This is implicit when using transactional sinks
}
8.2 Source Configuration
Kafka:
source {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "my_topic"
# Kafka consumer offset commit
commit_on_checkpoint = true # Commit offsets after checkpoint
}
}
JDBC:
source {
JDBC {
url = "jdbc:mysql://..."
# Query-based source (idempotent reprocessing)
query = "SELECT * FROM table WHERE id >= ? AND id < ?"
}
}
8.3 Sink Configuration
JDBC (XA):
sink {
JDBC {
url = "jdbc:mysql://..."
# Enable XA transactions
xa_data_source_class_name = "com.mysql.cj.jdbc.MysqlXADataSource"
is_exactly_once = true
}
}
Kafka (Transactions):
sink {
Kafka {
bootstrap.servers = "localhost:9092"
topic = "output_topic"
# Kafka transactions
transaction.id = "seatunnel-kafka-sink"
enable.idempotence = true
}
}
9. Testing Exactly-Once
9.1 Functional Test
@Test
public void testExactlyOnce() {
// 1. Insert 1000 records
insertRecords(1000);
// 2. Trigger checkpoint
coordinator.triggerCheckpoint();
// 3. Simulate failure
task.fail();
// 4. Restore and continue
task.restore(checkpointId);
insertRecords(1000); // Same records reprocessed
// 5. Verify: Should have exactly 1000 records (no duplicates)
assertEquals(1000, countRecordsInSink());
}
9.2 Chaos Testing
@Test
public void testExactlyOnceUnderChaos() {
ChaosMonkey chaos = new ChaosMonkey()
.killTaskRandomly(probability = 0.1)
.injectNetworkDelay(maxDelayMs = 5000)
.pauseCheckpointRandomly(probability = 0.05);
// Run for 10 minutes with chaos
runJobWithChaos(duration = 10 * 60 * 1000, chaos);
// Verify: Input count == Output count
assertEquals(countSource(), countSink());
}
9.3 Monitoring Verification
| Metric | Example value | Expectation |
|---|---|---|
source.records_read | 1,000,000 | Should match committed sink records |
sink.records_written | 1,000,000 | Should match source reads after retries settle |
sink.records_committed | 1,000,000 | Should match source reads for exactly-once verification |
When all three counters converge, the end-to-end path is behaving as expected.
10. Best Practices
10.1 Choose Appropriate Sink
Use Transactional Sinks (XA) for:
- Financial transactions
- Billing systems
- Audit logs
- Critical data
Use Idempotent Sinks for:
- High-throughput scenarios
- Eventual consistency acceptable
- No transaction support
10.2 Handle Poisoned Records
@Override
public void write(SeaTunnelRow element) {
try {
statement.executeUpdate(toSQL(element));
} catch (SQLException e) {
// Log poisoned record
LOG.error("Failed to write record: {}", element, e);
// Send to dead letter queue
deadLetterQueue.send(element);
// Don't fail entire checkpoint
}
}
10.3 Monitor Checkpoint Health
Key Metrics:
checkpoint.duration: Should be < 10% of intervalcheckpoint.failure_rate: Should be < 1%checkpoint.size: Monitor growth over time
Alerts: Recommended alerts:
- Alert when
checkpoint.duration > 300s - Alert when
checkpoint.failure_rate > 5% - Alert when no checkpoint completes within
2xthe configured interval
11. Related Resources
12. References
Academic Papers
- Chandy & Lamport (1985): "Distributed Snapshots"
- Gray & Lamport (2006): "Consensus on Transaction Commit", ACM Trans. Database Syst. 31, 1 (March 2006), 1-33
- Carbone et al. (2017): "State Management in Apache Flink"