Pulsar
Pulsar sink connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Key Features
Description
Sink connector for Apache Pulsar.
Supported DataSource Info
Datasource | Supported Versions |
---|---|
Pulsar | Universal |
Sink Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
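topic | String | Yes | - | Name of the Pulsar topic to write to. |
client.service-url | String | Yes | - | Service URL provider for the Pulsar service. |
admin.service-url | String | Yes | - | Pulsar service HTTP URL for the admin endpoint. |
auth.plugin-class | String | No | - | Name of the authentication plugin. |
auth.params | String | No | - | Parameters for the authentication plugin. |
format | String | No | json | Data format. Defaults to json; text is also supported. |
field_delimiter | String | No | , | Field delimiter for the text data format. |
semantics | Enum | No | AT_LEAST_ONCE | Consistency semantics for writing to Pulsar. |
transaction_timeout | Int | No | 600 | Transaction timeout in seconds; defaults to 600 (10 minutes). |
pulsar.config | Map | No | - | Additional Pulsar producer client parameters beyond the required ones above. |
message.routing.mode | Enum | No | RoundRobinPartition | Default routing mode for messages to partitions. |
partition_key_fields | array | No | - | Fields to use as the Pulsar message key. |
common-options | config | No | - | Common sink plugin parameters; see Sink Common Options for details. |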
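Parameter Explanation
client.service-url [String]
Service URL provider for the Pulsar service. To connect to Pulsar with a client library, you need to specify a Pulsar protocol URL. You can assign a Pulsar protocol URL to a specific cluster and use the pulsar scheme.
For example, for localhost: pulsar://localhost:6650,localhost:6651.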
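A minimal sketch of a sink block that sets only the three required options, assuming a local Pulsar instance with brokers on localhost:6650/6651 and the admin HTTP endpoint on localhost:8080 (the standalone default; adjust to your deployment). The topic name example is illustrative.

```hocon
sink {
  Pulsar {
    # Required: topic to write to (illustrative name)
    topic = "example"
    # Binary-protocol URL; several broker addresses may follow the scheme
    client.service-url = "pulsar://localhost:6650,localhost:6651"
    # Admin HTTP endpoint (assumed standalone default port)
    admin.service-url = "http://localhost:8080"
  }
}
```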
admin.service-url [String]
Pulsar service HTTP URL for the admin endpoint.
For example, http://my-broker.example.com:8080, or https://my-broker.example.com:8443 for TLS.
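For a cluster served over TLS, a sketch might pair the https admin URL with Pulsar's pulsar+ssl:// scheme for the client URL. The host my-broker.example.com and the TLS broker port 6651 are assumptions, and trust-certificate settings are not covered by the options on this page, so additional client configuration may be required.

```hocon
sink {
  Pulsar {
    topic = "example"
    # pulsar+ssl:// is Pulsar's TLS variant of the binary protocol (port assumed)
    client.service-url = "pulsar+ssl://my-broker.example.com:6651"
    # HTTPS admin endpoint, matching the TLS example URL for admin.service-url
    admin.service-url = "https://my-broker.example.com:8443"
  }
}
```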
auth.plugin-class [String]
Name of the authentication plugin.
auth.params [String]
Parameters for the authentication plugin.
For example, key1:val1,key2:val2
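As a hedged example, token authentication could use Pulsar's built-in AuthenticationToken plugin, whose parameter string takes the token:<value> form. The token below is a placeholder, and the sketch assumes the brokers have token authentication enabled.

```hocon
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Pulsar's token authentication plugin class
    auth.plugin-class = "org.apache.pulsar.client.impl.auth.AuthenticationToken"
    # Placeholder JWT; replace with a token issued for your cluster
    auth.params = "token:<your-jwt-token>"
  }
}
```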
format [String]
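Data format. The default format is json; text is also supported. For text, the default field delimiter is ",". To use a custom delimiter, add the field_delimiter option.
field_delimiter [String]
Field delimiter for the text data format. The default field_delimiter is ','.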
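The sketch below switches the sink to the text format with a custom delimiter. With upstream fields name and age, each message body would presumably be the field values joined by the delimiter (for example Jack|16); the exact rendering of non-string values is an assumption.

```hocon
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Write each row as delimited text instead of JSON
    format = "text"
    # Override the default "," delimiter
    field_delimiter = "|"
  }
}
```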
semantics [Enum]
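Consistency semantics for writing to Pulsar. Available options are EXACTLY_ONCE, NON, and AT_LEAST_ONCE; the default is AT_LEAST_ONCE. If semantics is EXACTLY_ONCE, a two-phase commit (2PC) is used to guarantee that each message is sent to Pulsar exactly once. If semantics is NON, messages are sent to Pulsar directly, and data may be duplicated or lost if the job restarts or retries, or if a network error occurs.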
transaction_timeout [Int]
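The transaction timeout defaults to 10 minutes (600 seconds). A transaction that is not committed within the timeout is aborted automatically, so make sure the timeout is greater than the checkpoint interval.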
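A sketch of an exactly-once setup (only the env and sink blocks are shown), assuming a streaming job in which checkpoint.interval is given in milliseconds and transaction_timeout in seconds. With a 60-second checkpoint interval, the 600-second timeout leaves ample margin. Pulsar transactions must also be enabled on the broker (transactionCoordinatorEnabled=true in broker.conf), which is outside this connector's configuration.

```hocon
env {
  job.mode = "STREAMING"
  # Checkpoint every 60 s (value in milliseconds)
  checkpoint.interval = 60000
}
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Two-phase commit backed by Pulsar transactions
    semantics = "EXACTLY_ONCE"
    # Seconds; must exceed the checkpoint interval (600 s > 60 s)
    transaction_timeout = 600
  }
}
```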
pulsar.config [Map]
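In addition to the parameters above that the Pulsar producer client requires, you can specify any number of optional producer client parameters, covering all producer parameters listed in the official Pulsar documentation.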
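For instance, producer settings might be passed through pulsar.config as below. The keys are names from Pulsar's producer configuration (sendTimeoutMs, batchingEnabled, compressionType); that the connector forwards each of them unchanged is an assumption based on the description above, so verify against your Pulsar client version.

```hocon
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    pulsar.config = {
      # Fail a send that is not acknowledged within 30 s
      sendTimeoutMs = 30000
      # Group messages into batches before sending
      batchingEnabled = true
      # Compress message payloads with LZ4
      compressionType = "LZ4"
    }
  }
}
```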
message.routing.mode [Enum]
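Default routing mode for messages to partitions. Available options are SinglePartition and RoundRobinPartition. With SinglePartition, if no key is provided, the partitioned producer picks one partition at random and publishes all messages to it; if a message has a key, the producer hashes the key and assigns the message to a particular partition. With RoundRobinPartition, if no key is provided, the producer publishes messages across all partitions in round-robin fashion to maximize throughput. Note that round-robin is not applied per individual message; it follows the batching delay boundary so that batching remains effective.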
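A sketch that selects SinglePartition routing, assuming example is a partitioned topic. Since no partition key is configured here, all messages would go to one randomly chosen partition.

```hocon
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Unkeyed messages all go to a single randomly chosen partition
    message.routing.mode = "SinglePartition"
  }
}
```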
partition_key_fields [Array]
Fields to use as the Pulsar message key.
For example, to use the value of an upstream field as the key, assign that field's name to this property.
The upstream data is as follows:
name | age | data |
---|---|---|
Jack | 16 | data-example1 |
Mary | 23 | data-example2 |
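If name is set as the key, the hash of the name column determines which partition a message is sent to.
If partition_key_fields is not set, messages are sent with a null key.
The message key is formatted as JSON; for example, if name is set as the key: '{"name":"Jack"}'.
The selected fields must exist in the upstream data.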
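Using the upstream data above, a sketch that keys messages by name might look like this, assuming example is a partitioned topic. Each message then carries a key such as {"name":"Jack"}, so all rows with the same name land in the same partition.

```hocon
sink {
  Pulsar {
    topic = "example"
    client.service-url = "pulsar://localhost:6650"
    admin.service-url = "http://localhost:8080"
    # Use the upstream "name" column as the message key, e.g. {"name":"Jack"}
    partition_key_fields = ["name"]
  }
}
```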
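Common Options
Common sink plugin parameters; see Sink Common Options for details.
Task Example
Simple:
This example defines a SeaTunnel synchronization job that generates data automatically with FakeSource and sends it to the Pulsar sink. FakeSource generates 16 rows in total (row.num=16), each with two fields: name (string) and age (int). The target topic, example, will end up containing 16 rows. If you have not installed and deployed SeaTunnel yet, follow the instructions in Install SeaTunnel. Then follow the instructions in SeaTunnel Engine Quick Start to run this job.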
# Defining the runtime environment
env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
}
source {
FakeSource {
parallelism = 1
plugin_output = "fake"
row.num = 16
schema = {
fields {
name = "string"
age = "int"
}
}
}
}
sink {
Pulsar {
topic = "example"
client.service-url = "pulsar://localhost:6650"
admin.service-url = "http://localhost:8080"
plugin_input = "fake"
pulsar.config = {
sendTimeoutMs = 30000
}
}
}
Change Log
Change | Commit | Version |
---|---|---|
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Improve][API] Make sure the table name in TablePath not be null (#7252) | https://github.com/apache/seatunnel/commit/764d8b0bc | 2.3.7 |
[Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
[PulsarSource]Improve pulsar throughput performance. (#6234) | https://github.com/apache/seatunnel/commit/37461f4f3 | 2.3.4 |
[Feature][Connector-v2][PulsarSink]Add Pulsar Sink Connector. (#4382) | https://github.com/apache/seatunnel/commit/543d2c508 | 2.3.4 |
[Chore] Remove useless DeserializationFormatFactory and its implement (#5880) | https://github.com/apache/seatunnel/commit/f0511544f | 2.3.4 |
fix: update IDENTIFIER = Pulsar for pulsar-datasource on project:seatunnel-web (#5852) | https://github.com/apache/seatunnel/commit/3b6de3743 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284) | https://github.com/apache/seatunnel/commit/ed5eadcf7 | 2.3.3 |
[Feature][Json-format] support read format for pulsar (#4111) | https://github.com/apache/seatunnel/commit/7d61ae93e | 2.3.2 |
[hotfix][pulsar] Fix the bug that can't consume messages all the time. (#4125) | https://github.com/apache/seatunnel/commit/a6705cc5b | 2.3.2 |
[Feature] add cdc multiple table support & fix zeta bug | https://github.com/apache/seatunnel/commit/533ff2c2f | 2.3.1 |
[hotfix][pulsar] PulsarSource consumer ack exception. (#4237) | https://github.com/apache/seatunnel/commit/9725d675d | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
[Improve][Connector-v2][Pulsar] Set the name of the pulsar consumption thread. (#4182) | https://github.com/apache/seatunnel/commit/e567203f7 | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Bug][Connector-v2][PulsarSource]Fix pulsar option topic-pattern bug. (#3989) | https://github.com/apache/seatunnel/commit/aee2c580e | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
[Improve][Connector-V2][Pulsar] Unified exception for Pulsar source &… (#3590) | https://github.com/apache/seatunnel/commit/4fe932341 | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
[Hotfix][Connector-V2][Pulsar] fix conditional options (#3504) | https://github.com/apache/seatunnel/commit/0066affac | 2.3.0 |
[Feature][Connector][pulsar] expose configurable options in Pulsar (#3341) | https://github.com/apache/seatunnel/commit/200faa7c2 | 2.3.0 |
[Connector][Dependency] Add Miss Dependency Cassandra And Change Kudu Plugin Name (#3432) | https://github.com/apache/seatunnel/commit/6ac6a0a0c | 2.3.0 |
[chore] fix pulsar consumer comment error (#3356) | https://github.com/apache/seatunnel/commit/91e632c52 | 2.3.0 |
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 |
[hotfix][connector][pulsar] Fix not being able to mark #noMoreNewSplits when restoring (#2945) | https://github.com/apache/seatunnel/commit/5ad69076b | 2.3.0-beta |
Move Handover to common module (#2877) | https://github.com/apache/seatunnel/commit/d94a874bc | 2.3.0-beta |
[hotfix][connector-v2] fix pulsar source exceptions (#2820) | https://github.com/apache/seatunnel/commit/8ff0ba701 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
[SeaTunnel]Simply seatunnel package pipeline. (#2563) | https://github.com/apache/seatunnel/commit/9d88b6221 | 2.2.0-beta |
[Improve][Connector-V2] Pulsar support user-defined schema (#2436) | https://github.com/apache/seatunnel/commit/16cabe6a3 | 2.2.0-beta |
[improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta |
StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
[doc][connector-v2] pulsar source options doc (#2128) | https://github.com/apache/seatunnel/commit/59ce8a2b3 | 2.2.0-beta |
[api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |