Rabbitmq
Rabbitmq sink connector
Description
Used to write data to Rabbitmq.
Key features
Options
name | type | required | default value |
---|---|---|---|
host | string | yes | - |
port | int | yes | - |
virtual_host | string | yes | - |
username | string | yes | - |
password | string | yes | - |
queue_name | string | yes | - |
url | string | no | - |
network_recovery_interval | int | no | - |
topology_recovery_enabled | boolean | no | - |
automatic_recovery_enabled | boolean | no | - |
use_correlation_id | boolean | no | false |
connection_timeout | int | no | - |
rabbitmq.config | map | no | - |
common-options | no | - |
host [string]
the default host to use for connections
port [int]
the default port to use for connections
virtual_host [string]
virtual host – the virtual host to use when connecting to the broker
username [string]
the AMQP user name to use when connecting to the broker
password [string]
the password to use when connecting to the broker
url [string]
convenience method for setting the fields in an AMQP URI: host, port, username, password and virtual host
queue_name [string]
the queue to write the message to
durable [boolean]
true: The queue will survive a server restart. false: The queue will be deleted on server restart.
exclusive [boolean]
true: The queue is used only by the current connection and will be deleted when the connection closes. false: The queue can be used by multiple connections.
auto_delete [boolean]
true: The queue will be deleted automatically when the last consumer unsubscribes. false: The queue will not be automatically deleted.
schema [Config]
fields [Config]
the schema fields of upstream data.
network_recovery_interval [int]
how long will automatic recovery wait before attempting to reconnect, in ms
topology_recovery_enabled [boolean]
if true, enables topology recovery
automatic_recovery_enabled [boolean]
if true, enables connection recovery
use_correlation_id [boolean]
whether the messages received are supplied with a unique id to deduplicate messages (in case of failed acknowledgments).
connection_timeout [int]
connection TCP establishment timeout in milliseconds; zero for infinite
rabbitmq.config [map]
In addition to the above parameters that must be specified by the RabbitMQ client, the user can also specify multiple non-mandatory parameters for the client, covering all the parameters specified in the official RabbitMQ document.
common options
Sink plugin common parameters, please refer to Sink Common Options for details
Example
simple:
sink {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test1"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}
Example 2
queue with durable, exclusive, auto_delete:
sink {
RabbitMQ {
host = "rabbitmq-e2e"
port = 5672
virtual_host = "/"
username = "guest"
password = "guest"
queue_name = "test1"
durable = "true"
exclusive = "false"
auto_delete = "false"
rabbitmq.config = {
requested-heartbeat = 10
connection-timeout = 10
}
}
}
Changelog
Change Log
Change | Commit | Version |
---|---|---|
[Improve] rabbit mq options (#8740) | https://github.com/apache/seatunnel/commit/4eec9be01 | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) | https://github.com/apache/seatunnel/commit/aabfc8eb7 | 2.3.8 |
[Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108) | https://github.com/apache/seatunnel/commit/423a7b142 | 2.3.6 |
[Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
[Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842) | https://github.com/apache/seatunnel/commit/985fb6642 | 2.3.2 |
[Hotfix][E2E] Fix RabbitmqIT (#4593) | https://github.com/apache/seatunnel/commit/9bd5403d7 | 2.3.2 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
[Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312) | https://github.com/apache/seatunnel/commit/4b12691a8 | 2.3.0 |