跳到主要内容
版本:Next

Pulsar

Pulsar 数据连接器

引擎支持

Spark
Flink
Seatunnel Zeta

核心特性

描述

Apache Pulsar 的接收连接器。

支持的数据源信息

数据源支持的版本
PulsarUniversal

输出选项

名称类型是否必须默认值描述
topicStringYes-输出到Pulsar主题名称.
client.service-urlStringYes-Pulsar 服务的服务 URL 提供者.
admin.service-urlStringYes-管理端点的 Pulsar 服务 HTTP URL.
auth.plugin-classStringNo-身份验证插件的名称.
auth.paramsStringNo-身份验证插件的参数.
formatStringNojson数据格式。默认格式为 json。可选的文本格式.
field_delimiterStringNo,自定义数据格式的字段分隔符.
semanticsEnumNoAT_LEAST_ONCE写入 pulsar 的一致性语义.
transaction_timeoutIntNo600默认情况下,事务超时指定为 10 分钟.
pulsar.configMapNo-除了上述必须由 Pulsar 生产者客户端指定的参数外.
message.routing.modeEnumNoRoundRobinPartition要分区的消息的默认路由模式.
partition_key_fieldsarrayNo-配置哪些字段用作 pulsar 消息的键.
common-optionsconfigno-源插件常用参数,详见源码 常用选项.

参数解释

client.service-url [String]

Pulsar 服务的 Service URL 提供程序。要使用客户端库连接到 Pulsar, 您需要指定一个 Pulsar 协议 URL。您可以将 Pulsar 协议 URL 分配给特定集群并使用 Pulsar 方案。

例如, localhost: pulsar://localhost:6650,localhost:6651.

admin.service-url [String]

管理端点的 Pulsar 服务 HTTP URL.

例如, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.

auth.plugin-class [String]

身份验证插件的名称。

auth.params [String]

身份验证插件的参数。

例如, key1:val1,key2:val2

format [String]

数据格式。默认格式为 json。可选的文本格式。默认字段分隔符为","。如果自定义分隔符,请添加"field_delimiter"选项。

field_delimiter [String]

自定义数据格式的字段分隔符。默认field_delimiter为','。

semantics [Enum]

写入 pulsar 的一致性语义。可用选项包括 EXACTLY_ONCE、NON、AT_LEAST_ONCE、默认AT_LEAST_ONCE。 如果语义被指定为 EXACTLY_ONCE,我们将使用 2pc 来保证消息被准确地发送到 pulsar 一次。 如果语义指定为 NON,我们将直接将消息发送到 pulsar,如果作业重启/重试或网络错误,数据可能会重复/丢失。

transaction_timeout [Int]

默认情况下,事务超时指定为 10 分钟。如果事务未在指定的超时时间内提交,则事务将自动中止。因此,您需要确保超时大于检查点间隔。

pulsar.config [Map]

除了上述 Pulsar 生产者客户端必须指定的参数外,用户还可以为生产者客户端指定多个非强制性参数, 涵盖 Pulsar 官方文档中指定的所有生产者参数。

message.routing.mode [Enum]

要分区的消息的默认路由模式。可用选项包括 SinglePartition、RoundRobinPartition。 如果选择 SinglePartition,如果未提供密钥,分区生产者将随机选择一个分区并将所有消息发布到该分区中,如果消息上提供了密钥,则分区生产者将对密钥进行哈希处理并将消息分配给特定分区。 如果选择 RoundRobinPartition,则如果未提供密钥,则生产者将以循环方式跨所有分区发布消息,以实现最大吞吐量。请注意,轮询不是按单个消息完成的,而是设置为相同的批处理延迟边界,以确保批处理有效。

partition_key_fields [String]

配置哪些字段用作 pulsar 消息的键。

例如,如果要使用上游数据中的字段值作为键,则可以为此属性分配字段名称。

上游数据如下:

nameagedata
Jack16data-example1
Mary23data-example2

如果将 name 设置为键,则 name 列的哈希值将确定消息发送到哪个分区。

如果未设置分区键字段,则将向 null 消息键发送至。

消息键的格式为 json,如果 name 设置为键,例如 '{“name”:“Jack”}'。

所选字段必须是上游的现有字段。

常见选项

源插件常用参数,详见源码常用选项 .

任务示例

简单:

该示例定义了一个 SeaTunnel 同步任务,该任务通过 FakeSource 自动生成数据并将其发送到 Pulsar Sink。FakeSource 总共生成 16 行数据 (row.num=16),每行有两个字段,name(字符串类型)和 age(int 类型)。最终目标主题是test_topic主题中还将有 16 行数据。 如果您尚未安装和部署 SeaTunnel,则需要按照安装Seatunnel SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 SeaTunnel 引擎快速入门中的说明运行此作业。

# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

sink {
Pulsar {
topic = "example"
client.service-url = "localhost:pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
plugin_output = "test"
pulsar.config = {
sendTimeoutMs = 30000
}
}
}

更改日志

下一个版本

  • 添加 Pulsar Sink 连接器