跳到主要内容
版本:Next

ActiveMQ

ActiveMQ Sink 连接器

描述

用于把 SeaTunnel 数据写入 ActiveMQ 队列。每一行数据都会被序列化成一条 JSON 文本消息。

关键特性

选项

名称类型是否必传默认值描述
uristring-ActiveMQ Broker 地址,例如 tcp://localhost:61616
queue_namestring-要写入的队列名称。
usernamestring-创建 ActiveMQ 连接时使用的用户名。配置该项时必须同时配置 password
passwordstring-创建 ActiveMQ 连接时使用的密码。配置该项时必须同时配置 username
client_idstring-连接工厂使用的 JMS 客户端 ID。
check_for_duplicateboolean-是否让 ActiveMQ 客户端检查重复消息。
always_session_asyncboolean-是否始终为每个 Session 使用独立线程分发消息。
always_sync_sendboolean-是否始终使用同步方式发送消息。
close_timeoutint-关闭连接的超时时间,单位毫秒。
dispatch_asyncboolean-Broker 是否异步分发消息。
nested_map_and_list_enabledboolean-是否允许结构化消息属性和 MapMessage 条目中包含嵌套的 MapList 对象。
warn_about_unstarted_connection_timeoutint-连接没有正确启动时,ActiveMQ 客户端发出警告前等待的毫秒数。设置为小于 0 的值可以关闭这个警告。

注意事项

  • uri 是连接入口,Broker 的主机和端口都写在这里,例如 tcp://activemq-host:61616
  • usernamepassword 是可选项;如果 Broker 需要认证,这两个配置要一起写。
  • 连接器会把每一行 SeaTunnel 数据作为一条 JSON 文本消息写入 queue_name

示例

把 FakeSource 数据写入 ActiveMQ 队列:

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
schema = {
fields {
id = int
name = string
}
}
rows = [
{ kind = INSERT, fields = [1, "Alice"] }
{ kind = INSERT, fields = [2, "Bob"] }
]
}
}

sink {
ActiveMQ {
uri = "tcp://localhost:61616"
username = "admin"
password = "admin"
queue_name = "testQueue"
}
}

变更日志