跳到主要内容
版本:Next

RocketMQ

RocketMQ Sink 连接器

支持的 Apache RocketMQ 版本

  • 4.9.0 或更新版本

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

描述

将 SeaTunnel 数据行写入 Apache RocketMQ topic。该 Sink 支持 JSON 和文本消息体、消息 tag、同步发送、按字段选择分区,以及在 exactly.once = true 时使用事务消息保证精确一次写入。

Sink 参数

参数名类型是否必填默认值描述
topicString-RocketMQ topic 名称。
name.srv.addrString-RocketMQ NameServer 地址,例如 localhost:9876
acl.enabledBooleanfalse是否启用 RocketMQ ACL 鉴权。
access.keyString-访问密钥。acl.enabled = true 时必填。
secret.keyString-秘密密钥。acl.enabled = true 时必填。
producer.groupStringSeaTunnel-Producer-GroupRocketMQ 生产者组 ID。
tagString-写入每条消息时使用的 RocketMQ tag。
partition.key.fieldsList-用来选择 RocketMQ 队列的字段名。配置的字段必须存在于上游 schema 中。
formatStringjson消息格式。支持 jsontext
field.delimiterString,format = text 时使用的字段分隔符。
producer.send.syncBooleanfalse是否同步发送消息。为 false 时异步发送。
exactly.onceBooleanfalse是否使用事务消息实现精确一次写入。
max.message.sizeint4194304最大消息体大小,单位字节。
send.message.timeoutint3000发送消息超时时间,单位毫秒。
common-optionsconfig-Sink 连接器通用参数,详情请参考 Sink 通用参数

参数说明

partition.key.fields

partition.key.fields 控制消息写入哪个 RocketMQ 队列。SeaTunnel 会对这些字段的值做哈希,并用哈希结果选择队列。如果不配置该参数,则由 RocketMQ 自行选择队列。

例如,上游字段中有 c_int 时,可以这样配置:

partition.key.fields = ["c_int"]

exactly.once

Sink 支持通过 RocketMQ 事务消息实现精确一次写入。该能力默认关闭。确认 RocketMQ 集群和作业 checkpoint 配置满足事务写入要求后,可设置 exactly.once = true

任务示例

写入 JSON 消息

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
c_string = string
c_int = int
c_timestamp = timestamp
}
}
}
}

sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic"
partition.key.fields = ["c_int"]
producer.send.sync = true
}
}

写入文本消息

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
id = bigint
content = string
}
}
}
}

sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_text_topic"
format = text
field.delimiter = ","
producer.send.sync = true
}
}

写入带 tag 的消息

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
c_string = string
c_int = int
}
}
}
}

sink {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic_message_tag"
tag = "test_tag"
partition.key.fields = ["c_string"]
producer.send.sync = true
}
}

RocketMQ 读写 RocketMQ

env {
parallelism = 1
job.mode = "STREAMING"
checkpoint.interval = 5000
}

source {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topics = "test_topic_source"
plugin_output = "rocketmq_table"
format = json
start.mode = "CONSUME_FROM_FIRST_OFFSET"
consumer.group = "rocketmq_to_rocketmq_group"
schema = {
fields {
id = bigint
c_string = string
}
}
}
}

sink {
Rocketmq {
plugin_input = "rocketmq_table"
name.srv.addr = "rocketmq-e2e:9876"
topic = "test_topic_sink"
partition.key.fields = ["id"]
exactly.once = true
}
}

变更日志