跳到主要内容
版本:2.3.10

Pulsar

Pulsar 数据连接器

引擎支持

Spark
Flink
Seatunnel Zeta

核心特性

描述

Apache Pulsar 的接收连接器。

支持的数据源信息

数据源支持的版本
PulsarUniversal

输出选项

名称类型是否必须默认值描述
topicStringYes-输出到Pulsar主题名称.
client.service-urlStringYes-Pulsar 服务的服务 URL 提供者.
admin.service-urlStringYes-管理端点的 Pulsar 服务 HTTP URL.
auth.plugin-classStringNo-身份验证插件的名称.
auth.paramsStringNo-身份验证插件的参数.
formatStringNojson数据格式。默认格式为 json。可选的文本格式.
field_delimiterStringNo,自定义数据格式的字段分隔符.
semanticsEnumNoAT_LEAST_ONCE写入 pulsar 的一致性语义.
transaction_timeoutIntNo600默认情况下,事务超时指定为 10 分钟.
pulsar.configMapNo-除了上述必须由 Pulsar 生产者客户端指定的参数外.
message.routing.modeEnumNoRoundRobinPartition要分区的消息的默认路由模式.
partition_key_fieldsarrayNo-配置哪些字段用作 pulsar 消息的键.
common-optionsconfigno-源插件常用参数,详见源码 常用选项.

参数解释

client.service-url [String]

Pulsar 服务的 Service URL 提供程序。要使用客户端库连接到 Pulsar, 您需要指定一个 Pulsar 协议 URL。您可以将 Pulsar 协议 URL 分配给特定集群并使用 Pulsar 方案。

例如, localhost: pulsar://localhost:6650,localhost:6651.

admin.service-url [String]

管理端点的 Pulsar 服务 HTTP URL.

例如, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.

auth.plugin-class [String]

身份验证插件的名称。

auth.params [String]

身份验证插件的参数。

例如, key1:val1,key2:val2

format [String]

数据格式。默认格式为 json。可选的文本格式。默认字段分隔符为","。如果自定义分隔符,请添加"field_delimiter"选项。

field_delimiter [String]

自定义数据格式的字段分隔符。默认field_delimiter为','。

semantics [Enum]

写入 pulsar 的一致性语义。可用选项包括 EXACTLY_ONCE、NON、AT_LEAST_ONCE、默认AT_LEAST_ONCE。 如果语义被指定为 EXACTLY_ONCE,我们将使用 2pc 来保证消息被准确地发送到 pulsar 一次。 如果语义指定为 NON,我们将直接将消息发送到 pulsar,如果作业重启/重试或网络错误,数据可能会重复/丢失。

transaction_timeout [Int]

默认情况下,事务超时指定为 10 分钟。如果事务未在指定的超时时间内提交,则事务将自动中止。因此,您需要确保超时大于检查点间隔。

pulsar.config [Map]

除了上述 Pulsar 生产者客户端必须指定的参数外,用户还可以为生产者客户端指定多个非强制性参数, 涵盖 Pulsar 官方文档中指定的所有生产者参数。

message.routing.mode [Enum]

要分区的消息的默认路由模式。可用选项包括 SinglePartition、RoundRobinPartition。 如果选择 SinglePartition,如果未提供密钥,分区生产者将随机选择一个分区并将所有消息发布到该分区中,如果消息上提供了密钥,则分区生产者将对密钥进行哈希处理并将消息分配给特定分区。 如果选择 RoundRobinPartition,则如果未提供密钥,则生产者将以循环方式跨所有分区发布消息,以实现最大吞吐量。请注意,轮询不是按单个消息完成的,而是设置为相同的批处理延迟边界,以确保批处理有效。

partition_key_fields [String]

配置哪些字段用作 pulsar 消息的键。

例如,如果要使用上游数据中的字段值作为键,则可以为此属性分配字段名称。

上游数据如下:

nameagedata
Jack16data-example1
Mary23data-example2

如果将 name 设置为键,则 name 列的哈希值将确定消息发送到哪个分区。

如果未设置分区键字段,则将向 null 消息键发送至。

消息键的格式为 json,如果 name 设置为键,例如 '{“name”:“Jack”}'。

所选字段必须是上游的现有字段。

常见选项

源插件常用参数,详见源码常用选项 .

任务示例

简单:

该示例定义了一个 SeaTunnel 同步任务,该任务通过 FakeSource 自动生成数据并将其发送到 Pulsar Sink。FakeSource 总共生成 16 行数据 (row.num=16),每行有两个字段,name(字符串类型)和 age(int 类型)。最终目标主题是test_topic主题中还将有 16 行数据。 如果您尚未安装和部署 SeaTunnel,则需要按照安装Seatunnel SeaTunnel 中的说明安装和部署 SeaTunnel。然后按照 SeaTunnel 引擎快速入门中的说明运行此作业。

# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
}

source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

sink {
Pulsar {
topic = "example"
client.service-url = "localhost:pulsar://localhost:6650"
admin.service-url = "http://my-broker.example.com:8080"
plugin_output = "test"
pulsar.config = {
sendTimeoutMs = 30000
}
}
}

变更日志

Change Log
ChangeCommitVersion
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6ee2.3.10
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef800012.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d032.3.9
[Improve][API] Make sure the table name in TablePath not be null (#7252)https://github.com/apache/seatunnel/commit/764d8b0bc2.3.7
[Feature][Kafka] Support multi-table source read (#5992)https://github.com/apache/seatunnel/commit/60104602d2.3.6
[PulsarSource]Improve pulsar throughput performance. (#6234)https://github.com/apache/seatunnel/commit/37461f4f32.3.4
[Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382)https://github.com/apache/seatunnel/commit/543d2c5082.3.4
[Chore] Remove useless DeserializationFormatFactory and its implement (#5880)https://github.com/apache/seatunnel/commit/f0511544f2.3.4
fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852)https://github.com/apache/seatunnel/commit/3b6de37432.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b2.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e52.3.4
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260)https://github.com/apache/seatunnel/commit/51c0d709b2.3.4
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284)https://github.com/apache/seatunnel/commit/ed5eadcf72.3.3
[Feature][Json-format] support read format for pulsar (#4111)https://github.com/apache/seatunnel/commit/7d61ae93e2.3.2
[hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125)https://github.com/apache/seatunnel/commit/a6705cc5b2.3.2
[Feature] add cdc multiple table support & fix zeta bughttps://github.com/apache/seatunnel/commit/533ff2c2f2.3.1
[hotfix][pulsar] PulsarSource consumer ack exception. (#4237)https://github.com/apache/seatunnel/commit/9725d675d2.3.1
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee1912.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b583032.3.1
[Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182)https://github.com/apache/seatunnel/commit/e567203f72.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a132.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd601052.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab166562.3.1
[Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989)https://github.com/apache/seatunnel/commit/aee2c580e2.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb82.3.1
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba2.3.1
[Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590)https://github.com/apache/seatunnel/commit/4fe9323412.3.0
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a112.3.0
[Hotfix][Connector-V2][Pulsar] fix conditional options (#3504)https://github.com/apache/seatunnel/commit/0066affac2.3.0
[Feature][Connector][pulsar] expose configurable options in Pulsar (#3341)https://github.com/apache/seatunnel/commit/200faa7c22.3.0
[Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432)https://github.com/apache/seatunnel/commit/6ac6a0a0c2.3.0
[chore] fix pulsar consumer comment error (#3356)https://github.com/apache/seatunnel/commit/91e632c522.3.0
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)https://github.com/apache/seatunnel/commit/38254e3f22.3.0
[hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945)https://github.com/apache/seatunnel/commit/5ad69076b2.3.0-beta
Move Handover to common module (#2877)https://github.com/apache/seatunnel/commit/d94a874bc2.3.0-beta
[hotfix][connector-v2] fix pulsar source exceptions (#2820)https://github.com/apache/seatunnel/commit/8ff0ba7012.2.0-beta
[#2606]Dependency management split (#2630)https://github.com/apache/seatunnel/commit/fc047be692.2.0-beta
[SeaTunnel]Simply seatunnel package pipeline. (#2563)https://github.com/apache/seatunnel/commit/9d88b62212.2.0-beta
[Improve][Connector-V2] Pulsar support user-defined schema (#2436)https://github.com/apache/seatunnel/commit/16cabe6a32.2.0-beta
[improve][UT] Upgrade junit to 5.+ (#2305)https://github.com/apache/seatunnel/commit/362319ff32.2.0-beta
StateT of SeaTunnelSource should extend Serializable (#2214)https://github.com/apache/seatunnel/commit/8c426ef852.2.0-beta
[doc][connector-v2] pulsar source options doc (#2128)https://github.com/apache/seatunnel/commit/59ce8a2b32.2.0-beta
[api-draft][Optimize] Optimize module name (#2062)https://github.com/apache/seatunnel/commit/f79e3112b2.2.0-beta