Version: 2.2.0-beta

Flink SQL Kafka Connector

A Kafka connector based on Flink SQL.

Description​

With the Kafka connector, we can read data from Kafka and write data to Kafka using Flink SQL. Refer to the Kafka connector documentation for more details.

Usage​

The following brief example shows how to use the connector end to end.

1. Prepare Kafka

Please refer to the Kafka QuickStart to prepare a Kafka environment, then produce data as follows:

$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092

After executing the command, the producer enters interactive mode. Type the following messages, one per line, to send data to Kafka.

>{"id":1,"name":"abc"}
>{"id":2,"name":"def"}
>{"id":3,"name":"dfs"}
>{"id":4,"name":"eret"}
>{"id":5,"name":"yui"}
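
For repeatable runs, the same messages can be sent without entering interactive mode, because `kafka-console-producer.sh` reads one message per line from standard input. The following is a minimal sketch, assuming it is run from the Kafka home directory; the `TOPIC` variable, its default value `test-topic`, and the file name `events.jsonl` are hypothetical and not part of the original example.

```shell
#!/bin/sh
# Hypothetical names: TOPIC (default "test-topic") and events.jsonl are
# illustrative placeholders, not names defined by Kafka or SeaTunnel.
TOPIC="${TOPIC:-test-topic}"
MESSAGES_FILE="events.jsonl"

# Write one JSON message per line, matching the records used in this guide.
cat > "$MESSAGES_FILE" <<'EOF'
{"id":1,"name":"abc"}
{"id":2,"name":"def"}
{"id":3,"name":"dfs"}
{"id":4,"name":"eret"}
{"id":5,"name":"yui"}
EOF

# Send the file only if the producer script exists in the current directory.
if [ -x bin/kafka-console-producer.sh ]; then
  bin/kafka-console-producer.sh --topic "$TOPIC" \
    --bootstrap-server localhost:9092 < "$MESSAGES_FILE"
else
  echo "bin/kafka-console-producer.sh not found; run this from the Kafka home directory" >&2
fi
```

Set `TOPIC` to the same topic name that is later used in the SeaTunnel configuration.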

2. Prepare the SeaTunnel configuration

Here is a simple example of a SeaTunnel configuration. It reads JSON records from the Kafka topic and writes them to a print sink.

SET table.dml-sync = true;

CREATE TABLE events (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE print_table (
    id INT,
    name STRING
) WITH (
    'connector' = 'print',
    'sink.parallelism' = '1'
);

INSERT INTO print_table SELECT * FROM events;
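
The configuration above only reads from Kafka. As a sketch of the write direction mentioned in the description, the script below generates a second configuration file that copies rows from the source topic into another Kafka topic. The file name `config/kafka-sink.sql.conf`, the table name `events_copy`, and the `<sink-topic-name>` placeholder are hypothetical; the `WITH` options are the same Flink SQL Kafka connector options used above, minus the source-only ones.

```shell
#!/bin/sh
# Hypothetical names: config/kafka-sink.sql.conf, events_copy and
# <sink-topic-name> are illustrative and not part of the original example.
mkdir -p config

# The quoted heredoc delimiter keeps the SQL text exactly as written.
cat > config/kafka-sink.sql.conf <<'EOF'
SET table.dml-sync = true;

-- Source table: same definition as in the walkthrough.
CREATE TABLE events (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'testGroup',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

-- Sink table: writes each row as JSON to a second topic.
CREATE TABLE events_copy (
    id INT,
    name STRING
) WITH (
    'connector' = 'kafka',
    'topic' = '<sink-topic-name>',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);

INSERT INTO events_copy SELECT * FROM events;
EOF

echo "Wrote config/kafka-sink.sql.conf"
```

Replace both topic placeholders before use. The file can then be passed to `bin/start-seatunnel-sql.sh -c` in the same way as the main configuration.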
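
3. Start a Flink local cluster

$ ${FLINK_HOME}/bin/start-cluster.sh

4. Start the Flink SQL job

Run the following command from the SeaTunnel home directory to start the Flink SQL job. The command assumes that the configuration from step 2 was saved as config/kafka.sql.conf.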

$ bin/start-seatunnel-sql.sh -c config/kafka.sql.conf

5. Verify the result

After the job is submitted, the rows written by the 'print' connector appear in the TaskManager's log.

+I[1, abc]
+I[2, def]
+I[3, dfs]
+I[4, eret]
+I[5, yui]
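
To check the output from the command line, the insert rows can be filtered out of the TaskManager output. This is a minimal sketch, assuming a default local Flink installation in which TaskManager standard output is written to files matching `${FLINK_HOME}/log/*taskexecutor*.out`; that path pattern and the helper name `extract_rows` are assumptions, so adjust them to your setup.

```shell
#!/bin/sh
# extract_rows is a hypothetical helper: it keeps only lines from standard
# input that contain the insert marker "+I[" emitted by the print connector.
extract_rows() {
  grep -F '+I[' || true   # no match is not treated as an error
}

# Assumed location of the TaskManager stdout files (not confirmed by this guide).
LOG_DIR="${FLINK_HOME:-.}/log"

for f in "$LOG_DIR"/*taskexecutor*.out; do
  # Skip the unexpanded pattern when no file matches.
  [ -f "$f" ] || continue
  extract_rows < "$f"
done
```

If nothing is printed, confirm that the job is running and look through the other files in the Flink log directory.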