跳到主要内容
版本:Next

RocketMQ

RocketMQ 源连接器

支持的 Apache RocketMQ 版本

  • 4.9.0(或更新版本,供参考)

支持这些引擎

Spark
Flink
SeaTunnel Zeta

关键特性

描述

Apache RocketMQ 的源连接器。

源选项

参数名类型必须默认值描述
topicsString-RocketMQ 主题名称。如果有多个主题,使用 , 分隔,例如:"tpc1,tpc2"
name.srv.addrString-RocketMQ 名称服务器集群地址。
tagsString-RocketMQ 标签名称。如果有多个标签,使用 , 分隔,例如:"tag1,tag2"
acl.enabledBooleanfalse如果为 true,启用访问控制,需要配置访问密钥和秘密密钥。
access.keyString访问密钥
secret.keyString当 ACL_ENABLED 为 true 时,秘密密钥不能为空。
batch.sizeint100RocketMQ 消费者拉取批大小
consumer.groupStringSeaTunnel-Consumer-GroupRocketMQ 消费者组 ID,用于区分不同的消费者组。
commit.on.checkpointBooleantrue如果为 true,消费者的偏移量将在后台定期提交。
schema-数据的结构,包括字段名称和字段类型。
formatStringjson数据格式。默认格式是 json。可选 text 格式。默认字段分隔符是 ","。如果自定义分隔符,添加 "field.delimiter" 选项。
field.delimiterString,自定义数据格式的字段分隔符
start.modeStringCONSUME_FROM_GROUP_OFFSETS消费者的初始消费模式,有几种类型:[CONSUME_FROM_LAST_OFFSET],[CONSUME_FROM_FIRST_OFFSET],[CONSUME_FROM_GROUP_OFFSETS],[CONSUME_FROM_TIMESTAMP],[CONSUME_FROM_SPECIFIC_OFFSETS]
start.mode.offsets消费模式为 "CONSUME_FROM_SPECIFIC_OFFSETS" 所需的偏移量
start.mode.timestampLong消费模式为 "CONSUME_FROM_TIMESTAMP" 所需的时间。
partition.discovery.interval.millislong-1动态发现主题和分区的间隔。
ignore_parse_errorsBooleanfalse可选标志,跳过解析错误而不是失败。
common-optionsconfig-源插件通用参数,请参考 源通用选项 详见。

start.mode.offsets

消费模式为 "CONSUME_FROM_SPECIFIC_OFFSETS" 所需的偏移量。

例如:

start.mode.offsets = {
topic1-0 = 70
topic1-1 = 10
topic1-2 = 10
}

任务示例

简单

消费者读取 Rocketmq 数据并将其打印到控制台

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Rocketmq {
name.srv.addr = "rocketmq-e2e:9876"
topics = "test_topic_json"
plugin_output = "rocketmq_table"
schema = {
fields {
id = bigint
c_map = "map<string, smallint>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(2, 1)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
# 如果您想了解有关如何配置 seatunnel 的更多信息并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/category/transform
}

sink {
Console {
}
}

指定格式消费简单

当我以 json 格式消费主题数据并解析,每次拉取的条数是 400,消费从原始位置开始

env {
parallelism = 1
job.mode = "BATCH"
}

source {
Rocketmq {
name.srv.addr = "localhost:9876"
topics = "test_topic"
plugin_output = "rocketmq_table"
start.mode = "CONSUME_FROM_FIRST_OFFSET"
batch.size = "400"
consumer.group = "test_topic_group"
format = json
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

transform {
# 如果您想了解有关如何配置 seatunnel 的更多信息并查看完整的转换插件列表,
# 请访问 https://seatunnel.apache.org/docs/category/transform
}

sink {
Console {
}
}

变更日志