Flink SQL Kafka Connector
Descriptionâ
With kafka connector, we can read data from kafka and write data to kafka using Flink SQL. Refer to the Kafka connector for more details.
Usageâ
Let us have a brief example to show how to use the connector from end to end.
1. kafka prepareâ
Please refer to the Kafka QuickStart to prepare kafka environment and produce data like following:
$ bin/kafka-console-producer.sh --topic <topic-name> --bootstrap-server localhost:9092
After executing the command, we will come to the interactive mode. Print the following message to send data to kafka.
{"id":1,"name":"abc"}
>{"id":2,"name":"def"}
>{"id":3,"name":"dfs"}
>{"id":4,"name":"eret"}
>{"id":5,"name":"yui"}
2. prepare seatunnel configurationâ
Here is a simple example of seatunnel configuration.
SET table.dml-sync = true;
CREATE TABLE events (
id INT,
name STRING
) WITH (
'connector' = 'kafka',
'topic'='<topic-name>',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);
CREATE TABLE print_table (
id INT,
name STRING
) WITH (
'connector' = 'print',
'sink.parallelism' = '1'
);
INSERT INTO print_table SELECT * FROM events;
3. start flink local clusterâ
$ ${FLINK_HOME}/bin/start-cluster.sh
4. start Flink SQL jobâ
Execute the following command in seatunnel home path to start the Flink SQL job.
$ bin/start-seatunnel-sql.sh -c config/kafka.sql.conf
5. verify resultâ
After the job submitted, we can see the data printing by connector 'print' in taskmanager's log .
+I[1, abc]
+I[2, def]
+I[3, dfs]
+I[4, eret]
+I[5, yui]