跳到主要内容
版本:Next

Rabbitmq

Rabbitmq 数据接收器

描述

该数据接收器是将数据写入Rabbitmq。

主要特性

接收器选项

名称类型是否必须默认值
hoststring-
portint-
virtual_hoststring-
usernamestring-
passwordstring-
queue_namestring-
urlstring-
routing_keystring-
exchangestring-
network_recovery_intervalint-
topology_recovery_enabledboolean-
AUTOMATIC_RECOVERY_ENABLEDboolean-
connection_timeoutint-
rabbitmq.configmap-
durablebooleantrue
exclusivebooleanfalse
auto_deletebooleanfalse
common-options-

host [string]

Rabbitmq服务器地址

port [int]

Rabbitmq服务器端口

virtual_host [string]

virtual host – 连接broker使用的vhost

username [string]

连接broker时使用的用户名

password [string]

连接broker时使用的密码

usernamepassword 需要一起配置。

url [string]

设置host、port、username、password和virtual host的简便方式。

queue_name [string]

数据写入的队列名。如果没有配置 routing_key,连接器会通过默认 exchange 将消息直接写入该队列。

routing_key [string]

发布消息时使用的路由键。如果希望通过指定 exchange 发布消息,而不是直接写入 queue_name,请同时配置 routing_keyexchange

exchange [string]

配置 routing_key 时使用的 exchange。

durable [boolean]

  • true:队列将在服务器重启时保留。
  • false:队列将在服务器重启时删除。

exclusive [boolean]

  • true:队列仅由当前连接使用,连接关闭时将删除。
  • false:队列可以由多个连接使用。

auto_delete [boolean]

  • true:队列将在最后一个消费者取消订阅时自动删除。
  • false:队列不会自动删除。

network_recovery_interval [int]

自动恢复需等待多长时间才尝试重连,单位为毫秒。

topology_recovery_enabled [boolean]

设置为true,表示启用拓扑恢复。

AUTOMATIC_RECOVERY_ENABLED [boolean]

设置为 true,表示启用连接恢复。

当前连接器配置项名称使用大写形式。请写成 AUTOMATIC_RECOVERY_ENABLED,不要写成 automatic_recovery_enabled

connection_timeout [int]

TCP连接建立的超时时间,单位为毫秒;0代表不限制。

rabbitmq.config [map]

除了上面提及必须设置的RabbitMQ客户端参数,你也还可以为客户端指定多个非强制参数,参见 RabbitMQ官方文档参数设置

common options

Sink插件常用参数,请参考Sink常用选项获取更多细节信息。

配置说明

  • 如果配置了 username,也必须配置 password,反过来也一样。
  • hostportvirtual_hostqueue_name 是连接器必填项。url 可额外提供 RabbitMQ 客户端使用的 AMQP URI。
  • durableexclusiveauto_delete 用于连接器声明目标队列。

示例

simple:

env {
parallelism = 1
job.mode = "STREAMING"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
id = bigint
c_string = string
}
}
}
}

sink {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test1"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}

示例 2

配置队列的 durable、exclusive、auto_delete:

env {
parallelism = 1
job.mode = "STREAMING"
}

source {
FakeSource {
row.num = 10
schema = {
fields {
id = bigint
c_string = string
}
}
}
}

sink {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test1"
durable = "true"
exclusive = "false"
auto_delete = "false"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}

变更日志

Change Log
ChangeCommitVersion
[Fix][connector-rabbitmq] Set default value for durable, exclusive and auto-delete (#9631)https://github.com/apache/seatunnel/commit/5f9492e62a2.3.12
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118)https://github.com/apache/seatunnel/commit/4f5adeb1c72.3.11
[Improve] rabbit mq options (#8740)https://github.com/apache/seatunnel/commit/4eec9be0122.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef8000162.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365)https://github.com/apache/seatunnel/commit/aabfc8eb782.3.8
[Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108)https://github.com/apache/seatunnel/commit/423a7b142b2.3.6
[Feature][Kafka] Support multi-table source read (#5992)https://github.com/apache/seatunnel/commit/60104602d12.3.6
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755)https://github.com/apache/seatunnel/commit/8de74081002.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842)https://github.com/apache/seatunnel/commit/985fb6642a2.3.2
[Hotfix][E2E] Fix RabbitmqIT (#4593)https://github.com/apache/seatunnel/commit/9bd5403d712.3.2
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719)https://github.com/apache/seatunnel/commit/ef1b8b1bb52.3.1
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba12.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312)https://github.com/apache/seatunnel/commit/4b12691a8d2.3.0