Version: 2.3.7

Sls

Sls source connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Source connector for Aliyun Sls.

Supported DataSource Info

In order to use the Sls connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Maven |
|---|---|---|
| Sls | Universal | Download |

Source Options

| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| project | String | Yes | - | Aliyun Sls Project |
| logstore | String | Yes | - | Aliyun Sls Logstore |
| endpoint | String | Yes | - | Aliyun Access Endpoint |
| access_key_id | String | Yes | - | Aliyun AccessKey ID |
| access_key_secret | String | Yes | - | Aliyun AccessKey Secret |
| start_mode | StartMode [earliest], [group_cursor], [latest] | No | group_cursor | The initial consumption pattern of consumers. |
| consumer_group | String | No | SeaTunnel-Consumer-Group | Sls consumer group id, used to distinguish different consumer groups. |
| auto_cursor_reset | CursorMode [begin], [end] | No | end | How the cursor is initialized when the consumer group has no cursor. |
| batch_size | Int | No | 1000 | The amount of data pulled from SLS each time. |
| partition-discovery.interval-millis | Long | No | -1 | The interval for dynamically discovering topics and partitions. |
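
As a rough illustration of how the optional settings above might be combined, here is a minimal sketch of a source block. It is not taken from the connector's documentation. The consumer group name, batch size, and discovery interval are hypothetical values, and writing the `start_mode` and `auto_cursor_reset` values as quoted lowercase strings is an assumption based on the value lists in the table.

```hocon
source {
  Sls {
    # Required options (placeholder values; replace with your own)
    endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
    project = "project1"
    logstore = "logstore1"
    access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
    access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

    # Optional options (all values below are illustrative assumptions)
    # Resume from the cursor stored for the consumer group (the default mode)
    start_mode = "group_cursor"
    # Hypothetical consumer group name
    consumer_group = "my-consumer-group"
    # If the consumer group has no cursor yet, start from the beginning
    # instead of the default "end"
    auto_cursor_reset = "begin"
    # Pull fewer records per request than the default of 1000
    batch_size = 500
    # Enable periodic discovery every 60 seconds (the default is -1)
    partition-discovery.interval-millis = 60000
  }
}
```

Any option left out of the block falls back to the default listed in the table.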

Task Example

Simple

This example reads data from the Sls logstore logstore1 and prints it to the client. If you have not yet installed and deployed SeaTunnel, follow the instructions in Install SeaTunnel to do so, then follow the instructions in Quick Start With SeaTunnel Engine to run this job.

Create a RAM user and grant it authorization. Make sure the RAM user has sufficient permissions to perform the required operations; for reference, see RAM Custom Authorization Example.

# Defining the runtime environment
env {
  parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 30000
}

source {
  Sls {
    endpoint = "cn-hangzhou-intranet.log.aliyuncs.com"
    project = "project1"
    logstore = "logstore1"
    access_key_id = "xxxxxxxxxxxxxxxxxxxxxxxx"
    access_key_secret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
    schema = {
      fields = {
        id = "int"
        name = "string"
        description = "string"
        weight = "string"
      }
    }
  }
}

sink {
  Console {
  }
}