RabbitMQ
RabbitMQ source connector
Description
Used to read data from RabbitMQ.
Key features
To achieve exactly-once semantics, the source must be non-parallel (parallelism set to 1). This limitation is mainly due to how RabbitMQ dispatches messages from a single queue to multiple consumers.
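A minimal sketch of the constraint above: the source block pins the reader to a single task. It assumes that `parallelism` is accepted at the source level as one of the Source Common Options; the connection values are copied from the example on this page and the schema is trimmed to a single field.

```hocon
source {
  RabbitMQ {
    # Assumption: `parallelism` is a source common option.
    # A single reader is needed for exactly-once delivery.
    parallelism = 1

    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    schema = {
      fields {
        id = bigint
      }
    }
  }
}
```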
Options
name | type | required | default value |
---|---|---|---|
host | string | yes | - |
port | int | yes | - |
virtual_host | string | yes | - |
username | string | yes | - |
password | string | yes | - |
queue_name | string | yes | - |
schema | config | yes | - |
url | string | no | - |
routing_key | string | no | - |
exchange | string | no | - |
network_recovery_interval | int | no | - |
topology_recovery_enabled | boolean | no | - |
automatic_recovery_enabled | boolean | no | - |
connection_timeout | int | no | - |
requested_channel_max | int | no | - |
requested_frame_max | int | no | - |
requested_heartbeat | int | no | - |
prefetch_count | int | no | - |
delivery_timeout | long | no | - |
common-options | | no | - |
host [string]
the default host to use for connections
port [int]
the default port to use for connections
virtual_host [string]
the virtual host to use when connecting to the broker
username [string]
the AMQP user name to use when connecting to the broker
password [string]
the password to use when connecting to the broker
url [string]
convenience option for setting the host, port, username, password and virtual host in a single AMQP URI
queue_name [string]
the queue to read messages from
routing_key [string]
the routing key to publish the message to
exchange [string]
the exchange to publish the message to
schema [Config]
fields [Config]
the schema fields of upstream data.
network_recovery_interval [int]
how long automatic recovery waits before attempting to reconnect, in milliseconds
topology_recovery_enabled [boolean]
if true, enables topology recovery
automatic_recovery_enabled [boolean]
if true, enables connection recovery
connection_timeout [int]
TCP connection establishment timeout in milliseconds; zero for infinite
requested_channel_max [int]
initially requested maximum channel number; zero for unlimited. Note: the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
requested_frame_max [int]
the requested maximum frame size
requested_heartbeat [int]
the requested heartbeat timeout. Note: the value must be between 0 and 65535 (unsigned short in AMQP 0-9-1).
prefetch_count [int]
the maximum number of messages to receive without acknowledgement
delivery_timeout [long]
the maximum wait time, in milliseconds, for the next message delivery
common options
Source plugin common parameters. Refer to Source Common Options for details.
Example
simple:
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"
    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
      }
    }
  }
}
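with optional connection settings:

The sketch below adds some of the optional settings from the Options table to the simple example. The option names and types come from that table; every numeric value is an illustrative assumption, not a recommended or default setting.

```hocon
source {
  RabbitMQ {
    host = "rabbitmq-e2e"
    port = 5672
    virtual_host = "/"
    username = "guest"
    password = "guest"
    queue_name = "test"

    # Recovery behaviour (values are illustrative assumptions)
    automatic_recovery_enabled = true
    topology_recovery_enabled = true
    network_recovery_interval = 5000   # ms to wait before reconnecting

    # Connection tuning (values are illustrative assumptions)
    connection_timeout = 10000         # ms; zero means infinite
    requested_heartbeat = 60           # must be between 0 and 65535

    # Consumption tuning (values are illustrative assumptions)
    prefetch_count = 100               # max unacknowledged messages
    delivery_timeout = 30000           # ms to wait for the next delivery

    schema = {
      fields {
        id = bigint
        c_map = "map<string, smallint>"
        c_array = "array<tinyint>"
      }
    }
  }
}
```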
Changelog
Change | Commit | Version |
---|---|---|
[Improve] rabbit mq options (#8740) | https://github.com/apache/seatunnel/commit/4eec9be012 | dev |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
[Feature][Rabbitmq] Allow configuration of queue durability and deletion policy (#7365) | https://github.com/apache/seatunnel/commit/aabfc8eb78 | 2.3.8 |
[Hotfix][connector-v2-rabbit] fix rabbit checkpoint exception in Flink mode (#7108) | https://github.com/apache/seatunnel/commit/423a7b142b | 2.3.6 |
[Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d1 | 2.3.6 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
[Bugfix][connector-v2][rabbitmq] Fix reduplicate ack msg bug and code style (#4842) | https://github.com/apache/seatunnel/commit/985fb6642a | 2.3.2 |
[Hotfix][E2E] Fix RabbitmqIT (#4593) | https://github.com/apache/seatunnel/commit/9bd5403d71 | 2.3.2 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb5 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba1 | 2.3.1 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
[Feature][Connector-V2][RabbitMQ] Add RabbitMQ source & sink connector (#3312) | https://github.com/apache/seatunnel/commit/4b12691a8d | 2.3.0 |