Checkpoint Mechanism
1. Overview
1.1 Problem Background
Distributed data processing systems face several critical fault-tolerance challenges:
- State Loss: How to preserve processing state across failures?
- Exactly-Once: How to ensure each record is processed exactly once?
- Distributed Consistency: How to create consistent snapshots across distributed tasks?
- Performance: How to checkpoint without blocking data processing?
- Recovery: How to efficiently restore state after failures?
1.2 Design Goals
SeaTunnel's checkpoint mechanism aims to:
- Guarantee Exactly-Once Semantics: Consistent state snapshots + two-phase commit
- Minimize Overhead: Asynchronous checkpoint, no data processing blocking
- Fast Recovery: Restore from latest checkpoint in seconds
- Distributed Coordination: Coordinate checkpoints across hundreds of tasks
- Pluggable Storage: Support multiple storage backends (HDFS, S3, Local, OSS)
1.3 Theoretical Foundation
SeaTunnel's checkpoint is based on the Chandy-Lamport distributed snapshot algorithm:
Key Idea: Insert special markers (barriers) into the data streams. When a task receives a barrier, it:
- Snapshots its local state
- Forwards the barrier downstream
- Continues processing
Result: a globally consistent snapshot, without pausing the entire system.
Reference: "Distributed Snapshots: Determining Global States of Distributed Systems" (Chandy & Lamport, 1985)
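As a toy illustration of the barrier idea, the sketch below runs a barrier through a two-task chain: each task snapshots its local counter when the barrier arrives, forwards the barrier, and keeps processing. The classes are hypothetical, not SeaTunnel's API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy Chandy-Lamport sketch: a linear chain of tasks; each task snapshots
// its local counter when the barrier arrives, then forwards the barrier.
public class BarrierDemo {
    static class Task {
        int processed = 0;        // local state: records seen so far
        Integer snapshot = null;  // state captured at the barrier

        void onRecord() { processed++; }

        void onBarrier(long checkpointId, Task downstream) {
            snapshot = processed;                          // 1. snapshot local state
            if (downstream != null) {
                downstream.onBarrier(checkpointId, null);  // 2. forward barrier
            }
            // 3. continue processing (later onRecord calls don't touch the snapshot)
        }
    }

    public static List<Integer> run() {
        Task source = new Task();
        Task sink = new Task();
        source.onRecord(); source.onRecord();  // source saw 2 records
        sink.onRecord();                       // sink saw 1 so far
        source.onBarrier(1L, sink);            // barrier flows source -> sink
        source.onRecord();                     // post-barrier work, not in snapshot
        List<Integer> snapshots = new ArrayList<>();
        snapshots.add(source.snapshot);
        snapshots.add(sink.snapshot);
        return snapshots;
    }
}
```

The snapshots [2, 1] are mutually consistent even though the tasks were never paused together: the record "in flight" between them will be replayed on recovery.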
2. Architecture Design
2.1 Checkpoint Architecture
┌─────────────────────────────────────────────────────────────────┐
│ JobMaster (per job) │
│ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ CheckpointCoordinator (per pipeline) │ │
│ │ │ │
│ │ • Trigger checkpoint (periodic/manual) │ │
│ │ • Generate checkpoint ID │ │
│ │ • Track pending checkpoints │ │
│ │ • Collect task acknowledgements │ │
│ │ • Persist completed checkpoints │ │
│ │ • Cleanup old checkpoints │ │
│ └───────────────────────────────────────────────────────┘ │
│ │ │
│ │ (Trigger Barrier) │
│ ▼ │
└─────────────────────────────────────────────────────────────────┘
│
│ (CheckpointBarrier)
▼
┌─────────────────────────────────────────────────────────────────┐
│ Worker Nodes │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SourceTask 1 │ │ SourceTask 2 │ │ SourceTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ │ (Barrier Propagation) │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Transform 1 │ │ Transform 2 │ │ Transform N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Snapshot │ │ 2. Snapshot │ │ 2. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 3. ACK │ │ 3. ACK │ │ 3. ACK │ │
│ │ 4. Forward │ │ 4. Forward │ │ 4. Forward │ │
│ └──────┬───────┘ └──────┬───────┘ └──────┬───────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ SinkTask 1 │ │ SinkTask 2 │ │ SinkTask N │ │
│ │ │ │ │ │ │ │
│ │ 1. Receive │ │ 1. Receive │ │ 1. Receive │ │
│ │ Barrier │ │ Barrier │ │ Barrier │ │
│ │ 2. Prepare │ │ 2. Prepare │ │ 2. Prepare │ │
│ │ Commit │ │ Commit │ │ Commit │ │
│ │ 3. Snapshot │ │ 3. Snapshot │ │ 3. Snapshot │ │
│ │ State │ │ State │ │ State │ │
│ │ 4. ACK │ │ 4. ACK │ │ 4. ACK │ │
│ └──────────────┘ └──────────────┘ └──────────────┘ │
└─────────────────────────────────────────────────────────────────┘
│
│ (All ACKs received)
▼
┌─────────────────────────────────────────────────────────────────┐
│ CheckpointStorage │
│ (HDFS / S3 / Local / OSS) │
│ │
│ CompletedCheckpoint { │
│ checkpointId: 123 │
│ taskStates: { │
│ SourceTask-1: { splits: [...], offsets: [...] } │
│ SinkTask-1: { commitInfo: XidInfo(...) } │
│ ... │
│ } │
│ } │
└─────────────────────────────────────────────────────────────────┘
2.2 Key Data Structures
CheckpointCoordinator
public class CheckpointCoordinator {
    // Checkpoint ID generator
    private final CheckpointIDCounter checkpointIdCounter;
    // Checkpoint execution plan
    private final CheckpointPlan checkpointPlan;
    // Pending checkpoints (in progress)
    private final Map<Long, PendingCheckpoint> pendingCheckpoints;
    // IDs of completed checkpoints (most recent N retained)
    private final ArrayDeque<String> completedCheckpointIds;
    // Latest completed checkpoint
    private CompletedCheckpoint latestCompletedCheckpoint;
    // Checkpoint storage
    private final CheckpointStorage checkpointStorage;
    // Configuration
    private final long checkpointInterval;          // Trigger interval (ms)
    private final long checkpointTimeout;           // Timeout (ms)
    private final int minPauseBetweenCheckpoints;   // Min pause (ms)
}
PendingCheckpoint
Represents an in-progress checkpoint.
public class PendingCheckpoint {
    private final long checkpointId;
    private final CheckpointType checkpointType; // CHECKPOINT or SAVEPOINT
    private final long triggerTimestamp;
    // Tasks that haven't acknowledged yet
    private final Set<Long> notYetAcknowledgedTasks;
    // Collected action states (from task ACKs)
    private final Map<ActionStateKey, ActionState> actionStates;
    // Task statistics (records processed, bytes, etc.)
    private final Map<Long, TaskStatistics> taskStatistics;
    // Future completed when all tasks ACK
    private final CompletableFuture<CompletedCheckpoint> completableFuture;

    /**
     * Called when a task acknowledges the checkpoint.
     */
    public void acknowledgeTask(long taskId, List<ActionSubtaskState> states,
                                TaskStatistics statistics) {
        notYetAcknowledgedTasks.remove(taskId);
        // Collect states
        for (ActionSubtaskState state : states) {
            actionStates.computeIfAbsent(state.getKey(), k -> new ActionState())
                    .putSubtaskState(state);
        }
        // Collect statistics
        taskStatistics.put(taskId, statistics);
        // Check if all tasks acknowledged
        if (notYetAcknowledgedTasks.isEmpty()) {
            completeCheckpoint();
        }
    }

    private void completeCheckpoint() {
        CompletedCheckpoint completed = new CompletedCheckpoint(
                checkpointId, actionStates, taskStatistics, System.currentTimeMillis());
        completableFuture.complete(completed);
    }
}
CompletedCheckpoint
Persisted checkpoint data.
public class CompletedCheckpoint implements Serializable {
    private final long checkpointId;
    private final Map<ActionStateKey, ActionState> taskStates;
    private final Map<Long, TaskStatistics> taskStatistics;
    private final long completedTimestamp;
}

public class ActionState implements Serializable {
    private final ActionStateKey key; // (pipelineId, actionId)
    private final Map<Integer, ActionSubtaskState> subtaskStates;
}

public class ActionSubtaskState implements Serializable {
    private final int subtaskIndex;
    private final byte[] state; // Serialized state
}
2.3 CheckpointStorage
Abstraction for checkpoint persistence.
public interface CheckpointStorage {
    /**
     * Store a completed checkpoint.
     */
    void storeCheckpoint(CompletedCheckpoint checkpoint) throws IOException;

    /**
     * Get the latest checkpoint.
     */
    Optional<CompletedCheckpoint> getLatestCheckpoint() throws IOException;

    /**
     * Get a specific checkpoint by ID.
     */
    Optional<CompletedCheckpoint> getCheckpoint(long checkpointId) throws IOException;

    /**
     * Delete an old checkpoint.
     */
    void deleteCheckpoint(long checkpointId) throws IOException;
}
Implementations:
- LocalFileStorage: Local file system (testing)
- HdfsStorage: Hadoop FileSystem-based backend; can work with HDFS/S3A/etc. depending on Hadoop configuration
Note: S3 and OSS support are provided through Hadoop FileSystem configuration (e.g., fs.s3a.impl) rather than separate CheckpointStorage implementations.
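For unit tests, the interface lends itself to a simple in-memory implementation. The sketch below simplifies CompletedCheckpoint to an id-only class (the real class carries task states and statistics, as shown above), and keeps entries sorted by id so the latest is the last entry.

```java
import java.util.Optional;
import java.util.concurrent.ConcurrentSkipListMap;

// Minimal in-memory CheckpointStorage sketch, handy for unit tests.
public class InMemoryCheckpointStorage {
    public static class CompletedCheckpoint {
        private final long checkpointId;
        public CompletedCheckpoint(long id) { this.checkpointId = id; }
        public long getCheckpointId() { return checkpointId; }
    }

    // Sorted by id so "latest" is simply the last entry.
    private final ConcurrentSkipListMap<Long, CompletedCheckpoint> store =
            new ConcurrentSkipListMap<>();

    public void storeCheckpoint(CompletedCheckpoint cp) {
        store.put(cp.getCheckpointId(), cp);
    }

    public Optional<CompletedCheckpoint> getLatestCheckpoint() {
        return store.isEmpty() ? Optional.empty()
                               : Optional.of(store.lastEntry().getValue());
    }

    public Optional<CompletedCheckpoint> getCheckpoint(long id) {
        return Optional.ofNullable(store.get(id));
    }

    public void deleteCheckpoint(long id) {
        store.remove(id);
    }
}
```

(The IOException clauses of the real interface are dropped here since nothing in this sketch can fail.)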
3. Checkpoint Flow
3.1 Trigger Checkpoint
Trigger Conditions:
- Checkpoint interval elapsed (e.g., 60 seconds)
- Minimum pause between checkpoints elapsed (e.g., 10 seconds)
- Number of concurrent checkpoints < max (e.g., 1)
- No checkpoint in progress (for single concurrent)
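The conditions above amount to a single guard expression. The sketch below is illustrative; the field names are assumptions, not the actual coordinator code.

```java
// Illustrative guard combining the trigger conditions; names are
// hypothetical, not the real CheckpointCoordinator fields.
public class TriggerGuard {
    long lastTriggerTime;     // wall-clock ms of the last trigger
    long lastCompletedTime;   // wall-clock ms of the last completed checkpoint
    int pendingCount;         // checkpoints currently in progress
    long checkpointInterval = 60_000;
    long minPauseBetweenCheckpoints = 10_000;
    int maxConcurrentCheckpoints = 1;

    boolean canTrigger(long now) {
        boolean intervalElapsed = now - lastTriggerTime >= checkpointInterval;
        boolean pauseElapsed = now - lastCompletedTime >= minPauseBetweenCheckpoints;
        boolean belowConcurrency = pendingCount < maxConcurrentCheckpoints;
        return intervalElapsed && pauseElapsed && belowConcurrency;
    }
}
```

With maxConcurrentCheckpoints = 1, the last condition is exactly the "no checkpoint in progress" rule.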
3.2 Barrier Propagation
Barrier Flow Rules:
- Source Tasks: Start of pipeline, receive barrier from coordinator
- Transform Tasks: Receive from upstream, snapshot, forward downstream
- Sink Tasks: End of pipeline, receive from upstream, snapshot, no forward
Barrier Alignment (for tasks with multiple inputs):
// Task with 2 inputs
Input 1: ──data──data──[barrier-123]──data──data──
│ Wait!
Input 2: ──data──data──data──data──[barrier-123]──
│
▼
Both barriers received, snapshot state
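Alignment can be sketched as a small state machine that buffers records on any channel whose barrier has already arrived. This is a toy model of the rule shown above, not SeaTunnel's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy two-input barrier alignment: once a channel delivers the barrier,
// its further records are buffered until the other channel's barrier arrives.
public class BarrierAligner {
    private final boolean[] barrierSeen = new boolean[2];
    private final Queue<String> buffered = new ArrayDeque<>();
    final List<String> processed = new ArrayList<>();
    boolean snapshotTaken = false;

    void onRecord(int channel, String record) {
        if (barrierSeen[channel]) {
            buffered.add(record);   // blocked channel: buffer until aligned
        } else {
            processed.add(record);  // open channel: process immediately
        }
    }

    void onBarrier(int channel) {
        barrierSeen[channel] = true;
        if (barrierSeen[0] && barrierSeen[1]) {
            snapshotTaken = true;               // all barriers in: snapshot now
            barrierSeen[0] = barrierSeen[1] = false;
            processed.addAll(buffered);         // release buffered records
            buffered.clear();
        }
    }
}
```

Buffering is what makes the snapshot consistent: records that arrive after a channel's barrier belong to the next checkpoint and must not leak into this one.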
3.3 State Snapshot
Each task type snapshots different state:
SourceTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot SourceReader state (splits + offsets)
    List<byte[]> states = sourceFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK to coordinator
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier downstream
    forwardBarrierToDownstream(checkpointId);
}
TransformTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Snapshot Transform state (usually stateless, empty state)
    List<byte[]> states = transformFlowLifeCycle.snapshotState(checkpointId);
    // 2. Create ActionSubtaskState
    ActionSubtaskState state = new ActionSubtaskState(subtaskIndex, states);
    // 3. Send ACK
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
    // 4. Forward barrier
    forwardBarrierToDownstream(checkpointId);
}
SinkTask:
@Override
public void triggerBarrier(long checkpointId) {
    // 1. Prepare commit (TWO-PHASE COMMIT)
    Optional<CommitInfoT> commitInfo = sinkWriter.prepareCommit(checkpointId);
    // 2. Snapshot writer state
    List<StateT> writerStates = sinkWriter.snapshotState(checkpointId);
    // 3. Create ActionSubtaskState (includes both commit info and state)
    ActionSubtaskState state = new ActionSubtaskState(
            subtaskIndex,
            serialize(writerStates),
            commitInfo.orElse(null));
    // 4. Send ACK (NO forwarding - end of pipeline)
    sendAcknowledgement(checkpointId, Collections.singletonList(state));
}
3.4 Checkpoint Completion
Completion Steps:
- All tasks acknowledged
- Create CompletedCheckpoint from PendingCheckpoint
- Persist checkpoint to storage
- Trigger sink commit (two-phase commit)
- Notify all tasks of completion
- Cleanup old checkpoints (retain last N)
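The completion steps above can be sketched as one handler. Names are illustrative, and side effects are recorded as log strings for clarity; the retention step keeps only the last N checkpoint ids.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the completion steps (persist, commit, notify,
// retain last N); not the coordinator's real code.
public class CompletionSketch {
    static final int RETAINED = 2;
    final ArrayDeque<Long> retainedIds = new ArrayDeque<>();
    final List<String> log = new ArrayList<>();

    void onCheckpointCompleted(long checkpointId) {
        log.add("persist:" + checkpointId);     // 1. persist to storage
        log.add("commit:" + checkpointId);      // 2. trigger sink commit (2PC)
        log.add("notify:" + checkpointId);      // 3. notify all tasks
        retainedIds.addLast(checkpointId);
        while (retainedIds.size() > RETAINED) { // 4. cleanup old checkpoints
            log.add("delete:" + retainedIds.removeFirst());
        }
    }
}
```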
3.5 Checkpoint Timeout
// CheckpointCoordinator
private void startCheckpointTimeout(long checkpointId, long timeoutMs) {
    scheduledExecutor.schedule(() -> {
        PendingCheckpoint pending = pendingCheckpoints.get(checkpointId);
        if (pending != null && !pending.isCompleted()) {
            LOG.warn("Checkpoint {} timed out after {}ms, tasks not yet acknowledged: {}",
                    checkpointId, timeoutMs, pending.getNotYetAcknowledgedTasks());
            // Fail the checkpoint
            pending.abort();
            pendingCheckpoints.remove(checkpointId);
            // Trigger job failover if needed
            handleCheckpointFailure(checkpointId);
        }
    }, timeoutMs, TimeUnit.MILLISECONDS);
}
Timeout Handling:
- Default timeout: 10 minutes
- If timeout, checkpoint fails
- Job continues with previous checkpoint
- Next checkpoint will be triggered per schedule
4. Recovery Process
4.1 Restore from Checkpoint
Restore Steps:
- JobMaster retrieves the latest CompletedCheckpoint from storage
- Extract state for each task (by ActionStateKey and subtaskIndex)
- Deploy tasks with NotifyTaskRestoreOperation containing the restored state
- Tasks restore state:
- SourceReader: Restore splits and offsets, seek to position
- Transform: Restore transform state (usually none)
- SinkWriter: Restore writer state, may have uncommitted transactions
- Tasks transition to READY_START state
- Job resumes execution
Example: JDBC Source Recovery:
public class JdbcSourceReader {
    @Override
    public void restoreState(List<JdbcSourceState> states) {
        for (JdbcSourceState state : states) {
            JdbcSourceSplit split = state.getSplit();
            long offset = state.getCurrentOffset();
            // Restore the split with its offset
            pendingSplits.add(split);
            // When the split is processed later, resume from the saved
            // offset, e.g. by appending it to the split's query
            // (illustrative only):
            String query = split.getQuery() + " OFFSET " + offset;
        }
    }
}
4.2 Exactly-Once Recovery
Combination of checkpoint restore + sink two-phase commit ensures exactly-once:
Checkpoint N (completed):
Source offsets: [100, 200, 300]
Sink prepared commits: [XID-1, XID-2, XID-3]
Sink committer commits XID-1, XID-2, XID-3
↓ [Failure]
Recovery from Checkpoint N:
1. Restore source offsets: [100, 200, 300]
2. Sources start reading from offset 100, 200, 300
3. Sink writers restore state (may have uncommitted XIDs)
4. Sink committer retries committing XIDs (idempotent)
Result: Records 0-99, 100-199, 200-299 committed exactly once
Records from 100+ reprocessed but not duplicated (idempotent commit)
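The idempotent-commit property hinges on the committer tracking which transaction ids it has already applied, so replaying prepared XIDs after recovery is safe. A minimal sketch with hypothetical XID strings:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of an idempotent committer: committing the same transaction id
// twice is a no-op, so replaying prepared XIDs after recovery is safe.
public class IdempotentCommitter {
    private final Set<String> committed = new HashSet<>();
    final List<String> effects = new ArrayList<>(); // side effects actually applied

    void commit(String xid) {
        if (committed.add(xid)) { // first commit: apply the effect
            effects.add(xid);
        }                         // replayed commit: silently skipped
    }
}
```

In a real XA sink the "already committed" check is answered by the database (committing a finished XID is rejected or ignored), not by an in-memory set.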
5. Configuration and Tuning
5.1 Checkpoint Configuration
env {
  # Enable checkpoint
  checkpoint.interval = 60000   # Trigger every 60 seconds
  # Checkpoint timeout
  checkpoint.timeout = 600000   # 10 minutes
  # Min pause between checkpoints
  min-pause = 10000             # 10 seconds
}
Checkpoint storage is configured on the engine side (e.g., config/seatunnel.yaml under seatunnel.engine.checkpoint.storage), rather than as job-level env options.
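For orientation, here is a sketch of what that engine-side configuration can look like. Key names and layout are drawn from the SeaTunnel Engine documentation but should be verified against the docs for your version; the namespace path and HDFS address are placeholders.

```yaml
# config/seatunnel.yaml (engine side) -- illustrative, verify key names
seatunnel:
  engine:
    checkpoint:
      interval: 60000          # ms
      timeout: 600000          # ms
      storage:
        type: hdfs             # HdfsStorage (Hadoop FileSystem-based)
        max-retained: 3        # keep the last N completed checkpoints
        plugin-config:
          namespace: /seatunnel/checkpoints
          storage.type: hdfs
          fs.defaultFS: hdfs://namenode:9000
```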
5.2 Tuning Guidelines
Checkpoint Interval:
- Shorter interval: Faster recovery, higher overhead
- Longer interval: Lower overhead, slower recovery
Trade-offs:
- Shorter interval → More frequent I/O → Higher storage cost
- Longer interval → Less overhead → Longer recovery time
Rule of Thumb: Set the interval to the recovery time you can tolerate; after a failure, roughly one interval's worth of data must be reprocessed from the last checkpoint.
Checkpoint Timeout:
- Should be >> checkpoint interval
- Depends on state size and storage speed
- Choose based on end-to-end latency, state size, and checkpoint storage throughput
Storage Selection (SeaTunnel Engine):
- localfile (LocalFileStorage): local filesystem, non-HA
- hdfs (HdfsStorage): Hadoop FileSystem-based backend; can work with HDFS/S3A/etc. depending on Hadoop configuration
6. Performance Optimization
6.1 Async Checkpoint
State snapshot doesn't block data processing:
public class AsyncSnapshotSupport {
    @Override
    public void snapshotState(long checkpointId) {
        // 1. Create a snapshot of the current state (fast, in-memory copy)
        StateSnapshot snapshot = createSnapshot();
        // 2. Data processing continues (doesn't wait for serialization/upload)
        // ...
        // 3. Serialize and upload asynchronously
        CompletableFuture.runAsync(() -> {
            byte[] serialized = serialize(snapshot);
            checkpointStorage.upload(checkpointId, serialized);
        }, executorService);
    }
}
6.2 Incremental Checkpoint (Future)
Only checkpoint changed state:
// Full checkpoint (first)
Checkpoint 1: State = 1GB → Upload 1GB
// Incremental checkpoints (subsequent)
Checkpoint 2: State = 1.1GB → Upload 100MB (delta)
Checkpoint 3: State = 1.05GB → Upload 0MB (deletion doesn't upload)
Benefits:
- Reduce checkpoint time
- Lower storage I/O
- Faster checkpoint completion
Challenges:
- More complex state management
- Need to track state changes
- Restore requires chain of deltas
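To make the delta idea concrete, here is a toy delta computation over key-value state. Only new or modified entries are included; as noted above, deletions need extra bookkeeping and are omitted here.

```java
import java.util.HashMap;
import java.util.Map;

// Toy delta computation for incremental checkpoints: upload only entries
// that changed since the previous snapshot (deletions are not tracked).
public class IncrementalDelta {
    public static Map<String, String> delta(Map<String, String> previous,
                                            Map<String, String> current) {
        Map<String, String> changed = new HashMap<>();
        for (Map.Entry<String, String> e : current.entrySet()) {
            if (!e.getValue().equals(previous.get(e.getKey()))) {
                changed.put(e.getKey(), e.getValue()); // new or modified entry
            }
        }
        return changed;
    }
}
```

Restoring then means replaying the last full checkpoint plus every subsequent delta in order, which is exactly the "chain of deltas" challenge listed above.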
6.3 Local State Backend (Future)
Store hot state locally, checkpoint only summary:
// RocksDB local state backend (conceptual sketch; the actual RocksJava API
// creates checkpoints via Checkpoint.create(db), not a method on RocksDB)
class RocksDBStateBackend {
    private final RocksDB rocksDB; // Fast local SSD

    public void put(String key, byte[] value) {
        rocksDB.put(key.getBytes(), value); // Local write (fast)
    }

    public byte[] snapshotState() {
        // Only checkpoint a reference to the RocksDB snapshot
        return rocksDB.createCheckpoint().getBytes();
    }
}
7. Best Practices
7.1 State Size Optimization
1. Keep State Small:
// ❌ BAD: Buffer entire dataset
class BadSourceReader {
    private List<SeaTunnelRow> bufferedRows = new ArrayList<>(); // May be huge!

    List<State> snapshotState() {
        return serialize(bufferedRows); // Huge state
    }
}

// ✅ GOOD: Track offset only
class GoodSourceReader {
    private long currentOffset = 0;

    List<State> snapshotState() {
        return serialize(currentOffset); // Small state
    }
}
2. Use Efficient Serialization:
- Prefer Protobuf, Kryo over Java serialization
- Compress large state (gzip, snappy)
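Compressing serialized state needs nothing beyond the JDK; a small sketch using gzip (snappy would require an external library):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Gzip-compressing serialized state before persisting it.
public class StateCompression {
    public static byte[] compress(byte[] state) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
                gz.write(state); // closing the stream flushes the gzip trailer
            }
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static byte[] decompress(byte[] compressed) {
        try (GZIPInputStream gz =
                 new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return gz.readAllBytes();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Checkpoint state (offsets, split descriptors) tends to be repetitive, so even generic gzip typically shrinks it substantially; measure before assuming the CPU cost is worth it for small states.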
7.2 Monitoring
Key Metrics:
- checkpoint_duration: Time from trigger to completion
- checkpoint_size: Size of persisted checkpoint
- checkpoint_failure_rate: Percentage of failed checkpoints
- checkpoint_alignment_duration: Time spent aligning barriers
Alerting:
- Alert if checkpoint_duration > threshold (e.g., 5 minutes)
- Alert if checkpoint_failure_rate > 10%
- Alert if no checkpoint completed within 2x the configured interval
7.3 Troubleshooting
Problem: Checkpoint timeout
Possible Causes:
- Task stuck (slow data processing)
- Large state (slow serialization/upload)
- Slow storage (network/disk I/O)
- Barrier alignment slow (skewed data)
Solutions:
- Increase checkpoint timeout
- Optimize state size
- Use faster storage
- Tune parallelism
Problem: High checkpoint overhead
Possible Causes:
- Checkpoint interval too short
- Large state size
- Slow storage
Solutions:
- Increase checkpoint interval
- Optimize state size
- Enable incremental checkpoint (when available)
8. References
Academic Papers
- Chandy, K. M., & Lamport, L. (1985). "Distributed Snapshots: Determining Global States of Distributed Systems"
- Carbone, P., et al. (2017). "State Management in Apache Flink"