# Plugin development

## Introduction to plugin system
The seatunnel plugin system consists of three parts: Input, Filter and Output.

### Input

Input is responsible for converting data from an external data source into `DStream[(String, String)]`.

### Filter

Filter is a transform operation that works on the `Dataset[Row]` data structure.

### Output

Output is an action operation that writes a `Dataset[Row]` to an external data source or prints it to the terminal.
## Preparation

seatunnel supports Java and Scala as plugin development languages. Scala is recommended for Input plugins; either Java or Scala can be used for the other plugin types.

Create a new Java/Scala project, or clone [seatunnel-filter-example](https://github.com/InterestingLab/seatunnel-filter-example) and make your modifications on top of it.
## 1. Create a new pom.xml

Refer to the example file pom.xml.

Add the interfaces provided by seatunnel to the project's dependencies:
```xml
<dependency>
    <groupId>io.github.interestinglab.seatunnel</groupId>
    <artifactId>seatunnel-apis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>
```
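If you are starting from an empty project, the skeleton below shows where this dependency fits. It is only a sketch: the plugin's own coordinates are placeholders, and build-plugin configuration (e.g. for compiling Scala sources) is omitted.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Minimal sketch of a plugin pom; the groupId/artifactId/version of the
     plugin itself are hypothetical. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>my-seatunnel-plugin</artifactId>
    <version>1.0.0</version>

    <dependencies>
        <dependency>
            <groupId>io.github.interestinglab.seatunnel</groupId>
            <artifactId>seatunnel-apis_2.11</artifactId>
            <version>1.1.0</version>
        </dependency>
    </dependencies>
</project>
```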
## 2. Implement your own methods
### Input (streaming)

- Create a new class that extends `BaseStreamingInput`, the parent class provided by seatunnel-apis:

```scala
import com.typesafe.config.{Config, ConfigFactory}

class ScalaHdfs extends BaseStreamingInput {

  var config: Config = ConfigFactory.empty()

  /**
   * Set Config.
   **/
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   **/
  override def getConfig(): Config = {
    this.config
  }
}
```

- Override the `checkConfig`, `prepare` and `getDStream` methods defined by the parent class:

```scala
override def checkConfig(): (Boolean, String) = {}
override def prepare(spark: SparkSession): Unit = {}
override def getDStream(ssc: StreamingContext): DStream[(String, String)] = {}
```

- When the Input plugin is called, seatunnel first executes its `checkConfig` method to verify the parameters passed to the plugin, then calls its `prepare` method to set parameter defaults and initialize the class's member variables, and finally calls its `getDStream` method to convert the external data source into a `DStream[(String, String)]` (see the sketch after this list).
- For a Scala implementation of an Input plugin, refer to [ScalaHdfs](https://github.com/InterestingLab/seatunnel-filter-example/blob/master/src/main/scala/org/interestinglab/seatunnel/input/ScalaHdfs.scala).
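As a concrete illustration, here is a minimal, hypothetical socket-based Input. The `host`/`port` option names, the defaulting logic and the `"socket"` record tag are assumptions made for this sketch, not part of seatunnel's API:

```scala
import scala.collection.JavaConverters._
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream

// Hypothetical plugin: reads lines from a TCP socket.
class SocketInput extends BaseStreamingInput {

  var config: Config = ConfigFactory.empty()

  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  override def getConfig(): Config = this.config

  // Reject the configuration early if a required option is missing.
  override def checkConfig(): (Boolean, String) = {
    if (config.hasPath("host")) (true, "")
    else (false, "please specify [host] as a non-empty string")
  }

  // Fill in defaults so later code can read options unconditionally.
  override def prepare(spark: SparkSession): Unit = {
    val defaults = ConfigFactory.parseMap(Map("port" -> Int.box(9999)).asJava)
    config = config.withFallback(defaults)
  }

  // Tag every record with a source name so downstream plugins can tell inputs apart.
  override def getDStream(ssc: StreamingContext): DStream[(String, String)] = {
    ssc.socketTextStream(config.getString("host"), config.getInt("port"))
      .map(line => ("socket", line))
  }
}
```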
### Filter

- Create a new class that extends `BaseFilter`, the parent class provided by seatunnel-apis.

Scala:

```scala
class ScalaSubstring extends BaseFilter {

  var config: Config = ConfigFactory.empty()

  /**
   * Set Config.
   **/
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   **/
  override def getConfig(): Config = {
    this.config
  }
}
```

Java:

```java
public class JavaSubstring extends BaseFilter {

    private Config config;

    @Override
    public Config getConfig() {
        return config;
    }

    @Override
    public void setConfig(Config config) {
        this.config = config;
    }
}
```

- Override the `checkConfig`, `prepare` and `process` methods defined by the parent class.

Scala:

```scala
override def checkConfig(): (Boolean, String) = {}
override def prepare(spark: SparkSession): Unit = {}
override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {}
```

Java:

```java
@Override
public Tuple2<Object, String> checkConfig() {}

@Override
public void prepare(SparkSession spark, StreamingContext ssc) {}

@Override
public Dataset<Row> process(SparkSession spark, Dataset<Row> df) {}
```

- When the Filter plugin is called, seatunnel first executes its `checkConfig` method to verify the parameters passed to the plugin, then calls its `prepare` method to set parameter defaults and initialize the class's member variables, and finally calls its `process` method to process data in `Dataset[Row]` format (see the sketch after this list).
- For a Java implementation of a Filter plugin, refer to [JavaSubstring](https://github.com/InterestingLab/seatunnel-filter-example/blob/master/src/main/java/org/interestinglab/seatunnel/filter/JavaSubstring.java); for a Scala implementation, refer to [ScalaSubstring](https://github.com/InterestingLab/seatunnel-filter-example/blob/master/src/main/scala/org/interestinglab/seatunnel/filter/ScalaSubstring.scala).
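The sketch below shows what the three methods of a filled-in Filter might look like. The `source_field` option name and the lowercasing behavior are assumptions made for illustration:

```scala
// Inside a BaseFilter subclass like the ones above:
import org.apache.spark.sql.{Dataset, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lower}

// Require [source_field] to be present.
override def checkConfig(): (Boolean, String) = {
  if (config.hasPath("source_field")) (true, "")
  else (false, "please specify [source_field] as a non-empty string")
}

// Nothing to default or initialize in this sketch.
override def prepare(spark: SparkSession): Unit = {}

// Lowercase the configured column in place.
override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
  val field = config.getString("source_field")
  ds.withColumn(field, lower(col(field)))
}
```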
### Output

- Create a new class that extends `BaseOutput`, the parent class provided by seatunnel-apis.

Scala:

```scala
class ScalaStdout extends BaseOutput {

  var config: Config = ConfigFactory.empty()

  /**
   * Set Config.
   **/
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   **/
  override def getConfig(): Config = {
    this.config
  }
}
```

Java:

```java
public class JavaStdout extends BaseOutput {

    private Config config;

    @Override
    public Config getConfig() {
        return config;
    }

    @Override
    public void setConfig(Config config) {
        this.config = config;
    }
}
```

- Override the `checkConfig`, `prepare` and `process` methods defined by the parent class.

Scala:

```scala
override def checkConfig(): (Boolean, String) = {}
override def prepare(spark: SparkSession): Unit = {}
override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {}
```

Java:

```java
@Override
public Tuple2<Object, String> checkConfig() {}

@Override
public void prepare(SparkSession spark) {}

@Override
public Dataset<Row> process(SparkSession spark, Dataset<Row> ds) {}
```

- The Output plugin's call sequence is similar to the Filter plugin's: seatunnel first executes its `checkConfig` method to verify the parameters passed to the plugin, then calls its `prepare` method to set parameter defaults and initialize the class's member variables, and finally calls its `process` method to write the `Dataset[Row]` data to an external data source (see the sketch after this list).
- For a Java implementation of an Output plugin, refer to [JavaStdout](https://github.com/InterestingLab/seatunnel-filter-example/blob/master/src/main/java/org/interestinglab/seatunnel/output/JavaStdout.java); for a Scala implementation, refer to [ScalaStdout](https://github.com/InterestingLab/seatunnel-filter-example/blob/master/src/main/scala/org/interestinglab/seatunnel/output/ScalaStdout.scala).
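A minimal sketch of a stdout-style Output `process` method, assuming a `limit` option like the one passed to `JavaStdout` in the sample configuration later on this page (the default of 20 rows is an assumption of this sketch):

```scala
// Inside a BaseOutput subclass like the ones above:
import org.apache.spark.sql.{Dataset, Row, SparkSession}

override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
  // Print up to [limit] rows without truncating long values, then pass the data through.
  val limit = if (config.hasPath("limit")) config.getInt("limit") else 20
  ds.show(limit, false)
  ds
}
```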
### UDFs

- Create a new class that extends `BaseFilter`, the parent class provided by seatunnel-apis:

```scala
class ScalaSubstring extends BaseFilter {

  var config: Config = ConfigFactory.empty()

  /**
   * Set Config.
   **/
  override def setConfig(config: Config): Unit = {
    this.config = config
  }

  /**
   * Get Config.
   **/
  override def getConfig(): Config = {
    this.config
  }
}
```

- Override the `checkConfig`, `prepare`, `getUdfList` and `process` methods defined by the parent class. Only `getUdfList` and `process` are shown here:

```scala
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.{col, lit, udf}

override def getUdfList(): List[(String, UserDefinedFunction)] = {
  val func = udf((s: String, pos: Int, len: Int) => s.substring(pos, pos + len))
  List(("my_sub", func))
}

override def process(spark: SparkSession, ds: Dataset[Row]): Dataset[Row] = {
  val srcField = config.getString("source_field")
  val targetField = config.getString("target_field")
  val pos = config.getInt("pos")
  val len = config.getInt("len")
  val func = getUdfList().head._2
  ds.withColumn(targetField, func(col(srcField), lit(pos), lit(len)))
}
```

- For a complete example of UDF plugin development, refer to [ScalaSubstring](https://github.com/InterestingLab/seatunnel-example/blob/rickyhuo.fea.udf/src/main/scala/org/interestinglab/seatunnel/filter/ScalaSubstring.scala#L15).
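Once registered (see the next step), the UDF can be called from the `sql` filter plugin, e.g. `select log_level, my_sub(message, 1, 3) from tmp`, as in the complete configuration example at the end of this page.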
### Create a new META-INF/services file

seatunnel uses Java's ServiceLoader mechanism to register the UDFs returned by `getUdfList` for each class that implements `io.github.interestinglab.seatunnel.apis.BaseFilter`. If an implementation class is not listed in the services file, its UDFs will not be registered. See [META-INF/services](https://github.com/InterestingLab/seatunnel-example/blob/master/src/main/resources/META-INF/services/io.github.interestinglab.seatunnel.apis.BaseFilter) in the example project.
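For reference, a services file is a plain-text file named after the fully qualified interface, listing one implementation class per line (ServiceLoader treats `#` as a comment). A sketch matching the example project's layout:

```
# src/main/resources/META-INF/services/io.github.interestinglab.seatunnel.apis.BaseFilter
org.interestinglab.seatunnel.filter.ScalaSubstring
```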
## 3. Package and use

Packaging:

```bash
mvn package
```

Put the packaged jar into seatunnel's `plugins` directory:

```bash
cd seatunnel-1.1.0
mkdir -p plugins/my_plugins/lib
cd plugins/my_plugins/lib
```

seatunnel requires third-party jars to be placed in a `lib` folder, which you must create yourself:

```
plugins/your_plugin_name/lib/your_jar_name
```

Other files go in:

```
plugins/your_plugin_name/files/your_file_name
```
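Putting this together, the plugin directory might look like the tree below (the jar and file names are placeholders):

```
plugins/
└── my_plugins/
    ├── lib/
    │   └── my-seatunnel-plugin-1.0.0.jar
    └── files/
        └── some_dictionary.txt
```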
### Use the plugin in a configuration file

Below is a complete example that uses third-party plugins; place it in `config/application.conf`. Test data is generated by the `fakeStream` plugin and split with `Split`; the third-party plugin `ScalaSubstring` then extracts a substring (via its `my_sub` UDF), and finally the third-party plugin `JavaStdout` prints the result to the terminal.

```
spark {
  spark.streaming.batchDuration = 5
  spark.app.name = "seatunnel-sample"
  spark.ui.port = 13000
  spark.executor.instances = 2
  spark.executor.cores = 1
  spark.executor.memory = "1g"
}

input {
  fakeStream {
    content = ["INFO : gary is 28 years old", "WARN : suwey is 16 years old"]
    rate = 5
  }
}

filter {
  split {
    fields = ["log_level", "message"]
    delimiter = ":"
  }
  sql = {
    table_name = "tmp"
    # use UDF
    sql = "select log_level, my_sub(message, 1, 3) from tmp"
  }
}

output {
  org.interestinglab.seatunnel.output.JavaStdout {
    limit = 2
  }
}
```

Start seatunnel:

```bash
./bin/start-seatunnel.sh --config config/application.conf --deploy-mode client --master local[2]
```
View the results:

```
+---------+------------------+
|log_level|UDF(message, 1, 3)|
+---------+------------------+
|INFO     |ary               |
|INFO     |ary               |
+---------+------------------+
only showing top 2 rows
```
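As expected, `my_sub(message, 1, 3)` maps to `s.substring(pos, pos + len)`, keeping three characters of `message` starting at position 1, so a value like `gary is 28 years old` becomes `ary`; the `limit = 2` option on `JavaStdout` restricts the printout to the top 2 rows.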