Version: Next

RocketMQ

RocketMQ source connector

Supported Apache RocketMQ Version

  • 4.9.0 (or a newer version; listed for reference)

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Source connector for Apache RocketMQ.

Source Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topics | String | yes | - | RocketMQ topic name. Separate multiple topics with commas, for example: "tpc1,tpc2". |
| name.srv.addr | String | yes | - | RocketMQ name server cluster address. |
| tags | String | no | - | RocketMQ tag name. Separate multiple tags with commas, for example: "tag1,tag2". |
| acl.enabled | Boolean | no | false | If true, access control is enabled, and the access key and secret key must be configured. |
| access.key | String | no | | |
| secret.key | String | no | | When acl.enabled is true, the secret key cannot be empty. |
| batch.size | int | no | 100 | RocketMQ consumer pull batch size. |
| consumer.group | String | no | SeaTunnel-Consumer-Group | RocketMQ consumer group id, used to distinguish different consumer groups. |
| commit.on.checkpoint | Boolean | no | true | If true, the consumer's offset will be periodically committed in the background. |
| schema | | no | - | The structure of the data, including field names and field types. |
| format | String | no | json | Data format. The default format is json; text is also supported. The default field separator is ",". To use a different delimiter, set the "field.delimiter" option. |
| field.delimiter | String | no | , | Custom field delimiter for the data format. |
| start.mode | String | no | CONSUME_FROM_GROUP_OFFSETS | The initial consumption mode of the consumer. One of: [CONSUME_FROM_LAST_OFFSET], [CONSUME_FROM_FIRST_OFFSET], [CONSUME_FROM_GROUP_OFFSETS], [CONSUME_FROM_TIMESTAMP], [CONSUME_FROM_SPECIFIC_OFFSETS]. |
| start.mode.offsets | | no | | |
| start.mode.timestamp | Long | no | | The start timestamp. Required when start.mode is "CONSUME_FROM_TIMESTAMP". |
| partition.discovery.interval.millis | long | no | -1 | The interval for dynamically discovering topics and partitions. |
| ignore_parse_errors | Boolean | no | false | Optional flag to skip parse errors instead of failing. |
| common-options | config | no | - | Source plugin common parameters. Please refer to Source Common Options for details. |
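
As an illustration of the ACL options in the table above, here is a minimal sketch of a job that enables access control. The name server address, topic, credentials, and the two-field schema are placeholder values chosen for this example, not values from a real deployment.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    # Placeholder address and topic
    name.srv.addr = "localhost:9876"
    topics = "test_topic"

    # With acl.enabled = true, both keys need to be configured
    acl.enabled = true
    access.key = "your-access-key"   # placeholder
    secret.key = "your-secret-key"   # placeholder

    format = json
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}

sink {
  Console {
  }
}
```

With acl.enabled left at its default of false, the two key options can be omitted.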

start.mode.offsets

The offsets to start consuming from. Required when start.mode is "CONSUME_FROM_SPECIFIC_OFFSETS".

For example:

start.mode.offsets = {
  topic1-0 = 70
  topic1-1 = 10
  topic1-2 = 10
}
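
For context, here is a sketch of how such a block might sit inside a source definition. It assumes that each key pairs a topic name with a queue index, which is inferred from the topic1-0 pattern rather than confirmed here. The address, consumer group, and schema are placeholders.

```hocon
source {
  Rocketmq {
    name.srv.addr = "localhost:9876"   # placeholder address
    topics = "topic1"
    consumer.group = "test_topic_group"

    # The offsets below only apply in this start mode
    start.mode = "CONSUME_FROM_SPECIFIC_OFFSETS"
    start.mode.offsets = {
      # Assumed key layout: <topic>-<queue index> = <start offset>
      topic1-0 = 70
      topic1-1 = 10
      topic1-2 = 10
    }

    format = json
    schema = {
      fields {
        id = bigint
        c_string = string
      }
    }
  }
}
```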

Task Example

Simple example:

The consumer reads RocketMQ data and prints it to the console.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "rocketmq-e2e:9876"
    topics = "test_topic_json"
    plugin_output = "rocketmq_table"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}

Specified format example:

This job parses the topic data as JSON, pulls 400 messages per batch, and starts consuming from the earliest offset.

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    plugin_output = "rocketmq_table"
    # Start from the earliest available offset
    start.mode = "CONSUME_FROM_FIRST_OFFSET"
    # Pull 400 messages per batch
    batch.size = 400
    consumer.group = "test_topic_group"
    format = json
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}

Specified timestamp example:

This job starts consuming from a specified timestamp and checks for new partitions every 1000 milliseconds.

env {
  parallelism = 1
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
  job.mode = "BATCH"
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    # Check for new partitions every 1000 milliseconds
    partition.discovery.interval.millis = 1000
    # start.mode.timestamp only applies in this start mode
    start.mode = "CONSUME_FROM_TIMESTAMP"
    start.mode.timestamp = 1694508382000
    consumer.group = "test_topic_group"
    format = json
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
  }
}
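
The example above runs in BATCH mode. Partition discovery is arguably most useful in a long-running job, so here is a hedged sketch of a streaming variant. It assumes the SeaTunnel env settings job.mode = "STREAMING" and checkpoint.interval (taken here to be in milliseconds); the interval values, address, and reduced schema are illustrative only.

```hocon
env {
  parallelism = 1
  job.mode = "STREAMING"
  # Assumed checkpoint interval in milliseconds; tune for your workload
  checkpoint.interval = 10000
}

source {
  Rocketmq {
    name.srv.addr = "localhost:9876"   # placeholder address
    topics = "test_topic"
    consumer.group = "test_topic_group"

    # Look for newly added partitions once per second
    partition.discovery.interval.millis = 1000

    # Default is already true; shown here for clarity
    commit.on.checkpoint = true

    format = json
    schema = {
      fields {
        c_string = string
        c_int = int
      }
    }
  }
}

sink {
  Console {
  }
}
```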

Specified tag example:

This job consumes only messages with the specified tag. To match multiple tags, separate them with commas, for example: "tag1,tag2".

env {
  parallelism = 1
  job.mode = "BATCH"

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  Rocketmq {
    plugin_output = "rocketmq_table"
    name.srv.addr = "localhost:9876"
    topics = "test_topic"
    format = text
    # The default field delimiter is ","
    field.delimiter = ","
    tags = "test_tag"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(2, 1)"
        c_bytes = bytes
        c_date = date
        c_timestamp = timestamp
      }
    }
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
  Console {
    plugin_input = "rocketmq_table"
  }
}
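
As a further sketch, suppose the topic carries text records separated by "|" instead of ",". The source block below sets field.delimiter accordingly and turns on ignore_parse_errors so that records that fail to parse are skipped rather than failing the job. The delimiter, address, tags, and the three-field schema are assumptions made for illustration.

```hocon
source {
  Rocketmq {
    plugin_output = "rocketmq_table"
    name.srv.addr = "localhost:9876"   # placeholder address
    topics = "test_topic"
    tags = "tag1,tag2"                 # multiple tags, comma-separated

    format = text
    # Illustrative custom delimiter, e.g. a record like: 1|hello|true
    field.delimiter = "|"

    # Skip records that cannot be parsed instead of failing
    ignore_parse_errors = true

    schema = {
      fields {
        id = bigint
        c_string = string
        c_boolean = boolean
      }
    }
  }
}
```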

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Improve][Connector-V2] Add optional flag for rocketmq connector to skip parse errors instead of failing (#8737) | https://github.com/apache/seatunnel/commit/701f17b5d4 | dev |
| [Improve][Connector-V2] RocketMQ Sink add message tag config (#7996) | https://github.com/apache/seatunnel/commit/97a1b00e48 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Fix][Connector-V2] Fix some throwable error not be caught (#7657) | https://github.com/apache/seatunnel/commit/e19d73282e | 2.3.8 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
| [Fix][connector-rocketmq] commit a correct offset to broker & reduce ThreadInterruptedException log (#6668) | https://github.com/apache/seatunnel/commit/b7480e1a89 | 2.3.6 |
| [fix][connector-rocketmq]Fix a NPE problem when checkpoint.interval is set too small(#6624) (#6625) | https://github.com/apache/seatunnel/commit/6e0c81d492 | 2.3.5 |
| [Test][E2E] Add thread leak check for connector (#5773) | https://github.com/apache/seatunnel/commit/1f2f3fc5f0 | 2.3.4 |
| [Fix][Connector] Rocketmq source startOffset greater than endOffset error (#6287) | https://github.com/apache/seatunnel/commit/cd44b5894e | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Improve][pom] Formatting pom (#4761) | https://github.com/apache/seatunnel/commit/1d6d3815ec | 2.3.2 |
| [Hotfix][Connector-V2][RocketMQ] Fix rocketmq spark e2e test cases (#4583) | https://github.com/apache/seatunnel/commit/e711f6ef4c | 2.3.2 |
| [Feature][Connector-V2] Add rocketmq source and sink (#4007) | https://github.com/apache/seatunnel/commit/e333897552 | 2.3.2 |