MQTT
MQTT Sink 连接器
描述
用于把数据写入 MQTT broker。该连接器基于 Eclipse Paho 客户端库,支持 MQTT 3.1.1 协议。
它适合把 SeaTunnel 作业中的数据发布到物联网设备、边缘网关或轻量消息 broker。消息可以按 JSON 或纯文本格式序列化,并写入指定的 MQTT topic。
主要特性
当 qos = 0 时,该连接器提供最多一次投递;当 qos = 1 时,提供尽力而为的至少一次投递。
默认 clean_session = true,连接器按无状态方式运行。客户端断开连接时,未确认的消息可能丢失。如果需要更强的投递保证,可以设置 clean_session = false,并确保每个 writer 使用稳定且唯一的 client id;也可以配合 SeaTunnel 的上游重放能力使用。
支持引擎
SeaTunnel Zeta
Flink
Spark
选项
| 名称 | 类型 | 是否必须 | 默认值 |
|---|---|---|---|
| url | string | 是 | - |
| topic | string | 是 | - |
| username | string | 否 | - |
| password | string | 否 | - |
| qos | int | 否 | 1 |
| format | string | 否 | json |
| field_delimiter | string | 否 | , |
| batch_size | int | 否 | 1 |
| retry_timeout | int | 否 | 5000 |
| connection_timeout | int | 否 | 30 |
| clean_session | boolean | 否 | true |
| common-options | 否 | - |
url [string]
MQTT broker 连接地址,必须包含协议、主机和端口。
示例:tcp://broker.example.com:1883
topic [string]
要发布消息的 MQTT topic。
示例:iot/sensors/temperature
username [string]
MQTT broker 认证用户名。匿名访问时可以不配置。
password [string]
MQTT broker 认证密码。匿名访问时可以不配置。
qos [int]
发布消息时使用的 MQTT 服务质量等级。
0:最多一次,发送后不等待确认。1:至少一次,broker 需要确认收到消息,默认值。
format [string]
输出消息的序列化格式。支持以下值:
json:把每一行序列化为一个 JSON 对象,默认值。text:把每一行序列化为按分隔符拼接的纯文本,分隔符由field_delimiter控制。
field_delimiter [string]
当 format = "text" 时使用的字段分隔符。默认值为 ,。
示例:,, |, \t
batch_size [int]
发送到 broker 前缓存的消息数量。默认值为 1,表示每条消息都会立即发送。
调大该值可以减少逐条发送的开销,提升吞吐。缓存中的消息会在 checkpoint 和 writer 关闭时自动 flush。
retry_timeout [int]
发布消息遇到临时网络故障时,最多重试多久,单位为毫秒。writer 会在这个时间窗口内等待连接恢复并重试发送。
connection_timeout [int]
建立 MQTT 连接的超时时间,单位为秒。
clean_session [boolean]
是否使用 clean MQTT session。默认值为 true。
true:broker 会丢弃之前的会话状态,适合大多数无状态写入场景。false:broker 可以保留会话状态和未确认的 QoS 1 消息,可以增强至少一次语义,但可能造成 broker 端状态堆积。使用时需要确保每个 writer 的 client id 稳定且唯一。
common options
Sink 插件通用参数,请参考 Sink 通用选项。
性能建议
MQTT Sink 会同步发送消息,以保持写入顺序。可以通过下面方式提升吞吐:
- 适当调大
batch_size,例如设置为100,减少逐条发送开销。 - 如果业务可以接受最多一次投递,把
qos设置为0。 - 提高 SeaTunnel 作业并行度,让多个 MQTT client 分担写入。
- 如果需要非常高的吞吐,可以考虑使用 Kafka Sink。
示例
写入 JSON 消息到 MQTT
env {
parallelism = 1
job.mode = "BATCH"
job.name = "SeaTunnel_MQTT_Sink"
}
source {
FakeSource {
row.num = 16
schema = {
fields {
id = bigint
name = string
age = int
}
}
plugin_output = "fake"
}
}
sink {
MQTT {
plugin_input = "fake"
url = "tcp://mqtt-broker:1883"
topic = "test/seatunnel/sink"
qos = 1
format = "json"
}
}
该作业会向 test/seatunnel/sink topic 写入 16 条消息。因为 format 设置为 json,每一行都会被序列化为一条 JSON 消息。
使用认证并写入文本格式
sink {
MQTT {
url = "tcp://secure-broker.example.com:1883"
topic = "data/pipeline/output"
username = "seatunnel_user"
password = "secret"
qos = 1
format = "text"
field_delimiter = "|"
retry_timeout = 10000
connection_timeout = 60
}
}
当 format = "text" 时,每一行都会被序列化为一行分隔符文本。可以通过 field_delimiter 调整分隔符,以匹配下游消费端的解析方式。