Kafka
Kafka sink connector
Supported Engines
Spark
Flink
SeaTunnel Zeta
Key Features
When semantics is set to EXACTLY_ONCE, the connector uses two-phase commit (2PC) to guarantee each message is sent to Kafka exactly once.
Description
Write rows to a Kafka topic.
Supported DataSource Info
In order to use the Kafka connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.
Datasource | Supported Versions | Maven |
---|---|---|
Kafka | Universal | Download |
Sink Options
Name | Type | Required | Default | Description |
---|---|---|---|---|
topic | String | Yes | - | When the table is used as a sink, the topic name is the topic to write data to. |
bootstrap.servers | String | Yes | - | Comma-separated list of Kafka brokers. |
kafka.config | Map | No | - | In addition to the mandatory parameters above, you can pass any other non-mandatory parameter to the producer client, covering all producer parameters in the official Kafka documentation. |
semantics | String | No | NON | Delivery semantics, one of EXACTLY_ONCE / AT_LEAST_ONCE / NON. Default is NON. |
partition_key_fields | Array | No | - | Configure which fields are used as the key of the Kafka message. |
partition | Int | No | - | Specify a partition; all messages will be sent to that partition. |
assign_partitions | Array | No | - | Decide which partition to send a message to based on its content; used to distribute messages across partitions. |
transaction_prefix | String | No | - | If semantics is EXACTLY_ONCE, the producer writes all messages in a Kafka transaction. Kafka distinguishes transactions by transactionId, and this parameter is the prefix of the Kafka transactionId; make sure different jobs use different prefixes. |
format | String | No | json | Data format. The default format is json. Optional formats: text, canal_json, debezium_json, ogg_json, avro, protobuf and native. For the json and text formats, the default field delimiter is ","; to customize it, set the field_delimiter option. For the canal format, refer to canal-json for details; for the debezium format, refer to debezium-json. |
field_delimiter | String | No | , | Customize the field delimiter for the data format. |
common-options | | No | - | Sink plugin common parameters, please refer to Sink Common Options for details. |
protobuf_message_name | String | No | - | Effective when format is set to protobuf; specifies the Message name. |
protobuf_schema | String | No | - | Effective when format is set to protobuf; specifies the Schema definition. |
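For example, a minimal sketch of a text-format sink with a custom field delimiter (the topic name and broker address are illustrative):

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = text
    # fields of each row are joined with "|" instead of the default ","
    field_delimiter = "|"
  }
}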
Parameter Interpretation
Topic Formats
Currently two formats are supported:

1. Fill in the name of the topic directly.

2. Use a field value from the upstream data as the topic, with the format ${your field name}, where the topic is the value of one of the columns of the upstream data.

For example, the upstream data is the following:

name | age | data |
---|---|---|
Jack | 16 | data-example1 |
Mary | 23 | data-example2 |

If ${name} is set as the topic, the first row is sent to the Jack topic and the second row is sent to the Mary topic.
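A minimal sketch of such a dynamic-topic sink, assuming the upstream data above (the broker address is illustrative):

sink {
  kafka {
    # each row is written to the topic named by its "name" field
    topic = "${name}"
    bootstrap.servers = "localhost:9092"
  }
}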
Semantics
In EXACTLY_ONCE, the producer writes all messages in a Kafka transaction that is committed to Kafka on a checkpoint. In AT_LEAST_ONCE, the producer waits on a checkpoint for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer. NON provides no guarantees: messages may be lost or duplicated in case of issues on the Kafka broker.
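For example, a minimal sketch of an exactly-once sink (the topic name, broker address and transaction prefix are illustrative):

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    semantics = EXACTLY_ONCE
    # make sure different jobs use different prefixes
    transaction_prefix = "seatunnel_job_1"
  }
}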
Partition Key Fields
For example, if you want to use field values from the upstream data as the message key, you can assign those field names to this property.

Upstream data is the following:

name | age | data |
---|---|---|
Jack | 16 | data-example1 |
Mary | 23 | data-example2 |

If name is set as the key, the hash value of the name column determines which partition the message is sent to. If no partition key fields are set, the message key is null. The message key is formatted as JSON: if name is set as the key, the key for the first row is '{"name":"Jack"}'. The selected field must be an existing field in the upstream data.
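A minimal sketch that keys messages by the name field from the table above (the topic name and broker address are illustrative):

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # the first row above is sent with the key '{"name":"Jack"}'
    partition_key_fields = ["name"]
  }
}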
Assign Partitions
For example, there are five partitions in total, and the assign_partitions field in the config is as follows:

assign_partitions = ["shoe", "clothing"]

Then messages containing "shoe" will be sent to partition zero, because "shoe" has index zero in assign_partitions, and messages containing "clothing" will be sent to partition one. Other messages are distributed across the remaining partitions by a hash algorithm.

This feature is implemented by the MessageContentPartitioner class, which implements the org.apache.kafka.clients.producer.Partitioner interface. If you need custom partitioning, implement this interface as well. A minimal sketch of a sink using assign_partitions follows (the topic name and broker address are illustrative).
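sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    # messages containing "shoe" go to partition 0, "clothing" to partition 1;
    # other messages are hashed across the remaining partitions
    assign_partitions = ["shoe", "clothing"]
  }
}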
Task Example
Simple:
This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends it to a Kafka sink. FakeSource generates 16 rows in total (row.num=16), each with two fields, name (string type) and age (int type), so the target topic test_topic will also contain 16 rows. If you have not yet installed and deployed SeaTunnel, follow the instructions in Install SeaTunnel first, and then follow the instructions in Quick Start With SeaTunnel Engine to run this job.
# Defining the runtime environment
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    parallelism = 1
    plugin_output = "fake"
    row.num = 16
    schema = {
      fields {
        name = "string"
        age = "int"
      }
    }
  }
}

sink {
  kafka {
    topic = "test_topic"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
  }
}
AWS MSK SASL/SCRAM
Replace the following ${username} and ${password} with the configuration values in AWS MSK.
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config="org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
    }
  }
}
AWS MSK IAM
Download aws-msk-iam-auth-1.1.5.jar from https://github.com/aws/aws-msk-iam-auth/releases and put it in the $SEATUNNEL_HOME/plugin/kafka/lib directory.

Please ensure the IAM policy includes "kafka-cluster:Connect". For example:
"Effect": "Allow",
"Action": [
"kafka-cluster:Connect",
"kafka-cluster:AlterCluster",
"kafka-cluster:DescribeCluster"
],
Sink Config
sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    format = json
    kafka.request.timeout.ms = 60000
    semantics = EXACTLY_ONCE
    kafka.config = {
      security.protocol=SASL_SSL
      sasl.mechanism=AWS_MSK_IAM
      sasl.jaas.config="software.amazon.msk.auth.iam.IAMLoginModule required;"
      sasl.client.callback.handler.class="software.amazon.msk.auth.iam.IAMClientCallbackHandler"
    }
  }
}
Kerberos Authentication Example
Sink Config
sink {
  Kafka {
    topic = "seatunnel"
    bootstrap.servers = "127.0.0.1:9092"
    format = json
    semantics = EXACTLY_ONCE
    kafka.config = {
      security.protocol=SASL_PLAINTEXT
      sasl.kerberos.service.name=kafka
      sasl.mechanism=GSSAPI
      java.security.krb5.conf="/etc/krb5.conf"
      sasl.jaas.config="com.sun.security.auth.module.Krb5LoginModule required \n useKeyTab=true \n storeKey=true \n keyTab=\"/path/to/xxx.keytab\" \n principal=\"user@xxx.com\";"
    }
  }
}
Protobuf Configuration
Set the format to protobuf and configure the protobuf data structure using the protobuf_message_name and protobuf_schema parameters.
Example Usage:
sink {
  kafka {
    topic = "test_protobuf_topic_fake_source"
    bootstrap.servers = "kafkaCluster:9092"
    format = protobuf
    kafka.request.timeout.ms = 60000
    kafka.config = {
      acks = "all"
      request.timeout.ms = 60000
      buffer.memory = 33554432
    }
    protobuf_message_name = Person
    protobuf_schema = """
      syntax = "proto3";

      package org.apache.seatunnel.format.protobuf;

      option java_outer_classname = "ProtobufE2E";

      message Person {
        int32 c_int32 = 1;
        int64 c_int64 = 2;
        float c_float = 3;
        double c_double = 4;
        bool c_bool = 5;
        string c_string = 6;
        bytes c_bytes = 7;

        message Address {
          string street = 1;
          string city = 2;
          string state = 3;
          string zip = 4;
        }

        Address address = 8;
        map<string, float> attributes = 9;
        repeated string phone_numbers = 10;
      }
      """
  }
}
Native Format
If you need to write Kafka's native record information (headers, key, partition, timestamp, value), you can refer to the following configuration.
Config Example:
sink {
  kafka {
    topic = "test_topic_native_sink"
    bootstrap.servers = "kafkaCluster:9092"
    format = "NATIVE"
  }
}
The input parameter requirements are as follows:
{
  "headers": {
    "header1": "header1",
    "header2": "header2"
  },
  "key": "dGVzdF9ieXRlc19kYXRh",
  "partition": 3,
  "timestamp": 1672531200000,
  "timestampType": "CREATE_TIME",
  "value": "dGVzdF9ieXRlc19kYXRh"
}
Note: key/value is of type byte[].
Changelog
Change | Commit | Version |
---|---|---|
[Feature][Kafka] Support native format read/write kafka record (#8724) | https://github.com/apache/seatunnel/commit/86e2d6fcf | 2.3.10 |
[improve] update kafka source default schema from content<ROW<content STRING>> to content<STRING> (#8642) | https://github.com/apache/seatunnel/commit/db6e2994d | 2.3.10 |
[Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
[improve] kafka connector options (#8616) | https://github.com/apache/seatunnel/commit/aadfe99f8 | 2.3.10 |
[Fix][Kafka Source] kafka source use topic as table name instead of fullName (#8401) | https://github.com/apache/seatunnel/commit/3d4f4bb33 | 2.3.10 |
[Feature][Kafka] Add debezium_record_table_filter and fix error (#8391) | https://github.com/apache/seatunnel/commit/b27a30a5a | 2.3.9 |
[Bug][Kafka] kafka reads repeatedly (#8465) | https://github.com/apache/seatunnel/commit/f67f27279 | 2.3.9 |
[Hotfix][Connector-V2][kafka] fix kafka sink config exactly-once exception (#7857) | https://github.com/apache/seatunnel/commit/92b3253a5 | 2.3.9 |
[Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef80001 | 2.3.9 |
[Improve][Kafka] Support custom topic for debezium compatible format (#8145) | https://github.com/apache/seatunnel/commit/deefe8762 | 2.3.9 |
[Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 |
[Fix][Kafka] Fix in kafka streaming mode can not read incremental data (#7871) | https://github.com/apache/seatunnel/commit/a0eeeb9b6 | 2.3.9 |
[Feature][Core] Support cdc task ddl restore for zeta (#7463) | https://github.com/apache/seatunnel/commit/8e322281e | 2.3.9 |
[Fix][Connector-V2] Fix kafka format_error_handle_way not work (#7838) | https://github.com/apache/seatunnel/commit/63c7b4e9c | 2.3.9 |
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
[Feature][kafka] Add arg poll.timeout for interval poll messages (#7606) | https://github.com/apache/seatunnel/commit/09d12fc40 | 2.3.8 |
[Improve][Kafka] kafka source refactored some reader read logic (#6408) | https://github.com/apache/seatunnel/commit/10598b6ae | 2.3.8 |
[Feature][connector-v2]Add Kafka Protobuf Data Parsing Support (#7361) | https://github.com/apache/seatunnel/commit/51c8e1a83 | 2.3.8 |
[Hotfix][Connector] Fix kafka consumer log next startup offset (#7312) | https://github.com/apache/seatunnel/commit/891652399 | 2.3.7 |
[Fix][Connector kafka]Fix Kafka consumer stop fetching after TM node restarted (#7233) | https://github.com/apache/seatunnel/commit/7dc3fa8a1 | 2.3.6 |
[Fix][Connector-V2] Fix kafka batch mode can not read all message (#7135) | https://github.com/apache/seatunnel/commit/1784c01a3 | 2.3.6 |
[Feature][connector][kafka] Support read Maxwell format message from kafka #4415 (#4428) | https://github.com/apache/seatunnel/commit/4281b867a | 2.3.6 |
[Hotfix][Connector-V2][kafka]Kafka consumer group automatically commits offset logic error fix (#6961) | https://github.com/apache/seatunnel/commit/181f01ee5 | 2.3.6 |
[Improve][CDC] Bump the version of debezium to 1.9.8.Final (#6740) | https://github.com/apache/seatunnel/commit/c3ac95352 | 2.3.6 |
[Feature][Kafka] Support multi-table source read (#5992) | https://github.com/apache/seatunnel/commit/60104602d | 2.3.6 |
[Fix][Kafka-Sink] fix kafka sink factory option rule (#6657) | https://github.com/apache/seatunnel/commit/37578e103 | 2.3.5 |
[Feature][Connector-V2] Remove useless code for kafka connector (#6157) | https://github.com/apache/seatunnel/commit/0f286d162 | 2.3.4 |
[Feature] support avro format (#5084) | https://github.com/apache/seatunnel/commit/93a006156 | 2.3.4 |
[Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de740810 | 2.3.4 |
[Feature][formats][ogg] Support read ogg format message #4201 (#4225) | https://github.com/apache/seatunnel/commit/7728e241e | 2.3.4 |
[Improve] Remove all useless prepare , getProducedType method (#5741) | https://github.com/apache/seatunnel/commit/ed94fffbb | 2.3.4 |
[Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba8745 | 2.3.4 |
KafkaSource use Factory to create source (#5635) | https://github.com/apache/seatunnel/commit/1c6176e51 | 2.3.4 |
[Improve] Refactor CatalogTable and add SeaTunnelSource::getProducedCatalogTables (#5562) | https://github.com/apache/seatunnel/commit/41173357f | 2.3.4 |
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709b | 2.3.4 |
[Feature][Connector-V2] connector-kafka source support data conversion extracted by kafka connect source (#4516) | https://github.com/apache/seatunnel/commit/bd7498909 | 2.3.3 |
[Feature][connector][kafka] Support read debezium format message from kafka (#5066) | https://github.com/apache/seatunnel/commit/53a1f0c6c | 2.3.3 |
[hotfix][kafka] Fix the problem that the partition information cannot be obtained when kafka is restored (#4764) | https://github.com/apache/seatunnel/commit/c203ef5f8 | 2.3.2 |
Fix the processing bug of abnormal parsing method of kafkaSource format. (#4687) | https://github.com/apache/seatunnel/commit/228257b2e | 2.3.2 |
[hotfix][e2e][kafka] Fix the job not stopping (#4600) | https://github.com/apache/seatunnel/commit/93471c9ad | 2.3.2 |
[Improve][connector][kafka] Set default value for partition option (#4524) | https://github.com/apache/seatunnel/commit/884f733c3 | 2.3.2 |
[chore] delete unavailable S3 & Kafka Catalogs (#4477) | https://github.com/apache/seatunnel/commit/e0aec5ece | 2.3.2 |
[Feature][API] Add options check before create source and sink and transform in FactoryUtil (#4424) | https://github.com/apache/seatunnel/commit/38f1903be | 2.3.2 |
[Feature][Connector-V2][Kafka] Kafka source supports data deserialization failure skipping (#4364) | https://github.com/apache/seatunnel/commit/e1ed22b15 | 2.3.2 |
[Bug][Connector-v2][KafkaSource]Fix KafkaConsumerThread exit caused by commit offset error. (#4379) | https://github.com/apache/seatunnel/commit/71f4d0c78 | 2.3.2 |
[Bug][Connector-v2][KafkaSink]Fix the permission problem caused by client.id. (#4246) | https://github.com/apache/seatunnel/commit/3cdb7cfa4 | 2.3.2 |
Fix KafkaProducer resources have never been released. (#4302) | https://github.com/apache/seatunnel/commit/f99f02caa | 2.3.2 |
[Improve][CDC] Optimize options & add docs for compatible_debezium_json (#4351) | https://github.com/apache/seatunnel/commit/336f59049 | 2.3.1 |
[Hotfix][Zeta] Fix TaskExecutionService Deploy Failed The Job Can't Stop (#4265) | https://github.com/apache/seatunnel/commit/cf55b070b | 2.3.1 |
[Feature][CDC] Support export debezium-json format to kafka (#4339) | https://github.com/apache/seatunnel/commit/5817ec07b | 2.3.1 |
[Improve]]Connector-V2[Kafka] Set kafka consumer default group (#4271) | https://github.com/apache/seatunnel/commit/82c784a3e | 2.3.1 |
[chore] Fix the words of canal & kafka (#4261) | https://github.com/apache/seatunnel/commit/077a8d27a | 2.3.1 |
Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
[Improve][Connector-V2] [StarRocks] Starrocks Support Auto Create Table (#4177) | https://github.com/apache/seatunnel/commit/7e0008e6f | 2.3.1 |
[improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
[Imprve][Connector-V2][Hive] Support read text table & Column projection (#4105) | https://github.com/apache/seatunnel/commit/717620f54 | 2.3.1 |
[Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
Add convertor factory (#4119) | https://github.com/apache/seatunnel/commit/cbdea45d9 | 2.3.1 |
Add ElasticSearch catalog (#4108) | https://github.com/apache/seatunnel/commit/9ee4d8394 | 2.3.1 |
Add Kafka catalog (#4106) | https://github.com/apache/seatunnel/commit/34f1f21e4 | 2.3.1 |
[Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
[Feature][Json-format][canal] Support read canal format message (#3950) | https://github.com/apache/seatunnel/commit/b80be72c8 | 2.3.1 |
[Improve][Connector-V2][Kafka] Support extract topic from SeaTunnelRow field (#3742) | https://github.com/apache/seatunnel/commit/8aff80730 | 2.3.1 |
[Feature][shade][Jackson] Add seatunnel-jackson module (#3947) | https://github.com/apache/seatunnel/commit/5d8862ec9 | 2.3.1 |
[Hotfix][Connector-V2][Kafka] Fix the bug that kafka consumer is not close. (#3836) | https://github.com/apache/seatunnel/commit/344726642 | 2.3.1 |
fix commit kafka offset bug. (#3933) | https://github.com/apache/seatunnel/commit/e60ad938b | 2.3.1 |
[Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
[Improve][Connector-V2] Change Connector Custom Config Prefix To Map (#3719) | https://github.com/apache/seatunnel/commit/ef1b8b1bb | 2.3.1 |
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829) | https://github.com/apache/seatunnel/commit/b9164b8ba | 2.3.1 |
[Bug][KafkaSource]Fix the default value of commit_on_checkpoint. (#3831) | https://github.com/apache/seatunnel/commit/df969849f | 2.3.1 |
[Bug][KafkaSource]Failed to parse offset format (#3810) | https://github.com/apache/seatunnel/commit/8e1196acc | 2.3.1 |
[Improve][Connector-V2] Kafka client user configured clientid is preferred (#3783) | https://github.com/apache/seatunnel/commit/aacf0abc0 | 2.3.1 |
[Improve][Connector-V2] Fix Kafka sink can't run EXACTLY_ONCE semantics (#3724) | https://github.com/apache/seatunnel/commit/5e3f196e2 | 2.3.0 |
[Improve][Connector-V2] fix kafka admin client can't get property config (#3721) | https://github.com/apache/seatunnel/commit/74c335170 | 2.3.0 |
[Improve][Connector-V2][Kafka] Add text format for kafka sink connector (#3711) | https://github.com/apache/seatunnel/commit/74bbd76b6 | 2.3.0 |
[Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
[Improve][Connector-V2][Kafka]Unified exception for Kafka source and sink connector (#3574) | https://github.com/apache/seatunnel/commit/3b573798d | 2.3.0 |
options in conditional need add to required or optional options (#3501) | https://github.com/apache/seatunnel/commit/51d5bcba1 | 2.3.0 |
[Improve][Connector-V2-kafka] Support for dynamic discover topic & partition in streaming mode (#3125) | https://github.com/apache/seatunnel/commit/999cfd606 | 2.3.0 |
[Improve][Connector-V2][Kafka] Support to specify multiple partition keys (#3230) | https://github.com/apache/seatunnel/commit/f65f44f44 | 2.3.0 |
[Feature][Connector-V2][Kafka] Add Kafka option rules (#3388) | https://github.com/apache/seatunnel/commit/cc0cb8cdb | 2.3.0 |
[Improve][Connector-V2][Kafka]Improve kafka metadata code format (#3397) | https://github.com/apache/seatunnel/commit/379da3097 | 2.3.0 |
[Improve][Connector-V2-kafka] Support setting read starting offset or time at startup config (#3157) | https://github.com/apache/seatunnel/commit/3da19d444 | 2.3.0 |
update (#3150) | https://github.com/apache/seatunnel/commit/2b4499275 | 2.3.0-beta |
[Feature][connectors-v2][kafka] Kafka supports custom schema #2371 (#2783) | https://github.com/apache/seatunnel/commit/6506e306e | 2.3.0-beta |
[feature][connector][kafka] Support extract partition from SeaTunnelRow fields (#3085) | https://github.com/apache/seatunnel/commit/385e1f42c | 2.3.0-beta |
[Improve][connector][kafka] sink support custom partition (#3041) | https://github.com/apache/seatunnel/commit/ebddc18c4 | 2.3.0-beta |
[Improve][all] change Log to @Slf4j (#3001) | https://github.com/apache/seatunnel/commit/6016100f1 | 2.3.0-beta |
[Imporve][Connector-V2]Parameter verification for connector V2 kafka sink (#2866) | https://github.com/apache/seatunnel/commit/254223fdb | 2.3.0-beta |
[Connector-V2][Kafka] Fix Kafka Streaming problem (#2759) | https://github.com/apache/seatunnel/commit/e92e7b728 | 2.2.0-beta |
[Improve][Connector-V2] Fix kafka connector (#2745) | https://github.com/apache/seatunnel/commit/90ce3851d | 2.2.0-beta |
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755 | 2.2.0-beta |
[#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
StateT of SeaTunnelSource should extend Serializable (#2214) | https://github.com/apache/seatunnel/commit/8c426ef85 | 2.2.0-beta |
[api-draft][Optimize] Optimize module name (#2062) | https://github.com/apache/seatunnel/commit/f79e3112b | 2.2.0-beta |