跳到主要内容
版本:Next

AmazonSqs

Amazon SQS Sink 连接器

描述

Amazon SQS Sink 连接器用于把每条输入的 SeaTunnel 行数据写入一个 Amazon SQS 队列 URL。 连接器会按照 format 序列化数据,并把序列化后的内容作为 SQS 消息体发送。

支持的引擎

Spark
Flink
SeaTunnel Zeta

主要特性

Sink 选项

名称类型是否必填默认值描述
urlString-要写入的完整 SQS 队列 URL,例如 https://sqs.us-east-1.amazonaws.com/123456789012/sink_queue
regionString-SQS 队列所在的 AWS 区域,例如 us-east-1
access_key_idString-AWS access key ID。和 secret_access_key 一起配置时使用静态凭证;两者都不配置时使用 AWS 默认凭证链。
secret_access_keyString-AWS secret access key。和 access_key_id 一起配置时使用静态凭证。
formatStringjson消息体格式。支持 jsontextcanal_jsondebezium_json
field_delimiterString,format = text 时使用的字段分隔符。
common-options-Sink 插件通用参数,详见 Sink Common Options

格式说明

  • json:把每行数据写成 JSON 对象。
  • text:用 field_delimiter 拼接每行中的字段。
  • canal_json:写出 Canal JSON 消息,详见 Canal JSON
  • debezium_json:写出 Debezium JSON 消息,详见 Debezium JSON
  • 当前 sink 只发送消息体,不提供 SQS message attributes、delay seconds、deduplication ID 或 message group ID 等配置。

任务示例

写入 JSON 消息

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
row.num = 1
schema = {
fields {
name = string
}
}
rows = [
{
kind = INSERT
fields = ["test_name"]
}
]
}
}

sink {
AmazonSqs {
url = "https://sqs.us-east-1.amazonaws.com/123456789012/sink_queue"
region = "us-east-1"
access_key_id = "AKIA..."
secret_access_key = "SECRET..."
}
}

使用自定义分隔符写入文本消息

source {
FakeSource {
schema = {
fields {
artist = string
album = string
release_year = int
}
}
}
}

sink {
AmazonSqs {
url = "https://sqs.us-east-1.amazonaws.com/123456789012/sink_queue"
region = "us-east-1"
format = text
field_delimiter = "|"
}
}

变更日志