Version: 2.3.10

Kafka

Kafka source connector

Supported engines

Spark

Flink

SeaTunnel Zeta

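Key features

Description

Source connector for Apache Kafka.

Supported DataSource Info

The Kafka connector requires the following dependency.
It can be downloaded via install-plugin.sh or obtained from the Maven central repository.

| Datasource | Supported versions | Maven |
|------------|--------------------|-------|
| Kafka | Universal | Download |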

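Source options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| topic | String | | - | Name of the topic to read from when the table is used as a source. A comma-separated list of topics is also supported, for example 'topic-1,topic-2'. |
| table_list | Map | | - | Topic list configuration. You can configure a table_list and a topic at the same time. |
| bootstrap.servers | String | | - | Comma-separated list of Kafka brokers. |
| pattern | Boolean | | false | If pattern is set to true, topics are matched against the specified regular expression and the matching topics are subscribed. |
| consumer.group | String | | SeaTunnel-Consumer-Group | Kafka consumer group ID, used to distinguish different consumer groups. |
| commit_on_checkpoint | Boolean | | true | If true, the consumer's offsets are committed periodically in the background. |
| poll.timeout | Long | | 10000 | Interval (in milliseconds) at which Kafka is polled. |
| kafka.config | Map | | - | In addition to the required parameters above, users can specify multiple optional consumer client parameters, covering all consumer parameters listed in the official Kafka documentation. |
| schema | Config | | - | Data structure, including field names and field types. |
| format | String | | json | Data format. The default is json. Optional formats are text, canal_json, debezium_json, ogg_json, maxwell_json, avro, protobuf, and native. The default field delimiter is ", ". To use a custom delimiter, add the "field_delimiter" option. For the canal format, see canal-json for details. For the debezium format, see debezium-json. For details on some formats, see formats. |
| format_error_handle_way | String | | fail | How data format errors are handled. The default is fail; valid values are fail and skip. With fail, a data format error blocks the job and throws an exception. With skip, the malformed row is skipped. |
| debezium_record_table_filter | Config | | - | Filters data in debezium format. Only used when format is set to debezium_json. See debezium_record_table_filter below. |
| field_delimiter | String | | , | Custom field delimiter for the data format. |
| start_mode | StartMode[earliest],[group_offsets] | | group_offsets | Initial consumption mode of the consumer. |
| start_mode.offsets | Config | | - | Offsets used for the specific_offsets consumption mode. |
| start_mode.timestamp | Long | | - | Time used for the "timestamp" consumption mode. |
| partition-discovery.interval-millis | Long | | -1 | Interval for dynamically discovering topics and partitions. |
| common-options | | | - | Common source plugin parameters. See Source Common Options for details. |
| protobuf_message_name | String | | - | Effective when format is set to protobuf. Specifies the message name. |
| protobuf_schema | String | | - | Effective when format is set to protobuf. Specifies the schema definition. |
| is_native | Boolean | No | false | Supports retaining the source information of the record. |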
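The start_mode.timestamp option takes a Long. The sketch below assumes that this value is a Unix epoch timestamp in milliseconds, which is how Kafka itself expresses record timestamps; the option table does not state the unit, so treat that as an assumption. The helper name to_epoch_millis is hypothetical and only illustrates how such a value could be computed.

```python
from datetime import datetime, timedelta, timezone

# Unix epoch as a timezone-aware datetime.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(moment: datetime) -> int:
    """Convert a timezone-aware datetime to whole milliseconds since the epoch."""
    if moment.tzinfo is None:
        raise ValueError("pass a timezone-aware datetime to avoid local-time ambiguity")
    # Dividing two timedeltas with // yields an int and avoids float rounding.
    return (moment - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    start = datetime(2023, 1, 1, tzinfo=timezone.utc)
    # Assumption: this number is what would be placed in start_mode.timestamp.
    print(f"start_mode.timestamp = {to_epoch_millis(start)}")
```

For midnight UTC on 2023-01-01 the helper yields 1672531200000.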

debezium_record_table_filter

Use debezium_record_table_filter to filter data in debezium format. The configuration is as follows:

debezium_record_table_filter {
  database_name = "test"
  schema_name = "public" // null if not present
  table_name = "products"
}

Only data from the test.public.products table is consumed.
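To make the filter semantics concrete, here is a minimal sketch of the matching rule. It assumes the three settings are compared with the db, schema, and table fields of the Debezium source block. Those field names come from Debezium's event format; how the connector actually evaluates the filter is not described here, so the function matches_table_filter is purely illustrative.

```python
from typing import Any, Mapping, Optional


def matches_table_filter(
    record: Mapping[str, Any],
    database_name: str,
    schema_name: Optional[str],
    table_name: str,
) -> bool:
    """Return True if a Debezium change event belongs to the configured table."""
    # Debezium JSON may wrap the event in a "payload" envelope; accept both shapes.
    payload = record.get("payload", record)
    source = payload.get("source") or {}
    # A missing "schema" field compares equal to schema_name=None,
    # mirroring the "null if not present" comment in the config.
    return (
        source.get("db") == database_name
        and source.get("schema") == schema_name
        and source.get("table") == table_name
    )


if __name__ == "__main__":
    event = {"payload": {"op": "c", "source": {"db": "test", "schema": "public", "table": "products"}}}
    print(matches_table_filter(event, "test", "public", "products"))
```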

Task examples

Simple example

This example reads data from the Kafka topics topic_1, topic_2, and topic_3 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the installation guide to install and deploy it. Then follow the quick start guide to run this job.

# Define the runtime environment
env {
  parallelism = 2
  job.mode = "BATCH"
}
source {
  Kafka {
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
    format = text
    field_delimiter = "#"
    topic = "topic_1,topic_2,topic_3"
    bootstrap.servers = "localhost:9092"
    kafka.config = {
      client.id = client_1
      max.poll.records = 500
      auto.offset.reset = "earliest"
      enable.auto.commit = "false"
    }
  }
}
sink {
  Console {}
}
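As an illustration of what the text format with field_delimiter = "#" implies for message values, the sketch below splits one value into the name and age fields of the schema above. It is a simplified model, not the connector's parser, and the sample value "Tom#18" and the helper parse_text_record are hypothetical.

```python
from typing import Callable, Dict, List, Tuple

# Mirrors the schema in the job above: name is a string, age is an int.
SCHEMA: List[Tuple[str, Callable[[str], object]]] = [("name", str), ("age", int)]


def parse_text_record(value: str, delimiter: str = "#") -> Dict[str, object]:
    """Split a delimited message value and convert each part to its schema type."""
    parts = value.split(delimiter)
    if len(parts) != len(SCHEMA):
        raise ValueError(f"expected {len(SCHEMA)} fields, got {len(parts)}")
    # int("abc") raises ValueError, which models a data format error.
    return {name: convert(part) for (name, convert), part in zip(SCHEMA, parts)}


if __name__ == "__main__":
    print(parse_text_record("Tom#18"))
```

A value that cannot be split or converted raises ValueError in this sketch. In the connector, the format_error_handle_way option decides whether such a row fails the job or is skipped.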

Regex topic

source {
  Kafka {
    topic = ".*seatunnel*."
    pattern = "true"
    bootstrap.servers = "localhost:9092"
    consumer.group = "seatunnel_group"
  }
}
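Because the topic value is treated as a regular expression when pattern is true, the exact placement of `.` and `*` decides which topics are subscribed. The sketch below assumes the whole topic name must match the expression (full-match semantics, as with Java's Matcher.matches()); that assumption and the topic names used are illustrative. For patterns this simple, Python's re module behaves like Java's regex engine.

```python
import re
from typing import Iterable, List


def matching_topics(pattern: str, topics: Iterable[str]) -> List[str]:
    """Return the topics whose full name matches the regular expression."""
    compiled = re.compile(pattern)
    return [topic for topic in topics if compiled.fullmatch(topic)]


if __name__ == "__main__":
    topics = ["seatunnel", "seatunnel_orders", "my_seatunnel", "orders"]
    # ".*seatunnel*." means: anything, "seatunne", zero or more "l", then one character.
    print(matching_topics(".*seatunnel*.", topics))
    # ".*seatunnel.*" means: any topic name that contains "seatunnel".
    print(matching_topics(".*seatunnel.*", topics))
```

Under these assumptions the first pattern selects seatunnel and my_seatunnel but not seatunnel_orders, while the second selects all three topics that contain "seatunnel".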

AWS MSK SASL/SCRAM

Replace the username and password placeholders (${username}, ${password}) in the following configuration with the values configured in AWS MSK.

source {
  Kafka {
    topic = "seatunnel"
    bootstrap.servers = "xx.amazonaws.com.cn:9096,xxx.amazonaws.com.cn:9096,xxxx.amazonaws.com.cn:9096"
    consumer.group = "seatunnel_group"
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";"
    }
  }
}

AWS MSK IAM

Download aws-msk-iam-auth-1.1.5.jar and place it in the $SEATUNNEL_HOME/plugin/kafka/lib directory.

Make sure the IAM policy includes the "kafka-cluster:Connect" permission, as shown below:

"Effect": "Allow",
"Action": [
  "kafka-cluster:Connect",
  "kafka-cluster:AlterCluster",
  "kafka-cluster:DescribeCluster"
],

Source configuration example:

source {
  Kafka {
    topic = "seatunnel"
    bootstrap.servers = "xx.amazonaws.com.cn:9098,xxx.amazonaws.com.cn:9098,xxxx.amazonaws.com.cn:9098"
    consumer.group = "seatunnel_group"
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
      sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  }
}

Kerberos authentication example

Source configuration example:

source {
  Kafka {
    topic = "seatunnel"
    bootstrap.servers = "127.0.0.1:9092"
    consumer.group = "seatunnel_group"
    kafka.config = {
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      sasl.mechanism=GSSAPI
      java.security.krb5.conf="/etc/krb5.conf"
      sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/path/to/xxx.keytab\" \n principal=\"user@xxx.com\";"
    }
  }
}

Multiple Kafka source example

Parse data according to different Kafka topics and formats, and perform upsert operations based on the ID.

Note: Kafka is an unstructured data source, so tables_configs should be used. table_list will be removed in the future.

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    tables_configs = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}

sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}

The same job written with table_list:

env {
  execution.parallelism = 1
  job.mode = "BATCH"
}

source {
  Kafka {
    bootstrap.servers = "kafka_e2e:9092"
    table_list = [
      {
        topic = "^test-ogg-sou.*"
        pattern = "true"
        consumer.group = "ogg_multi_group"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = ogg_json
      },
      {
        topic = "test-cdc_mds"
        start_mode = earliest
        schema = {
          fields {
            id = "int"
            name = "string"
            description = "string"
            weight = "string"
          }
        },
        format = canal_json
      }
    ]
  }
}

sink {
  Jdbc {
    driver = org.postgresql.Driver
    url = "jdbc:postgresql://postgresql:5432/test?loggerLevel=OFF"
    user = test
    password = test
    generate_sink_sql = true
    database = test
    table = public.sink
    primary_keys = ["id"]
  }
}

Protobuf configuration

Set format to protobuf and describe the protobuf data structure with the protobuf_message_name and protobuf_schema parameters.

Usage example:

source {
  Kafka {
    topic = "test_protobuf_topic_fake_source"
    format = protobuf
    protobuf_message_name = Person
    protobuf_schema = """
      syntax = "proto3";

      package org.apache.seatunnel.format.protobuf;

      option java_outer_classname = "ProtobufE2E";

      message Person {
        int32 c_int32 = 1;
        int64 c_int64 = 2;
        float c_float = 3;
        double c_double = 4;
        bool c_bool = 5;
        string c_string = 6;
        bytes c_bytes = 7;

        message Address {
          string street = 1;
          string city = 2;
          string state = 3;
          string zip = 4;
        }

        Address address = 8;

        map<string, float> attributes = 9;

        repeated string phone_numbers = 10;
      }
    """
    bootstrap.servers = "kafkaCluster:9092"
    start_mode = "earliest"
    plugin_output = "kafka_table"
  }
}

Native format

To retain the native information of the Kafka record, refer to the following configuration.

Configuration example:

source {
  Kafka {
    topic = "test_topic_native_source"
    bootstrap.servers = "kafkaCluster:9092"
    start_mode = "earliest"
    format_error_handle_way = skip
    format = "NATIVE"
    value_converter_schema_enabled = false
    consumer.group = "native_group"
  }
}

The returned data has the following format:

{
  "headers": {
    "header1": "header1",
    "header2": "header2"
  },
  "key": "dGVzdF9ieXRlc19kYXRh",
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}

Note: key and value are of type byte[].
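Since key and value are byte[], the strings in the sample JSON are most plausibly Base64 renderings of those bytes. That is an assumption based on the sample values, not something stated above. Under it, a downstream consumer could recover the raw bytes as sketched below; decode_native_record is a hypothetical helper.

```python
import base64
import json
from typing import Any, Dict

# The sample record from the section above, as a JSON string.
SAMPLE = """
{
  "headers": {"header1": "header1", "header2": "header2"},
  "key": "dGVzdF9ieXRlc19kYXRh",
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}
"""


def decode_native_record(raw: str) -> Dict[str, Any]:
    """Parse a native-format record and Base64-decode its key and value."""
    record = json.loads(raw)
    for field in ("key", "value"):
        # Kafka keys and values may be null; leave those untouched.
        if record.get(field) is not None:
            record[field] = base64.b64decode(record[field])
    return record


if __name__ == "__main__":
    decoded = decode_native_record(SAMPLE)
    print(decoded["key"], decoded["partition"])
```

With the sample record, both key and value decode to the bytes b"test_bytes_data".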

Changelog
| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Kafka] Support native format read/write kafka record (#8724) | https://github.com/apache/seatunnel/commit/86e2d6fcf | 2.3.10 |
| [improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642) | https://github.com/apache/seatunnel/commit/db6e2994d | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [improve] kafka connector options (#8616) | https://github.com/apache/seatunnel/commit/aadfe99f8 | 2.3.10 |
| [Fix][Kafka Source] kafka source use topic as table name instead of fullName (#8401) | https://github.com/apache/seatunnel/commit/3d4f4bb33 | 2.3.10 |
| [Feature][Kafka] Add debezium_record_table_filter and fix error (#8391) | https://github.com/apache/seatunnel/commit/b27a30a5a | 2.3.9 |
| [Bug][Kafka] kafka reads repeatedly (#8465) | https://github.com/apache/seatunnel/commit/f67f27279 | 2.3.9 |
| [Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception (#7857) | https://github.com/apache/seatunnel/commit/92b3253a5 | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
| [Improve][Kafka] Support custom topic for debezium compatible format (#8145) | https://github.com/apache/seatunnel/commit/deefe8762 | 2.3.9 |
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 |
| [Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) | https://github.com/apache/seatunnel/commit/a0eeeb9b6 | 2.3.9 |
| [Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281e | 2.3.9 |
| [Fix][Connector-V2] Fix kafka format_error_handle_way not work (#7838) | https://github.com/apache/seatunnel/commit/63c7b4e9c | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Feature][kafka] Add arg poll.timeout for interval poll messages (#7606) | https://github.com/apache/seatunnel/commit/09d12fc40 | 2.3.8 |
| [Improve][Kafka] kafka source refactored some reader read logic (#6408) | https://github.com/apache/seatunnel/commit/10598b6ae | 2.3.8 |
| [Feature][connector-v2]Add Kafka Protobuf Data Parsing Support (#7361) | https://github.com/apache/seatunnel/commit/51c8e1a83 | 2.3.8 |
| [Hotfix][Connector] Fix kafka consumer log next startup offset (#7312) | https://github.com/apache/seatunnel/commit/891652399 | 2.3.7 |
| [Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) | https://github.com/apache/seatunnel/commit/7dc3fa8a1 | 2.3.6 |
| [Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) | https://github.com/apache/seatunnel/commit/1784c01a3 | 2.3.6 |
| [Feature][connector][kafka] Support read Maxwell format message from kafka #4415 (#4428) | https://github.com/apache/seatunnel/commit/4281b867a | 2.3.6 |
| [Hotfix][Connector-V2][kafka]Kafka consumer group automatically commits offset logic error fix (#6961) | https://github.com/apache/seatunnel/commit/181f01ee5 | 2.3.6 |
| [Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac95352 | 2.3.6 |
| [Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
| [Fix][Kafka-Sink] fix kafka sink factory option rule (#6657) | https://github.com/apache/seatunnel/commit/37578e103 | 2.3.5 |
| [Feature][Connector-V2] Remove useless code for kafka connector (#6157) | https://github.com/apache/seatunnel/commit/0f286d162 | 2.3.4 |
| [Feature] support avro format (#5084) | https://github.com/apache/seatunnel/commit/93a006156 | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
| [Feature][formats][ogg] Support read ogg format message #4201 (#4225) | https://github.com/apache/seatunnel/commit/7728e241e | 2.3.4 |
| [Improve] Remove all useless prepare, getProducedType method (#5741) | https://github.com/apache/seatunnel/commit/ed94fffbb | 2.3.4 |
| [Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
| KafkaSource use Factory to create source (#5635) | https://github.com/apache/seatunnel/commit/1c6176e51 | 2.3.4 |
| [Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
| [Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516) | https://github.com/apache/seatunnel/commit/bd7498909 | 2.3.3 |
| [Feature][connector][kafka] Support read debezium format message from kafka (#5066) | https://github.com/apache/seatunnel/commit/53a1f0c6c | 2.3.3 |
| [hotfix][kafka] Fix the problem that the partition information cannot be obtained when kafka is restored (#4764) | https://github.com/apache/seatunnel/commit/c203ef5f8 | 2.3.2 |
| Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687) | https://github.com/apache/seatunnel/commit/228257b2e | 2.3.2 |
| [hotfix][e2e][kafka] Fix the job not stopping (#4600) | https://github.com/apache/seatunnel/commit/93471c9ad | 2.3.2 |
| [Improve][connector][kafka] Set default value for partition option (#4524) | https://github.com/apache/seatunnel/commit/884f733c3 | 2.3.2 |
| [chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ece | 2.3.2 |
| [Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424) | https://github.com/apache/seatunnel/commit/38f1903be | 2.3.2 |
| [Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364) | https://github.com/apache/seatunnel/commit/e1ed22b15 | 2.3.2 |
| [Bug][Connector-v2][KafkaSource]Fix KafkaConsumerThread exit caused by commit offset error. (#4379) | https://github.com/apache/seatunnel/commit/71f4d0c78 | 2.3.2 |
| [Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246) | https://github.com/apache/seatunnel/commit/3cdb7cfa4 | 2.3.2 |
| Fix KafkaProducer resources have never been released. (#4302) | https://github.com/apache/seatunnel/commit/f99f02caa | 2.3.2 |
| [Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f59049 | 2.3.1 |
| [Hotfix][Zeta] Fix TaskExecutionService Deploy Failed The Job Can't Stop (#4265) | https://github.com/apache/seatunnel/commit/cf55b070b | 2.3.1 |
| [Feature][CDC] Support export debezium-json format to kafka (#4339) | https://github.com/apache/seatunnel/commit/5817ec07b | 2.3.1 |
| [Improve]]Connector-V2[Kafka] Set kafka consumer default group (#4271) | https://github.com/apache/seatunnel/commit/82c784a3e | 2.3.1 |
| [chore] Fix the words of canal & kafka (#4261) | https://github.com/apache/seatunnel/commit/077a8d27a | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| [Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
| [Imprve][Connector-V2][Hive] Support read text table & Column projection (#4105) | https://github.com/apache/seatunnel/commit/717620f54 | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 |
| Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 |
| Add Kafka catalog (#4106) | https://github.com/apache/seatunnel/commit/34f1f21e4 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Feature][Json-format][canal] Support read canal format message (#3950) | https://github.com/apache/seatunnel/commit/b80be72c8 | 2.3.1 |
| [Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742) | https://github.com/apache/seatunnel/commit/8aff80730 | 2.3.1 |
| [Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 |
| [Hotfix][Connector-V2][Kafka] Fix the bug that kafka consumer is not close. (#3836) | https://github.com/apache/seatunnel/commit/344726642 | 2.3.1 |
| fix commit kafka offset bug. (#3933) | https://github.com/apache/seatunnel/commit/e60ad938b | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
| [Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
| [Bug][KafkaSource]Fix the default value of commit_on_checkpoint. (#3831) | https://github.com/apache/seatunnel/commit/df969849f | 2.3.1 |
| [Bug][KafkaSource]Failed to parse offset format (#3810) | https://github.com/apache/seatunnel/commit/8e1196acc | 2.3.1 |
| [Improve][Connector-V2] Kafka client user configured clientid is preferred (#3783) | https://github.com/apache/seatunnel/commit/aacf0abc0 | 2.3.1 |
| [Improve][Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724) | https://github.com/apache/seatunnel/commit/5e3f196e2 | 2.3.0 |
| [Improve][Connector-V2] fix kafka admin client can't get property config (#3721) | https://github.com/apache/seatunnel/commit/74c335170 | 2.3.0 |
| [Improve][Connector-V2][Kafka] Add text format for kafka sink connector (#3711) | https://github.com/apache/seatunnel/commit/74bbd76b6 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink connector (#3574) | https://github.com/apache/seatunnel/commit/3b573798d | 2.3.0 |
| options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba1 | 2.3.0 |
| [Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode (#3125) | https://github.com/apache/seatunnel/commit/999cfd606 | 2.3.0 |
| [Improve][Connector-V2][Kafka] Support to specify multiple partition keys (#3230) | https://github.com/apache/seatunnel/commit/f65f44f44 | 2.3.0 |
| [Feature][Connector-V2][Kafka] Add Kafka option rules (#3388) | https://github.com/apache/seatunnel/commit/cc0cb8cdb | 2.3.0 |
| [Improve][Connector-V2][Kafka]Improve kafka metadata code format (#3397) | https://github.com/apache/seatunnel/commit/379da3097 | 2.3.0 |
| [Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157) | https://github.com/apache/seatunnel/commit/3da19d444 | 2.3.0 |
| update (#3150) | https://github.com/apache/seatunnel/commit/2b4499275 | 2.3.0-beta |
| [Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783) | https://github.com/apache/seatunnel/commit/6506e306e | 2.3.0-beta |
| [feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085) | https://github.com/apache/seatunnel/commit/385e1f42c | 2.3.0-beta |
| [Improve][connector][kafka] sink support custom partition (#3041) | https://github.com/apache/seatunnel/commit/ebddc18c4 | 2.3.0-beta |
| [Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta |
| [Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866) | https://github.com/apache/seatunnel/commit/254223fdb | 2.3.0-beta |
| [Connector-V2][Kafka] Fix Kafka Streaming problem (#2759) | https://github.com/apache/seatunnel/commit/e92e7b728 | 2.2.0-beta |
| [Improve][Connector-V2] Fix kafka connector (#2745) | https://github.com/apache/seatunnel/commit/90ce3851d | 2.2.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
| StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
| [api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |