DAG 执行模型
1. 概述
1.1 问题背景
分布式数据处理需要将用户意图转换为可执行的分布式任务:
- 抽象层次: 如何分离逻辑意图与物理执行?
- 优化: 如何优化任务放置和数据混洗?
- 流水线: 如何执行具有多个数据 Source/Sink 的复杂 DAG?
- 并行度: 如何确定任务并行度和分布?
- 故障隔离: 如何将故障影响限制在受影响的组件内?
1.2 设计目标
SeaTunnel 的 DAG 执行模型旨在:
- 关注点分离: 逻辑规划(用户意图) vs 物理执行(运行时细节)
- 支持优化: 任务融合、流水线分割、资源分配
- 支持复杂拓扑: 多个数据源、目标端、分支、连接
- 促进容错: 清晰的故障边界与独立检查点
- 最大化并行度: 高效并行执行,最少协调开销
1.3 执行模型概览
2. LogicalDag: 用户意图
2.1 结构
LogicalDag 以引擎无关的方式表示用户的作业配置。
LogicalDag 的核心组成:
- logicalVertexMap: 顶点集合(每个顶点对应一个 Source/Transform/Sink 动作)
- edges: 边集合(描述数据流依赖关系)
- jobConfig: 作业级配置(例如并行度默认值、容错/资源/运行参数)
2.2 LogicalVertex
表示单个动作(数据 Source/转换器/Sink 目标端)及其并行度。
一个 LogicalVertex 通常包含:
- vertexId: 顶点唯一标识
- action: 动作类型(SourceAction / TransformChainAction / SinkAction)
- parallelism: 并行实例数量(若未显式配置,可能由引擎推断)
动作类型:
- SourceAction: 封装
SeaTunnelSource,生产CatalogTable - TransformChainAction:
SeaTunnelTransform链,转换模式 - SinkAction: 封装
SeaTunnelSink,消费CatalogTable
示例:
来自配置的直观映射关系:
- Vertex 1: JDBC Source,parallelism=4
- Vertex 2: SQL Transform,parallelism=8
- Vertex 3: Elasticsearch Sink,parallelism=2
2.3 LogicalEdge
表示动作之间的数据流。
一条 LogicalEdge 通常只需要描述:
- inputVertexId: 上游顶点
- targetVertexId: 下游顶点
示例:
典型线性拓扑中的边:
- JDBC Source(1) → SQL Transform(2)
- SQL Transform(2) → Elasticsearch Sink(3)
2.4 LogicalDag 创建
从用户配置构建:
LogicalDag 在作业提交/启动阶段由作业执行环境解析配置生成(可能发生在客户端或服务端),随后作为作业不可变信息的一部分交由 JobMaster 管理执行。
过程:
- 解析 HOCON 配置(source、transform、sink 部分)
- 为每个配置的组件创建
Action对象 - 从配置结构推断数据流
- 验证模式兼容性
- 构建
LogicalDag对象
示例配置 → LogicalDag:
env {
parallelism = 4
}
source {
JDBC {
url = "jdbc:mysql://..."
query = "SELECT * FROM orders"
}
}
transform {
Sql {
query = "SELECT order_id, SUM(amount) FROM this GROUP BY order_id"
}
}
sink {
Elasticsearch {
hosts = ["es-host:9200"]
index = "orders_summary"
}
}
生成的 LogicalDag:
3. PhysicalPlan: 执行策略
3.1 结构
PhysicalPlan 描述如何在分布式工作节点上执行 LogicalDag。
PhysicalPlan 的核心信息通常包括:
- pipelineList(SubPlans): 由 LogicalDag 切分得到的多个流水线(独立执行单元)
- jobImmutableInformation: 作业不可变信息(例如作业 ID、提交参数、依赖等)
- running state store: 分布式状态存储(用于运行态状态、时间戳、元信息等)
- jobEndFuture: 作业完成信号(用于协调退出、回收资源、返回结果)
3.2 流水线分割
LogicalDag 在生成 ExecutionPlan 时会被组织为一个或多个流水线(Pipeline/SubPlan)。以当前实现为准,主要规则是:
- 按连通性拆分:DAG 中互不相连的子图会被拆成不同流水线。
- 遇到多输入顶点时拆分:当存在“多输入顶点”(某个顶点有多个上游输入,例如 UNION多流汇聚)时,当前实现会沿每条 source→…→sink 的路径拆成多条线性流水线,并对共享顶点做克隆,以降低多输入拓扑在同一流水线内的协调复杂度。
说明:
- 如果仅存在“一个 source 分叉到多个 sink”(多输出/分支),但没有任何多输入顶点,当前实现通常不会仅因为多个 sink 就拆分流水线;该分支拓扑仍可能在同一流水线内执行。
- 更细粒度的切分(例如按并行度/可协调能力)在代码中仍保留 TODO,后续可能演进。
示例 1: 简单线性流水线:
source { JDBC { } }
transform { Sql { } }
sink { Elasticsearch { } }
生成: 1 个流水线 生成结果:
流水线 1:JDBC 数据源 → SQL 转换器 → Elasticsearch 目标端
示例 2: 多个数据源:
source {
JDBC { plugin_output = "orders" }
Kafka { plugin_output = "events" }
}
transform {
Sql { query = "SELECT * FROM orders UNION SELECT * FROM events" }
}
sink {
Elasticsearch { }
}
生成: 2 个流水线 生成结果:
流水线 1:JDBC 数据源 → SQL 转换器 → Elasticsearch 目标端流水线 2:Kafka 数据源 → SQL 转换器 → Elasticsearch 目标端
示例 3: 多个目标端:
source {
MySQL-CDC { }
}
sink {
Elasticsearch { plugin_input = "MySQL-CDC" }
JDBC { plugin_input = "MySQL-CDC" }
}
生成: 通常为 1 个流水线(包含分支) 生成结果:
3.3 PhysicalPlan 生成
PhysicalPlan 通常由 JobMaster 在拿到 LogicalDag 后生成,并结合 ResourceManager 做资源申请与放置。
步骤:
- 分析 LogicalDag: 识别数据源、目标端和依赖关系
- 分割为流水线: 为每个流水线创建 SubPlan
- 生成 PhysicalVertices: 为每个动作创建并行实例
- 分配资源: 从 ResourceManager 请求槽位
- 分配任务: 将 PhysicalVertices 映射到槽位
- 创建协调器: 为每个流水线设置 CheckpointCoordinator
4. SubPlan (流水线)
4.1 结构
SubPlan 表示一个独立执行的流水线。
SubPlan(流水线)通常包含:
- pipelineId/pipelineLocation: 流水线的唯一标识
- physicalVertexList: 此流水线中的并行任务实例列表
- coordinatorVertexList: 协调器类任务(如 split enumerator、聚合提交等单实例协调任务)
- checkpointCoordinator: 本流水线的检查点协调器(独立协调域)
- pipelineStatus: 执行状态(如 CREATED/RUNNING/FAILED/FINISHED)
4.2 PhysicalVertex 列表
每个并行度为 N 的 LogicalVertex 生成 N 个 PhysicalVertices。
示例:
| 逻辑顶点 | 生成的物理顶点 |
|----------|----------------|
| JDBC 数据源 (parallelism = 4) | PhysicalVertex 子任务 0 落在槽位 1,子任务 1 落在槽位 2,子任务 2 落在槽位 3,子任务 3 落在槽位 4 |
4.3 协调器顶点
用于协调任务的特殊顶点:
- SourceSplitEnumerator: 通常以单实例运行,分配分片给读取器(部署位置由引擎调度决定)
- SinkAggregatedCommitter: 当 Sink 提供 aggregated committer 时,通常以单实例运行用于全局提交协调(部署位置由引擎调度决定)
说明:SinkCommitter 的触发方式取决于引擎实现,并不一定体现为独立的协调器顶点;例如在 SeaTunnel Engine 中,committer 可能在 Sink 任务的 checkpoint 回调中被触发。
示例:
| 运行时范围 | 实例 |
|------------|------|
| physicalVertexList | JdbcSourceTask × 4、TransformTask × 4、ElasticsearchSinkTask × 4 |
| coordinatorVertexList | JdbcSourceSplitEnumerator × 1,以及 ElasticsearchSinkAggregatedCommitter × 1(可选) |
4.4 独立检查点
每个流水线都有自己的 CheckpointCoordinator:
优势:
- 独立的检查点间隔
- 隔离的故障域
- 减少协调开销
- 简化屏障对齐
示例:
| 流水线 | 检查点行为 |
|--------|------------|
| 流水线 1 (JDBC → ES) | 一个 CheckpointCoordinator 按作业配置的间隔触发,仅管理 JDBC 和 Elasticsearch 任务 |
| 流水线 2 (Kafka → JDBC) | 另一个协调器同样按作业配置触发,但只管理 Kafka 和 JDBC 任务 |
5. PhysicalVertex: 已部署任务
5.1 结构
PhysicalVertex 表示已部署的任务实例。
PhysicalVertex 关注“一个并行任务实例如何被部署与运行”:
- taskGroupLocation: 任务实例定位信息(含并行子任务序号等)
- taskGroup: 任务融合后的执行单元(见下节)
- slotProfile: 该实例被分配到的槽位(资源容量与位置)
- currentExecutionState: 当前执行状态(CREATED/RUNNING/FAILED 等)
- pluginJarsUrls: 插件依赖(用于类加载隔离)
5.2 TaskGroup: 任务融合
多个任务可以融合到单个 TaskGroup 以提高效率。
TaskGroup 的关键点:
- 将一段可融合的线性算子链(Source/Transform/Sink 的某些组合)放在同一执行单元内
- 通过共享线程/队列/内存通道减少跨算子序列化与网络开销
- 以并行度为单位生成多个 TaskGroup 实例(通常与上游并行度对齐)
融合条件:
- 相同并行度
- 顺序依赖(A → B)
- 不需要数据混洗
示例(带融合):
| 模式 | 执行形态 | 影响 |
|------|----------|------|
| 逻辑 DAG | Source (4) → Transform (4) → Sink (4) | 两种模式都保留相同的业务拓扑 |
| 不融合 | 12 个独立任务,阶段之间走网络传输 | 序列化和网络开销更高 |
| 融合后 | 4 个 TaskGroup,每组执行 SourceTask → TransformTask → SinkTask | 本地性更好,网络成本更低 |
优势:
- 减少网络序列化/反序列化
- 更好的 CPU 缓存局部性
- 更低的内存占用
- 简化部署
5.3 槽位分配
每个 PhysicalVertex 被分配一个 SlotProfile:
SlotProfile 表达“这个任务实例运行在哪里、能用多少资源”。具体字段与语义见资源管理文档。
分配过程:
- JobMaster 从 ResourceManager 请求槽位
- ResourceManager 根据分配策略选择工作节点(例如 RANDOM / SLOT_RATIO / SYSTEM_LOAD)
- ResourceManager 分配槽位并返回 SlotProfiles
- JobMaster 将 SlotProfiles 分配给 PhysicalVertices
- JobMaster 通过
DeployTaskOperation部署任务
6. 任务部署和执行
6.1 部署流程
6.2 任务执行
每个 SeaTunnelTask 执行其分配的动作:
SourceSeaTunnelTask:
执行要点:
- 持续从 SourceReader 拉取/接收数据并发出记录
- 在检查点触发时生成并传播 barrier(屏障),参与流水线级的一致性快照
TransformSeaTunnelTask:
执行要点:
- 从上游通道读取记录
- 应用 transform 逻辑并输出到下游通道
- 若 transform 有状态,需要参与 checkpoint 的状态快照与恢复
SinkSeaTunnelTask:
执行要点:
- 持续消费上游记录并调用 sinkWriter 写入目标端
- 在 barrier 到达时切换到“快照边界”:准备提交信息(prepareCommit(checkpointId))、持久化 writer 状态并将提交信息交给 committer
- 在 checkpoint 成功后由 committer 进行最终提交;失败时由恢复流程回滚/重试(取决于 sink 语义)
7. 优化策略
7.1 任务融合
何时融合:
- 相同并行度
- 顺序算子(无分支)
- 无混洗边界
何时不融合:
- 不同并行度(例如 source=4, sink=8)
- 分支 DAG(一个数据源,多个目标端)
- 需要混洗(例如 GROUP BY、JOIN)
说明:任务融合的具体策略与可配置项以当前引擎实现为准,文档不在此绑定某个固定的配置开关,避免与实际版本不一致。
7.2 并行度推断
并行度以配置为准:
- 若连接器显式配置了
parallelism,则使用连接器配置。 - 否则使用
env.parallelism(默认值为 1)。 - 某些连接器/引擎可能会根据外部系统分区数等信息做额外推断,但这是实现细节,不能在架构文档里写成固定规则。
示例:
source {
JDBC { parallelism = 4 } # 显式
}
transform {
Sql { } # 推断: 4 (来自数据源)
}
sink {
Elasticsearch { } # 推断: 4 (来自转换器)
}
7.3 资源分配
槽位计算: 槽位估算经验:
所需槽位 = 所有任务并行度之和- 不融合示例:
Source (4) + Transform (4) + Sink (2) = 10 个槽位 - 融合示例:
TaskGroup (4, Source+Transform 融合) + Sink (2) = 6 个槽位
说明:资源画像/槽位资源的具体字段、单位与配置路径以引擎侧配置与实现为准;文档不在此给出不存在或不稳定的配置项示例。
8. 故障处理
8.1 任务故障
检测:
- 任务抛出异常
- 心跳超时
恢复:
- 标记任务为 FAILED
- 使整个流水线失败(保守策略)
- 从最新检查点恢复
- 重新分配资源
- 重新部署和重启流水线
8.2 流水线故障隔离
关键见解: 流水线故障是隔离的。
示例:
| 流水线 | 状态 | 恢复结果 |
|--------|------|----------|
| 流水线 1 | RUNNING | 持续运行,不受其他流水线影响 |
| 流水线 2 | FAILED | 从最近一次检查点重启 |
优势:
- 减少爆炸半径
- 更快恢复(仅失败的流水线)
- 更好的资源利用率
9. 监控和可观测性
9.1 关键指标
流水线级别:
pipeline.status: CREATED / RUNNING / FINISHED / FAILEDpipeline.tasks.total: 任务总数pipeline.tasks.running: 当前运行的任务数pipeline.checkpoint.latest_id: 最新检查点 IDpipeline.checkpoint.duration: 检查点持续时间
任务级别:
task.status: 任务执行状态task.records_in: 接收的记录数task.records_out: 发出的记录数task.bytes_in: 接收的字节数task.bytes_out: 发出的字节数
9.2 可视化
| 流水线 | 顶点部署 |
|---|---|
流水线 1 (mysql-cdc → elasticsearch) | PhysicalVertex 0 @ worker-1:slot-1,1 @ worker-2:slot-1,2 @ worker-3:slot-1,3 @ worker-4:slot-1 |
流水线 2 (mysql-cdc → jdbc) | PhysicalVertex 0 @ worker-1:slot-2,1 @ worker-2:slot-2 |
10. 最佳实践
10.1 并行度配置
经验法则: 并行度优先取以下几个约束中的较小值:
- 数据分区数
- 可用槽位数
目标吞吐量 / 单任务吞吐量
示例:
- JDBC 数据源: 设置为数据库分区数(例如 8 个分区 → parallelism=8)
- Kafka 数据源: 设置为分区数(例如 32 个分区 → parallelism=32)
- 文件数据源: 设置为文件数或文件分片数
- CPU 密集型转换器: 设置为 CPU 核心数
- I/O 密集型目标端: 根据目标系统容量设置
10.2 流水线设计
保持流水线简单:
- 优先使用线性流水线(数据源 → 转换器 → 目标端)
- 尽可能避免复杂分支
- 对完全独立的工作流使用多个作业
何时使用多个作业:
- 需要不同的检查点间隔
- 需要不同的资源需求
- 需要独立的故障域
10.3 故障排除
问题: 任务未启动
检查:
- 是否有足够的可用槽位?(
required_slots <= available_slots) - 资源配置文件是否合理?(不要请求 100 个 CPU 核心)
- 标签过滤器是否正确?(如果使用基于标签的分配)
问题: 低吞吐量
检查:
- 并行度是否太低?(增加并行度)
- 任务融合是否被禁用?(启用以获得更好的性能)
- 检查点间隔是否太短?(增加间隔)