DAG Execution Model
1. Overview
1.1 Problem Background
Distributed data processing requires transforming user intentions into executable distributed tasks:
- Abstraction Levels: How to separate logical intent from physical execution?
- Optimization: How to optimize task placement and data shuffling?
- Pipeline: How to execute complex DAGs with multiple sources/sinks?
- Parallelism: How to determine task parallelism and distribution?
- Fault Isolation: How to limit failure impact to affected components?
1.2 Design Goals
SeaTunnel's DAG execution model aims to:
- Separate Concerns: Logical planning (user intent) vs physical execution (runtime details)
- Enable Optimization: Task fusion, pipeline splitting, resource allocation
- Support Complex Topologies: Multiple sources, sinks, branches, joins
- Facilitate Fault Tolerance: Clear failure boundaries with independent checkpoints
- Maximize Parallelism: Efficient parallel execution with minimal coordination
1.3 Execution Model Overview
User Config (HOCON)
│
▼
┌─────────────────────┐
│ LogicalDag │ Logical Plan (What to do)
│ • LogicalVertex │ - Source/Transform/Sink actions
│ • LogicalEdge │ - Data dependencies
│ • Parallelism │ - Logical parallelism
└─────────────────────┘
│ (Plan Generation)
▼
┌─────────────────────┐
│ PhysicalPlan │ Physical Plan (How to execute)
│ • SubPlan[] │ - Multiple pipelines
│ • Resources │ - Resource requirements
│ • Scheduling │ - Deployment strategy
└─────────────────────┘
│ (Pipeline Split)
▼
┌─────────────────────┐
│ SubPlan (Pipeline) │ Independent Execution Unit
│ • PhysicalVertex[] │ - Parallel task instances
│ • CheckpointCoord │ - Independent checkpointing
│ • PipelineLocation │ - Unique identifier
└─────────────────────┘
│ (Task Deployment)
▼
┌─────────────────────┐
│ PhysicalVertex │ Deployed Task Group
│ • TaskGroup │ - Co-located tasks (fusion)
│ • SlotProfile │ - Assigned resource slot
│ • ExecutionState │ - Running state
└─────────────────────┘
│ (Execution)
▼
┌─────────────────────┐
│ SeaTunnelTask │ Actual Execution
│ • Source/Transform/ │ - Data processing
│   Sink logic        │ - State management
└─────────────────────┘
2. LogicalDag: User Intent
2.1 Structure
LogicalDag represents the user's job configuration in an engine-independent way.
public class LogicalDag {
// Vertices: Source, Transform, Sink actions
private final Map<Long, LogicalVertex> logicalVertexMap;
// Edges: Data flow dependencies
private final Set<LogicalEdge> edges;
// Job configuration
private final JobConfig jobConfig;
}
2.2 LogicalVertex
Represents a single action (Source/Transform/Sink) with parallelism.
public class LogicalVertex {
private final long vertexId;
private final Action action; // SourceAction, TransformChainAction, SinkAction
private final int parallelism; // Number of parallel instances
}
Action Types:
- SourceAction: Wraps SeaTunnelSource, produces CatalogTable
- TransformChainAction: Chain of SeaTunnelTransform, transforms schema
- SinkAction: Wraps SeaTunnelSink, consumes CatalogTable
Example:
// From config:
// source { JDBC { ... parallelism = 4 } }
// transform { Sql { ... parallelism = 8 } }
// sink { Elasticsearch { ... parallelism = 2 } }
LogicalVertex sourceVertex = new LogicalVertex(
1L,                            // vertexId
new SourceAction(jdbcSource),  // action
4                              // parallelism
);
LogicalVertex transformVertex = new LogicalVertex(
2L,                                      // vertexId
new TransformChainAction(sqlTransform),  // action
8                                        // parallelism
);
LogicalVertex sinkVertex = new LogicalVertex(
3L,                      // vertexId
new SinkAction(esSink),  // action
2                        // parallelism
);
2.3 LogicalEdge
Represents data flow between actions.
public class LogicalEdge {
private final long inputVertexId; // Upstream vertex
private final long targetVertexId; // Downstream vertex
}
Example:
// Source → Transform edge
LogicalEdge edge1 = new LogicalEdge(
1L,  // inputVertexId: JDBC Source
2L   // targetVertexId: SQL Transform
);
// Transform → Sink edge
LogicalEdge edge2 = new LogicalEdge(
2L,  // inputVertexId: SQL Transform
3L   // targetVertexId: Elasticsearch Sink
);
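To make the vertex/edge model concrete, the sketch below builds the three-vertex example as a plain adjacency structure and derives an execution order with Kahn's topological sort, rejecting cycles. MiniLogicalDag and its methods are hypothetical illustrations, not SeaTunnel's LogicalDag API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified logical DAG: vertex id -> parallelism, plus directed edges.
// Not SeaTunnel's LogicalDag class; it only illustrates the vertex/edge model.
final class MiniLogicalDag {
    private final Map<Long, Integer> parallelism = new LinkedHashMap<>();
    private final Map<Long, List<Long>> downstream = new LinkedHashMap<>();

    void addVertex(long vertexId, int vertexParallelism) {
        parallelism.put(vertexId, vertexParallelism);
        downstream.putIfAbsent(vertexId, new ArrayList<>());
    }

    void addEdge(long inputVertexId, long targetVertexId) {
        if (!parallelism.containsKey(inputVertexId) || !parallelism.containsKey(targetVertexId)) {
            throw new IllegalArgumentException("edge references an unknown vertex");
        }
        downstream.get(inputVertexId).add(targetVertexId);
    }

    // Kahn's algorithm: upstream vertices come before downstream ones; throws on a cycle.
    List<Long> topologicalOrder() {
        Map<Long, Integer> inDegree = new LinkedHashMap<>();
        for (Long id : parallelism.keySet()) inDegree.put(id, 0);
        for (List<Long> targets : downstream.values()) {
            for (Long t : targets) inDegree.merge(t, 1, Integer::sum);
        }
        Deque<Long> ready = new ArrayDeque<>();
        for (Map.Entry<Long, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) ready.add(e.getKey());
        }
        List<Long> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            Long id = ready.poll();
            order.add(id);
            for (Long t : downstream.get(id)) {
                if (inDegree.merge(t, -1, Integer::sum) == 0) ready.add(t);
            }
        }
        if (order.size() != parallelism.size()) throw new IllegalStateException("cycle detected");
        return order;
    }
}
```

For the JDBC → SQL → Elasticsearch example (vertices 1, 2, 3 with edges 1→2 and 2→3), the order is 1, 2, 3; adding an edge 3→1 would make topologicalOrder() throw.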
2.4 LogicalDag Creation
Built from user configuration:
// JobMaster creates LogicalDag
LogicalDag logicalDag = LogicalDagGenerator.generate(jobConfig);
Process:
- Parse HOCON config (source, transform, sink sections)
- Create Action objects for each configured component
- Infer data flow from config structure
- Validate schema compatibility
- Build LogicalDag object
Example Config → LogicalDag:
env {
parallelism = 4
}
source {
JDBC {
url = "jdbc:mysql://..."
query = "SELECT * FROM orders"
}
}
transform {
Sql {
query = "SELECT order_id, SUM(amount) FROM this GROUP BY order_id"
}
}
sink {
Elasticsearch {
hosts = ["es-host:9200"]
index = "orders_summary"
}
}
Generated LogicalDag:
Vertex 1 (JDBC Source, parallelism=4)
│
▼
Vertex 2 (SQL Transform, parallelism=4)
│
▼
Vertex 3 (Elasticsearch Sink, parallelism=4)
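The "infer data flow from config structure" step can be illustrated with a simplified sketch: an explicit plugin_input that names an upstream plugin_output wins; otherwise a plugin is wired to the only plugin of the preceding non-empty section, which is how the example config above becomes a linear chain. PluginConf, EdgeInference and the "single upstream" fallback rule are assumptions for illustration, not SeaTunnel's parser logic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for one configured plugin; not a SeaTunnel class.
final class PluginConf {
    final long vertexId;
    final String pluginOutput; // value of plugin_output, or null if not set
    final String pluginInput;  // value of plugin_input, or null if not set

    PluginConf(long vertexId, String pluginOutput, String pluginInput) {
        this.vertexId = vertexId;
        this.pluginOutput = pluginOutput;
        this.pluginInput = pluginInput;
    }
}

final class EdgeInference {
    // stages = plugins of the source, transform and sink sections, in config order.
    // Returns edges as {inputVertexId, targetVertexId} pairs.
    static List<long[]> inferEdges(List<List<PluginConf>> stages) {
        Map<String, Long> vertexByOutputName = new HashMap<>();
        List<long[]> edges = new ArrayList<>();
        List<PluginConf> previous = new ArrayList<>();
        for (List<PluginConf> stage : stages) {
            for (PluginConf p : stage) {
                if (p.pluginInput != null) {
                    // Explicit wiring: plugin_input names an upstream plugin_output.
                    Long upstream = vertexByOutputName.get(p.pluginInput);
                    if (upstream == null) {
                        throw new IllegalArgumentException("unknown plugin_input: " + p.pluginInput);
                    }
                    edges.add(new long[] {upstream, p.vertexId});
                } else if (previous.size() == 1) {
                    // Implicit wiring (assumed rule): a single upstream plugin feeds this one.
                    edges.add(new long[] {previous.get(0).vertexId, p.vertexId});
                } else if (!previous.isEmpty()) {
                    throw new IllegalArgumentException("ambiguous upstream for vertex " + p.vertexId);
                }
            }
            for (PluginConf p : stage) {
                if (p.pluginOutput != null) vertexByOutputName.put(p.pluginOutput, p.vertexId);
            }
            if (!stage.isEmpty()) previous = stage; // an empty transform section is skipped
        }
        return edges;
    }
}
```

With one plugin per section and no names, this yields the edges 1→2 and 2→3 shown in the generated LogicalDag.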
3. PhysicalPlan: Execution Strategy
3.1 Structure
PhysicalPlan describes how to execute the LogicalDag on distributed workers.
public class PhysicalPlan {
// Multiple pipelines (SubPlans)
private final List<SubPlan> pipelineList;
// Immutable job information
private final JobImmutableInformation jobImmutableInformation;
// Distributed state (Hazelcast IMap)
private final IMap<Long, JobStatus> runningJobStateIMap;
private final IMap<Long, Long> runningJobStateTimestampsIMap;
// Job completion future
private final CompletableFuture<JobResult> jobEndFuture;
}
3.2 Pipeline Splitting
A LogicalDag is split into multiple Pipelines (SubPlans) by the current PipelineGenerator implementation:
- Unrelated Subgraphs: Disconnected parts of the DAG become independent pipelines
- Multiple-Input Vertices: If a connected subgraph contains a vertex with multiple upstream inputs, the generator splits the subgraph into multiple linear pipelines along each source→sink path and clones vertices where needed
Note: Multiple sinks (branching) do not necessarily create multiple pipelines. When there is no multiple-input vertex, a branching graph is usually kept as a single pipeline.
Example 1: Simple Linear Pipeline:
source { JDBC { } }
transform { Sql { } }
sink { Elasticsearch { } }
Generated: 1 Pipeline
Pipeline 1: [JDBC Source] → [SQL Transform] → [Elasticsearch Sink]
Example 2: Multiple Sources:
source {
JDBC { plugin_output = "orders" }
Kafka { plugin_output = "events" }
}
transform {
Sql { query = "SELECT * FROM orders UNION SELECT * FROM events" }
}
sink {
Elasticsearch { }
}
Generated: 2 Pipelines
Pipeline 1: [JDBC Source] → [SQL Transform] → [Elasticsearch Sink]
Pipeline 2: [Kafka Source] → [SQL Transform] → [Elasticsearch Sink]
Example 3: Multiple Sinks:
source {
MySQL-CDC { }
}
sink {
Elasticsearch { plugin_input = "MySQL-CDC" }
JDBC { plugin_input = "MySQL-CDC" }
}
Generated: 1 Pipeline
Pipeline 1: [MySQL-CDC Source] → ([Elasticsearch Sink], [JDBC Sink])
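The two splitting rules can be sketched as follows. This is a simplified model, not the PipelineGenerator source: the DAG is an adjacency map of vertex ids, a subgraph that contains a multiple-input vertex is expanded into every source-to-sink path, and a cloned vertex is represented by the same id appearing in more than one pipeline.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the two splitting rules; not SeaTunnel's PipelineGenerator.
final class PipelineSplitter {
    // downstream: vertexId -> downstream vertex ids; every vertex (including sinks) must be a key.
    // Returns each pipeline as a list of vertex ids: the sorted vertex set of an unsplit
    // subgraph, or one source-to-sink path of a split subgraph.
    static List<List<Long>> split(Map<Long, List<Long>> downstream) {
        Map<Long, List<Long>> upstream = new HashMap<>();
        Map<Long, Set<Long>> neighbours = new HashMap<>();
        for (Long v : downstream.keySet()) {
            upstream.put(v, new ArrayList<>());
            neighbours.put(v, new HashSet<>());
        }
        for (Map.Entry<Long, List<Long>> e : downstream.entrySet()) {
            for (Long target : e.getValue()) {
                upstream.get(target).add(e.getKey());
                neighbours.get(e.getKey()).add(target);
                neighbours.get(target).add(e.getKey());
            }
        }
        List<List<Long>> pipelines = new ArrayList<>();
        Set<Long> seen = new HashSet<>();
        for (Long start : new TreeSet<>(downstream.keySet())) {
            if (!seen.add(start)) continue;
            // Rule 1: each connected subgraph (edge direction ignored) is handled separately.
            List<Long> component = new ArrayList<>();
            Deque<Long> queue = new ArrayDeque<>();
            queue.add(start);
            while (!queue.isEmpty()) {
                Long v = queue.poll();
                component.add(v);
                for (Long n : neighbours.get(v)) {
                    if (seen.add(n)) queue.add(n);
                }
            }
            Collections.sort(component);
            boolean hasMultiInput = component.stream().anyMatch(v -> upstream.get(v).size() > 1);
            if (!hasMultiInput) {
                pipelines.add(component); // branching without merging stays one pipeline
            } else {
                // Rule 2: one linear pipeline per source -> sink path.
                for (Long v : component) {
                    if (upstream.get(v).isEmpty()) collectPaths(v, downstream, new ArrayList<>(), pipelines);
                }
            }
        }
        return pipelines;
    }

    private static void collectPaths(Long v, Map<Long, List<Long>> downstream,
                                     List<Long> path, List<List<Long>> out) {
        path.add(v);
        if (downstream.get(v).isEmpty()) {
            out.add(new ArrayList<>(path));
        } else {
            for (Long next : downstream.get(v)) collectPaths(next, downstream, path, out);
        }
        path.remove(path.size() - 1);
    }
}
```

Encoding Examples 1, 2 and 3 as adjacency maps gives one, two and one pipelines respectively, matching the outcomes listed above.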
3.3 PhysicalPlan Generation
// In JobMaster
PhysicalPlan physicalPlan = new PhysicalPlanGenerator(logicalDag, resourceManager)
.generate();
Steps:
- Analyze LogicalDag: Identify sources, sinks, and dependencies
- Split into Pipelines: Create SubPlan for each pipeline
- Generate PhysicalVertices: Create parallel instances for each action
- Allocate Resources: Request slots from ResourceManager
- Assign Tasks: Map PhysicalVertices to slots
- Create Coordinators: Setup CheckpointCoordinator per pipeline
4. SubPlan (Pipeline)
4.1 Structure
SubPlan represents an independently executing pipeline.
public class SubPlan {
private final int pipelineId;
private final PipelineLocation pipelineLocation;
// All task instances in this pipeline
private final List<PhysicalVertex> physicalVertexList;
// Coordinator tasks (Enumerator, Committer)
private final List<PhysicalVertex> coordinatorVertexList;
// Checkpoint coordinator for this pipeline
private final CheckpointCoordinator checkpointCoordinator;
// Execution state
private PipelineStatus pipelineStatus;
}
4.2 PhysicalVertex List
Each LogicalVertex with parallelism N generates N PhysicalVertices.
Example:
LogicalVertex: JDBC Source (parallelism = 4)
↓
PhysicalVertices:
- PhysicalVertex (subtask 0, slot 1)
- PhysicalVertex (subtask 1, slot 2)
- PhysicalVertex (subtask 2, slot 3)
- PhysicalVertex (subtask 3, slot 4)
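The expansion above is a one-to-N mapping keyed by subtask index. The sketch below reproduces it under the simplifying assumption that each instance takes the next free slot in order; PhysicalInstance and PhysicalExpansion are hypothetical names, not SeaTunnel's PhysicalVertex code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical description of one deployed instance; not SeaTunnel's PhysicalVertex.
final class PhysicalInstance {
    final String vertexName;
    final int subtaskIndex;
    final long slotId;

    PhysicalInstance(String vertexName, int subtaskIndex, long slotId) {
        this.vertexName = vertexName;
        this.subtaskIndex = subtaskIndex;
        this.slotId = slotId;
    }

    @Override
    public String toString() {
        return vertexName + " (subtask " + subtaskIndex + ", slot " + slotId + ")";
    }
}

final class PhysicalExpansion {
    // Expands one logical vertex with parallelism N into N instances, one free slot each.
    static List<PhysicalInstance> expand(String vertexName, int parallelism, List<Long> freeSlots) {
        if (freeSlots.size() < parallelism) {
            throw new IllegalStateException(
                "need " + parallelism + " slots but only " + freeSlots.size() + " are free");
        }
        List<PhysicalInstance> instances = new ArrayList<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            instances.add(new PhysicalInstance(vertexName, subtask, freeSlots.get(subtask)));
        }
        return instances;
    }
}
```

Calling expand("JDBC Source", 4, List.of(1L, 2L, 3L, 4L)) produces the four instances listed above, from "JDBC Source (subtask 0, slot 1)" to "JDBC Source (subtask 3, slot 4)".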
4.3 Coordinator Vertices
Special vertices for coordination tasks:
- SourceSplitEnumerator: Runs on master, assigns splits to readers
- SinkCommitter: Runs on master, coordinates commits
- SinkAggregatedCommitter: Runs on master, global commit coordination
Example:
SubPlan for JDBC → Transform → Elasticsearch:
physicalVertexList:
- JdbcSourceTask (4 instances)
- TransformTask (4 instances)
- ElasticsearchSinkTask (4 instances)
coordinatorVertexList:
- JdbcSourceSplitEnumerator (1 instance, master)
- ElasticsearchSinkCommitter (1 instance, master)
4.4 Independent Checkpointing
Each pipeline has its own CheckpointCoordinator:
Benefits:
- Independent checkpoint scheduling: a slow or failed checkpoint in one pipeline does not block another
- Isolated failure domains
- Reduced coordination overhead
- Simpler barrier alignment
Example:
Pipeline 1 (JDBC → ES):
CheckpointCoordinator triggers on the job's checkpoint interval (e.g., every 60s)
Manages checkpoints for JDBC and ES tasks only
Pipeline 2 (Kafka → JDBC):
CheckpointCoordinator runs its own trigger timer, independently of Pipeline 1
Manages checkpoints for Kafka and JDBC tasks only
5. PhysicalVertex: Deployed Task
5.1 Structure
PhysicalVertex represents a deployed task instance.
public class PhysicalVertex {
private final TaskGroupLocation taskGroupLocation;
private final TaskGroupDefaultImpl taskGroup;
// Assigned resource slot
private final SlotProfile slotProfile;
// Execution state (CREATED, RUNNING, FAILED, etc.)
private ExecutionState currentExecutionState;
// Plugin jars (for class loader isolation)
private final List<Set<URL>> pluginJarsUrls;
}
5.2 TaskGroup: Task Fusion
Multiple tasks can be fused into a single TaskGroup for efficiency.
public class TaskGroupDefaultImpl implements TaskGroup {
private final TaskGroupLocation taskGroupLocation;
// Multiple tasks in this group
private final Set<Task> tasks;
// Shared thread pool
private final ExecutorService executorService;
// In-memory channels between fused tasks (no network hop)
private final Map<Long, BlockingQueue<Record<?>>> internalChannels;
}
Fusion Conditions:
- Same parallelism
- Sequential dependency (A → B)
- No data shuffle required
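The three fusion conditions can be read as a greedy grouping rule over a linear chain; accepting only a linear chain stands in for the "sequential dependency" condition. ChainOp, FusionPlanner and the needsShuffle flag are hypothetical, and SeaTunnel's actual fusion logic may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical operator descriptor for one step of a linear (non-branching) chain.
final class ChainOp {
    final String name;
    final int parallelism;
    final boolean needsShuffle; // true if this operator's input must be repartitioned

    ChainOp(String name, int parallelism, boolean needsShuffle) {
        this.name = name;
        this.parallelism = parallelism;
        this.needsShuffle = needsShuffle;
    }
}

final class FusionPlanner {
    // Greedily merges consecutive operators into one group while the fusion conditions hold:
    // same parallelism as the previous operator and no shuffle at the boundary.
    static List<List<String>> fuse(List<ChainOp> chain) {
        List<List<String>> groups = new ArrayList<>();
        List<String> current = new ArrayList<>();
        ChainOp previous = null;
        for (ChainOp op : chain) {
            boolean canFuse = previous != null
                && previous.parallelism == op.parallelism
                && !op.needsShuffle;
            if (!canFuse && !current.isEmpty()) {
                groups.add(current);
                current = new ArrayList<>();
            }
            current.add(op.name);
            previous = op;
        }
        if (!current.isEmpty()) groups.add(current);
        return groups;
    }
}
```

For Source(4) → Transform(4) → Sink(4) this yields a single group, matching the "With Fusion" case; lowering the sink to parallelism 2 splits it into [Source, Transform] and [Sink].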
Example (with fusion):
LogicalDag:
Source (parallelism=4) → Transform (parallelism=4) → Sink (parallelism=4)
Without Fusion:
12 separate tasks (4 + 4 + 4)
Network overhead for Source → Transform and Transform → Sink
With Fusion:
4 TaskGroups, each containing:
[SourceTask → TransformTask → SinkTask] (single thread, shared memory)
Benefits:
- Reduced network serialization/deserialization
- Better CPU cache locality
- Lower memory footprint
- Simplified deployment
5.3 Slot Assignment
Each PhysicalVertex is assigned a SlotProfile:
public class SlotProfile {
private final long slotID;
private final Address workerAddress;
private final ResourceProfile resourceProfile; // CPU, memory
}
Assignment Process:
- JobMaster requests slots from ResourceManager
- ResourceManager selects workers based on strategy (random, slot ratio, load)
- ResourceManager allocates slots and returns SlotProfiles
- JobMaster assigns SlotProfiles to PhysicalVertices
- JobMaster deploys tasks via DeployTaskOperation
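As an illustration of one strategy named above, a slot-ratio policy can be sketched as "pick the worker whose slots are least utilized". The exact selection rules of SeaTunnel's ResourceManager are not reproduced here; WorkerSlots and SlotRatioStrategy are hypothetical.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical view of one worker's slots; not SeaTunnel's resource manager types.
final class WorkerSlots {
    final String address;
    final int totalSlots;
    int usedSlots;

    WorkerSlots(String address, int totalSlots, int usedSlots) {
        this.address = address;
        this.totalSlots = totalSlots;
        this.usedSlots = usedSlots;
    }

    double usageRatio() {
        return (double) usedSlots / totalSlots;
    }

    boolean hasFreeSlot() {
        return usedSlots < totalSlots;
    }
}

final class SlotRatioStrategy {
    // Chooses the worker with the lowest used/total slot ratio among workers that still have
    // a free slot, reserves one slot on it, and returns its address (empty if the cluster is full).
    static Optional<String> allocate(List<WorkerSlots> workers) {
        Optional<WorkerSlots> chosen = workers.stream()
            .filter(WorkerSlots::hasFreeSlot)
            .min(Comparator.comparingDouble(WorkerSlots::usageRatio));
        chosen.ifPresent(w -> w.usedSlots++);
        return chosen.map(w -> w.address);
    }
}
```

Workers with no free slot are filtered out before the ratio is computed, so a fully used worker is never chosen and an empty result signals that the request cannot be satisfied.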
6. Task Deployment and Execution
6.1 Deployment Flow
6.2 Task Execution
Each SeaTunnelTask executes its assigned action:
SourceSeaTunnelTask:
while (isRunning()) {
// Poll data from SourceReader
sourceReader.pollNext(collector);
// Handle checkpoint barriers
if (checkpointTriggered) {
triggerBarrier(checkpointId);
}
}
TransformSeaTunnelTask:
while (isRunning()) {
// Read from input queue
Record record = inputQueue.take();
// Apply transform; a null result means the row was filtered out
Record transformed = transform.map(record);
// Forward only surviving rows to the output queue
if (transformed != null) {
outputQueue.put(transformed);
}
}
SinkSeaTunnelTask:
while (isRunning()) {
// Read from input queue
Record record = inputQueue.take();
// Write to sink
sinkWriter.write(record);
// Handle checkpoint barriers
if (barrierReceived) {
commitInfo = sinkWriter.prepareCommit(checkpointId);
snapshotState(checkpointId);
}
}
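The three loops above can be exercised together in a runnable, single-threaded model of a fused task group: rows and barriers travel through one FIFO channel, so by the time the sink sees barrier n, every row emitted before it has been written. Rec, FusedPipeline and the rule that a null transform result drops the row are assumptions of this sketch, not SeaTunnel's task classes.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical record: a data row or a checkpoint barrier (not SeaTunnel's Record class).
final class Rec {
    final String value;       // null for barriers
    final long checkpointId;  // -1 for data rows

    private Rec(String value, long checkpointId) {
        this.value = value;
        this.checkpointId = checkpointId;
    }

    static Rec data(String value) { return new Rec(value, -1); }
    static Rec barrier(long checkpointId) { return new Rec(null, checkpointId); }
    boolean isBarrier() { return checkpointId >= 0; }
}

// Single-threaded model of a fused Source -> Transform -> Sink task group.
final class FusedPipeline {
    final List<String> written = new ArrayList<>();    // rows accepted by the sink
    final List<String> snapshots = new ArrayList<>();  // "checkpointId:rowsWritten" per barrier

    void run(List<String> sourceRows, int barrierEvery, UnaryOperator<String> transform) {
        Deque<Rec> channel = new ArrayDeque<>();  // in-memory channel between the fused tasks
        long nextCheckpointId = 1;
        int emitted = 0;
        for (String row : sourceRows) {
            channel.add(Rec.data(row));                       // source: emit a row
            if (++emitted % barrierEvery == 0) {
                channel.add(Rec.barrier(nextCheckpointId++)); // source: inject a barrier
            }
            drain(channel, transform);
        }
    }

    private void drain(Deque<Rec> channel, UnaryOperator<String> transform) {
        while (!channel.isEmpty()) {
            Rec r = channel.poll();
            if (r.isBarrier()) {
                // Sink: every row before the barrier is already written, so snapshot now.
                snapshots.add(r.checkpointId + ":" + written.size());
                continue;
            }
            String out = transform.apply(r.value);  // transform; null is treated as "filtered out"
            if (out != null) written.add(out);       // sink: write the row
        }
    }
}
```

Running rows a, b, skip, c with a barrier every two rows and a transform that upper-cases rows and drops "skip" writes A, B, C and records the snapshots 1:2 and 2:3.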
7. Optimization Strategies
7.1 Task Fusion
When to Fuse:
- Same parallelism
- Sequential operators (no branching)
- No shuffle boundary
When NOT to Fuse:
- Different parallelism (e.g., source=4, sink=8)
- Branching DAG (one source, multiple sinks)
- Shuffle required (e.g., GROUP BY, JOIN)
Task fusion behavior and its controls are specific to the engine implementation; do not rely on undocumented settings, such as non-standard env.job.mode values, to switch fusion on or off.
7.2 Parallelism Inference
Parallelism resolution (SeaTunnel Engine / Zeta):
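- If an action/connector config specifies parallelism, it takes precedence
- Otherwise, a source uses env.parallelism (default is 1), while a transform or sink inherits the parallelism of its upstream action, as the example below shows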
Example:
env { parallelism = 1 }
source {
JDBC { parallelism = 4 } # Explicit
}
transform {
Sql { } # Inferred: 4 (from source)
}
sink {
Elasticsearch { } # Inferred: 4 (from transform)
}
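The example above can be reproduced with a short sketch of the resolution order it implies: an explicit setting wins, a source without one falls back to env.parallelism, and a transform or sink without one takes its upstream action's value. ParallelismResolver is hypothetical and only models a linear chain.

```java
import java.util.ArrayList;
import java.util.List;

final class ParallelismResolver {
    // configured.get(i) is the parallelism set on the i-th action of a linear chain
    // (source first), or null if the action does not set one.
    static List<Integer> resolve(List<Integer> configured, int envParallelism) {
        List<Integer> resolved = new ArrayList<>();
        for (Integer explicit : configured) {
            if (explicit != null) {
                resolved.add(explicit);                            // explicit setting wins
            } else if (resolved.isEmpty()) {
                resolved.add(envParallelism);                      // source falls back to env.parallelism
            } else {
                resolved.add(resolved.get(resolved.size() - 1));  // inherit from the upstream action
            }
        }
        return resolved;
    }
}
```

For the config above, resolve(Arrays.asList(4, null, null), 1) returns [4, 4, 4].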
7.3 Resource Allocation
Slot Calculation:
Required Slots = Sum of all task parallelism
Example:
Source (parallelism=4) + Transform (parallelism=4) + Sink (parallelism=2)
= 10 slots required
With Fusion:
TaskGroup (parallelism=4, fusion[Source+Transform]) + Sink (parallelism=2)
= 6 slots required
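The slot arithmetic above can be checked with a tiny helper. It assumes one slot per task (or per fused TaskGroup) instance and uses "same parallelism as the previous operator" as the only fusion criterion, which is a simplification; SlotCalculator is a hypothetical name.

```java
import java.util.List;

final class SlotCalculator {
    // parallelism.get(i) is the parallelism of the i-th operator in a linear chain.
    // With fusion, an operator whose parallelism equals its predecessor's joins the
    // predecessor's TaskGroup and needs no extra slots.
    static int requiredSlots(List<Integer> parallelism, boolean fuse) {
        int slots = 0;
        for (int i = 0; i < parallelism.size(); i++) {
            boolean joinsPreviousGroup = fuse && i > 0
                && parallelism.get(i).equals(parallelism.get(i - 1));
            if (!joinsPreviousGroup) slots += parallelism.get(i);
        }
        return slots;
    }
}
```

requiredSlots(List.of(4, 4, 2), false) gives 10 and requiredSlots(List.of(4, 4, 2), true) gives 6, matching the two totals above.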
Resource Profile:
ResourceProfile profile =
new ResourceProfile(
CPU.of(1), // 1 CPU core
Memory.of(512 * 1024 * 1024L) // 512MB heap (bytes)
);
8. Failure Handling
8.1 Task Failure
Detection:
- Task throws exception
- Heartbeat timeout
Recovery:
- Mark task as FAILED
- Fail entire pipeline (conservative)
- Restore from latest checkpoint
- Reallocate resources
- Redeploy and restart pipeline
8.2 Pipeline Failure Isolation
Key Insight: Pipeline failures are isolated.
Example:
Job with 2 pipelines:
Pipeline 1: JDBC → ES (RUNNING)
Pipeline 2: Kafka → JDBC (FAILED)
Result:
Pipeline 2 restarts from checkpoint
Pipeline 1 continues unaffected
Benefits:
- Reduced blast radius
- Faster recovery (only failed pipeline)
- Better resource utilization
9. Monitoring and Observability
9.1 Key Metrics
Pipeline-Level:
- Pipeline status and lifecycle transitions (CREATED / RUNNING / FINISHED / FAILED)
- Task counts and placement across workers/slots
- Checkpoint progress (latest checkpoint id, duration, failures)
Task-Level:
- Task status and restart counters
- Record/byte throughput (in/out)
- Backpressure / queueing indicators (engine-dependent)
9.2 Visualization
Job: mysql-to-es
│
├── Pipeline 1 (mysql-cdc → elasticsearch)
│ ├── PhysicalVertex 0 [RUNNING] @ worker-1:slot-1
│ ├── PhysicalVertex 1 [RUNNING] @ worker-2:slot-1
│ ├── PhysicalVertex 2 [RUNNING] @ worker-3:slot-1
│ └── PhysicalVertex 3 [RUNNING] @ worker-4:slot-1
│
└── Pipeline 2 (mysql-cdc → jdbc)
├── PhysicalVertex 0 [RUNNING] @ worker-1:slot-2
└── PhysicalVertex 1 [RUNNING] @ worker-2:slot-2
10. Best Practices
10.1 Parallelism Configuration
Rule of Thumb:
Parallelism = min(
data partitions,
available slots,
ceil(target throughput / single-task throughput)
)
Examples:
- JDBC Source: Set to number of DB partitions (e.g., 8 partitions → parallelism=8)
- Kafka Source: Set to number of partitions (e.g., 32 partitions → parallelism=32)
- File Source: Set to number of files or file splits
- CPU-Intensive Transform: Set to number of CPU cores
- I/O-Intensive Sink: Set based on target system capacity
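The rule of thumb translates into a small helper. Rounding the throughput ratio up and clamping the result to at least 1 are choices made for this sketch; the throughput figures are estimates the operator has to supply, and ParallelismAdvisor is a hypothetical name.

```java
final class ParallelismAdvisor {
    // min(data partitions, available slots, ceil(target / per-task throughput)), at least 1.
    static int suggest(int dataPartitions, int availableSlots,
                       double targetRecordsPerSec, double perTaskRecordsPerSec) {
        if (perTaskRecordsPerSec <= 0) {
            throw new IllegalArgumentException("per-task throughput must be positive");
        }
        int neededForThroughput = (int) Math.ceil(targetRecordsPerSec / perTaskRecordsPerSec);
        int p = Math.min(dataPartitions, Math.min(availableSlots, neededForThroughput));
        return Math.max(1, p);
    }
}
```

For a 32-partition Kafka topic, 20 free slots, a 50,000 records/s target and about 5,000 records/s per task, the throughput term is the binding one and the suggestion is 10.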
10.2 Pipeline Design
Keep Pipelines Simple:
- Prefer linear pipelines (Source → Transform → Sink)
- Avoid complex branching when possible
- Use multiple jobs for completely independent workflows
Use Multiple Jobs When:
- Different checkpoint intervals needed
- Different resource requirements
- Independent failure domains desired
10.3 Troubleshooting
Problem: Tasks not starting
Check:
- Enough available slots? (required_slots <= available_slots)
- Resource profile reasonable? (not requesting 100 CPU cores)
- Tag filters correct? (if using tag-based assignment)
Problem: Low throughput
Check:
- Parallelism too low? (increase parallelism)
- Task fusion disabled? (enable for better performance)
- Checkpoint interval too short? (increase interval)
11. Related Resources
12. References
Key Source Files
Further Reading
- Google Borg Paper - Task scheduling inspiration
- Apache Flink JobGraph
- Spark DAG Scheduler