Version: 2.3.0

Kafka

Kafka sink connector

Description

Writes rows to a Kafka topic.

Key features

When semantic is set to EXACTLY_ONCE, the connector uses two-phase commit (2PC) to guarantee that each message is written to Kafka exactly once.

Options

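| name                 | type                  | required | default value |
|----------------------|-----------------------|----------|---------------|
| topic                | string                | yes      | -             |
| bootstrap.servers    | string                | yes      | -             |
| kafka.*              | kafka producer config | no       | -             |
| semantic             | string                | no       | NON           |
| partition_key_fields | array                 | no       | -             |
| partition            | int                   | no       | -             |
| assign_partitions    | array                 | no       | -             |
| transaction_prefix   | string                | no       | -             |
| format               | string                | no       | json          |
| field_delimiter      | string                | no       | ,             |
| common-options       | config                | no       | -             |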

topic [string]

The Kafka topic to write to.

bootstrap.servers [string]

The list of Kafka brokers, given as comma-separated host:port pairs (for example localhost:9092).

kafka.* [kafka producer config]

In addition to the required parameters above, you can set any optional Kafka producer parameter listed in the official Kafka documentation.

To set one, add the prefix "kafka." to the original parameter name. For example, request.timeout.ms is specified as kafka.request.timeout.ms = 60000. Parameters that are not set keep the default values given in the official Kafka documentation.
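The prefix rule can be sketched as a small function. This is a minimal illustration, assuming the connector simply strips "kafka." from matching option names and passes the result to the producer; extract_producer_properties is a hypothetical name, not part of SeaTunnel's API.

```python
from typing import Dict

KAFKA_PREFIX = "kafka."


def extract_producer_properties(sink_options: Dict[str, str]) -> Dict[str, str]:
    """Return the producer properties hidden behind the 'kafka.' prefix.

    Hypothetical helper: every option named kafka.<name> becomes the producer
    property <name>; options without the prefix are left to the connector.
    """
    return {
        key[len(KAFKA_PREFIX):]: value
        for key, value in sink_options.items()
        if key.startswith(KAFKA_PREFIX)
    }


options = {
    "topic": "seatunnel",
    "bootstrap.servers": "localhost:9092",
    "kafka.request.timeout.ms": "60000",
    "kafka.security.protocol": "SASL_SSL",
}
print(extract_producer_properties(options))
# {'request.timeout.ms': '60000', 'security.protocol': 'SASL_SSL'}
```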

semantic [string]

The delivery semantic. Valid values are EXACTLY_ONCE, AT_LEAST_ONCE, and NON; the default is NON.

With EXACTLY_ONCE, the producer writes all messages inside a Kafka transaction, which is committed on each checkpoint.

With AT_LEAST_ONCE, on each checkpoint the producer waits until all outstanding messages in its buffers have been acknowledged by the Kafka brokers.

NON provides no guarantees: messages may be lost if the Kafka broker has problems, and messages may be duplicated.
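The difference between the three semantics shows up when a job fails. The toy model below is a simplified sketch, not SeaTunnel or Kafka client code: ToySink and its methods are hypothetical, a write is either buffered in the producer or held in an open transaction, and a crash makes the source replay from the last checkpoint.

```python
from typing import List


class ToySink:
    """Toy model of what a read_committed consumer sees under each semantic.

    'visible' is what consumers can read, 'buffer' holds records the producer
    has not delivered yet, and 'txn' holds records in the open transaction.
    """

    def __init__(self, semantic: str) -> None:
        self.semantic = semantic
        self.visible: List[str] = []
        self.buffer: List[str] = []
        self.txn: List[str] = []

    def write(self, record: str) -> None:
        if self.semantic == "EXACTLY_ONCE":
            self.txn.append(record)      # hidden until the transaction commits
        else:
            self.buffer.append(record)   # sent asynchronously

    def deliver(self) -> None:
        # The producer's background send completes for buffered records.
        self.visible.extend(self.buffer)
        self.buffer.clear()

    def checkpoint(self) -> None:
        if self.semantic == "EXACTLY_ONCE":
            self.visible.extend(self.txn)  # commit the transaction
            self.txn.clear()
        elif self.semantic == "AT_LEAST_ONCE":
            self.deliver()                 # flush: wait for all acknowledgements
        # NON: the checkpoint completes without waiting for anything.

    def fail(self) -> None:
        # A crash loses undelivered records and aborts the open transaction.
        self.buffer.clear()
        self.txn.clear()


def scenario(semantic: str) -> List[str]:
    sink = ToySink(semantic)
    sink.write("a")
    sink.checkpoint()  # checkpoint 1 covers "a"
    sink.fail()        # crash; replay starts after checkpoint 1, so "a" is not resent
    sink.write("b")
    sink.deliver()     # "b" reaches the broker...
    sink.fail()        # ...but the job crashes before checkpoint 2
    sink.write("b")    # "b" is replayed after the restart
    sink.checkpoint()
    sink.deliver()
    return sink.visible


for semantic in ("EXACTLY_ONCE", "AT_LEAST_ONCE", "NON"):
    print(semantic, scenario(semantic))
# EXACTLY_ONCE ['a', 'b']
# AT_LEAST_ONCE ['a', 'b', 'b']
# NON ['b', 'b']
```

In this scenario EXACTLY_ONCE yields each record once, AT_LEAST_ONCE duplicates the replayed record, and NON both loses the record that was still buffered at the checkpoint and duplicates the replayed one.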

partition_key_fields [array]

The fields whose values are used as the key of each Kafka message.

For example, to use the values of upstream fields as the key, assign those field names to this option.

Suppose the upstream data is:

| name | age | data          |
|------|-----|---------------|
| Jack | 16  | data-example1 |
| Mary | 23  | data-example2 |

If name is configured as the key field, the hash of the name value determines which partition each message is sent to.

If partition_key_fields is not set, messages are sent with a null key.
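With Kafka's default partitioner, a keyed record goes to partition toPositive(murmur2(keyBytes)) % numPartitions. The sketch below ports that formula to Python, assuming the default partitioner is in effect. How SeaTunnel encodes the selected fields into key bytes is not described in this section, so the example simply assumes the UTF-8 bytes of the name value.

```python
MASK32 = 0xFFFFFFFF


def murmur2(data: bytes) -> int:
    """Python port of Kafka's Utils.murmur2, returning the 32-bit result unsigned."""
    seed = 0x9747B28C
    m = 0x5BD1E995
    r = 24
    length = len(data)
    h = (seed ^ length) & MASK32
    # Mix the input four bytes at a time (little-endian).
    for i in range(0, length - length % 4, 4):
        k = data[i] | (data[i + 1] << 8) | (data[i + 2] << 16) | (data[i + 3] << 24)
        k = (k * m) & MASK32
        k ^= k >> r  # k is unsigned here, so >> matches Java's >>>
        k = (k * m) & MASK32
        h = (h * m) & MASK32
        h ^= k
    # Fold in the last 1-3 bytes, mirroring the fall-through switch in Java.
    tail = length - length % 4
    rem = length % 4
    if rem == 3:
        h ^= data[tail + 2] << 16
    if rem >= 2:
        h ^= data[tail + 1] << 8
    if rem >= 1:
        h ^= data[tail]
        h = (h * m) & MASK32
    # Final avalanche.
    h ^= h >> 13
    h = (h * m) & MASK32
    h ^= h >> 15
    return h


def partition_for_key(key: bytes, num_partitions: int) -> int:
    # Equivalent to Utils.toPositive(murmur2(key)) % numPartitions in the Java client.
    return (murmur2(key) & 0x7FFFFFFF) % num_partitions


for name in ("Jack", "Mary"):
    # Assumption: the key bytes are just the UTF-8 encoded name value.
    print(name, partition_for_key(name.encode("utf-8"), 5))
```

Because the partition depends only on the key bytes and the partition count, all rows with the same name land in the same partition as long as the topic's partition count does not change.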

partition [int]

A fixed partition; all messages are sent to this partition.

assign_partitions [array]

Routes each message to a partition based on its content.

For example, suppose the topic has five partitions and assign_partitions is configured as follows: assign_partitions = ["shoe", "clothing"]

Then messages containing "shoe" are sent to partition 0, because "shoe" is at index 0 in assign_partitions, and messages containing "clothing" are sent to partition 1. All other messages are distributed by hash across the remaining partitions (partitions 2 to 4 in this example).

This behavior is implemented by the MessageContentPartitioner class, which implements the org.apache.kafka.clients.producer.Partitioner interface. To define custom partitioning, implement the same interface.
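A sketch of the routing rule described above, under two assumptions the text does not settle: when a message contains several keywords, the first entry in assign_partitions wins, and zlib.crc32 stands in for whatever hash the real MessageContentPartitioner applies to the remaining messages. choose_partition is a hypothetical name.

```python
import zlib
from typing import Sequence


def choose_partition(message: str, assign_partitions: Sequence[str], num_partitions: int) -> int:
    """Sketch of content-based routing in the spirit of MessageContentPartitioner."""
    # A message containing assign_partitions[i] goes to partition i
    # (assumption: if several keywords match, the first one in the list wins).
    for index, keyword in enumerate(assign_partitions):
        if keyword in message:
            return index
    # Every other message is hashed over the partitions not reserved above.
    reserved = len(assign_partitions)
    remaining = num_partitions - reserved
    if remaining <= 0:
        raise ValueError("no partitions left for messages that match no keyword")
    # zlib.crc32 is a stand-in hash, not necessarily the one the connector uses.
    return reserved + zlib.crc32(message.encode("utf-8")) % remaining


keywords = ["shoe", "clothing"]
print(choose_partition('{"item":"shoe"}', keywords, 5))      # 0
print(choose_partition('{"item":"clothing"}', keywords, 5))  # 1
print(choose_partition('{"item":"hat"}', keywords, 5))       # 2, 3 or 4
```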

transaction_prefix [string]

When semantic is EXACTLY_ONCE, the producer writes all messages inside a Kafka transaction, and Kafka tells transactions apart by their transaction ID. This parameter sets the prefix of that transaction ID; make sure that different jobs use different prefixes.
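Why the prefix must be unique: when a producer initializes transactions with a transactional ID that another producer already holds, Kafka bumps the ID's epoch and fences the older producer. The toy model below illustrates that effect. ToyCoordinator and make_transactional_id are hypothetical, and the "<prefix>-<subtask>" ID layout is an assumption rather than SeaTunnel's documented format.

```python
from typing import Dict


class ToyCoordinator:
    """Toy model of Kafka's producer fencing by transactional ID (not client code)."""

    def __init__(self) -> None:
        self.epochs: Dict[str, int] = {}

    def init_transactions(self, transactional_id: str) -> int:
        # Registering an ID again bumps its epoch, fencing the previous owner.
        epoch = self.epochs.get(transactional_id, -1) + 1
        self.epochs[transactional_id] = epoch
        return epoch

    def can_commit(self, transactional_id: str, epoch: int) -> bool:
        # Only the producer holding the latest epoch may still commit.
        return self.epochs.get(transactional_id) == epoch


def make_transactional_id(prefix: str, subtask_index: int) -> str:
    # Hypothetical layout; the exact ID SeaTunnel derives from the prefix is not documented here.
    return f"{prefix}-{subtask_index}"


coordinator = ToyCoordinator()
# Two jobs that reuse the same transaction_prefix end up with the same ID...
id_a = make_transactional_id("seatunnel", 0)
id_b = make_transactional_id("seatunnel", 0)
epoch_a = coordinator.init_transactions(id_a)
epoch_b = coordinator.init_transactions(id_b)
print(coordinator.can_commit(id_a, epoch_a))  # False: job A has been fenced
print(coordinator.can_commit(id_b, epoch_b))  # True
# ...while a distinct prefix keeps a job independent.
id_c = make_transactional_id("job-c", 0)
epoch_c = coordinator.init_transactions(id_c)
print(coordinator.can_commit(id_c, epoch_c))  # True
```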

format [string]

The data format, either json (the default) or text. In text format, fields are separated by "," by default; to use a different separator, set the field_delimiter option.

field_delimiter [string]

The field delimiter used by the text format. Defaults to ",".
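The two formats can be sketched as follows for the example row used earlier (Jack, 16, data-example1). serialize_row is a hypothetical helper; the exact JSON layout, the handling of null values, and the lack of escaping for delimiters inside values are assumptions, not documented connector behavior.

```python
import json
from typing import Any, Sequence


def serialize_row(
    field_names: Sequence[str],
    values: Sequence[Any],
    fmt: str = "json",
    field_delimiter: str = ",",
) -> bytes:
    """Hypothetical sketch of turning one row into a Kafka message value."""
    if fmt == "json":
        # One JSON object per row, keyed by field name, in schema order.
        record = dict(zip(field_names, values))
        return json.dumps(record, separators=(",", ":")).encode("utf-8")
    if fmt == "text":
        # Field values joined by the delimiter; None becomes an empty field
        # (assumption), and delimiters inside values are not escaped.
        return field_delimiter.join("" if v is None else str(v) for v in values).encode("utf-8")
    raise ValueError(f"unsupported format: {fmt}")


fields = ["name", "age", "data"]
row = ["Jack", 16, "data-example1"]
print(serialize_row(fields, row))               # b'{"name":"Jack","age":16,"data":"data-example1"}'
print(serialize_row(fields, row, "text"))       # b'Jack,16,data-example1'
print(serialize_row(fields, row, "text", "|"))  # b'Jack|16|data-example1'
```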

common options [config]

Common parameters shared by sink plugins; see Sink Common Options for details.

Examples

sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    partition = 3
    format = json
    kafka.request.timeout.ms = 60000
    semantic = EXACTLY_ONCE
  }
}

AWS MSK SASL/SCRAM

Replace ${username} and ${password} below with the credentials configured in AWS MSK.

sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    partition = 3
    format = json
    kafka.request.timeout.ms = 60000
    semantic = EXACTLY_ONCE
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = SCRAM-SHA-512
    kafka.sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required \nusername=${username}\npassword=${password};"
  }
}

AWS MSK IAM

Download aws-msk-iam-auth-1.1.5.jar from https://github.com/aws/aws-msk-iam-auth/releases and place it in the $SEATUNNEL_HOME/plugin/kafka/lib directory.

Make sure the IAM policy grants "kafka-cluster:Connect", for example:

"Effect": "Allow",
"Action": [
    "kafka-cluster:Connect",
    "kafka-cluster:AlterCluster",
    "kafka-cluster:DescribeCluster"
],

Sink Config

sink {
  kafka {
    topic = "seatunnel"
    bootstrap.servers = "localhost:9092"
    partition = 3
    format = json
    kafka.request.timeout.ms = 60000
    semantic = EXACTLY_ONCE
    kafka.security.protocol = SASL_SSL
    kafka.sasl.mechanism = AWS_MSK_IAM
    kafka.sasl.jaas.config = "software.amazon.msk.auth.iam.IAMLoginModule required;"
    kafka.sasl.client.callback.handler.class = "software.amazon.msk.auth.iam.IAMClientCallbackHandler"
  }
}

Changelog

2.3.0-beta 2022-10-20

  • Add Kafka Sink Connector

2.3.0 2022-12-30

  • [Improve] Support specifying multiple partition keys (#3230)
  • [Improve] Add text format for Kafka sink connector (#3711)