跳到主要内容
版本:1.x

Json

Filter plugin : Json

Description

It takes an existing field which contains a json string and extract its fields.

Options

nametyperequireddefault value
source_fieldstringnoraw_message
target_fieldstringno__root__
source_field [string]

Source field, default is raw_message.

target_field [string]

New field name.

schema_dir [string]

Json schema absolute directory path,default is $seatunnelRoot/plugins/json/files/schemas/

schema_file [string]

Json schema file name, if not set, the system will infer the schema from input source.

Use cases

  1. json schema use case

There might be multiple input json schemas in a single job, e.g. the schema in topicA of kafka can be:

{
"A": "a_val",
"B": "b_val"
}

the schema of topicB can be:

{
"C": "c_val",
"D": "d_val"
}

If we need to combine two schemas and make it output as a wide table, we can specify a schema with content below:

{
"A": "a_val",
"B": "b_val",
"C": "c_val",
"D": "d_val"
}

then the output of topicA and topicB would be:

+-----+-----+-----+-----+
|A |B |C |D |
+-----+-----+-----+-----+
|a_val|b_val|null |null |
|null |null |c_val|d_val|
+-----+-----+-----+-----+

Examples

  1. Without target_field

    json {
    source_field = "message"
    }
    • Input
    +----------------------------+
    |message |
    +----------------------------+
    |{"name": "ricky", "age": 24}|
    |{"name": "gary", "age": 28} |
    +----------------------------+
    • Output
    +----------------------------+---+-----+
    |message |age|name |
    +----------------------------+---+-----+
    |{"name": "gary", "age": 28} |28 |gary |
    |{"name": "ricky", "age": 23}|23 |ricky|
    +----------------------------+---+-----+
  2. With target_field

    json {
    source_field = "message"
    target_field = "info"
    }
    • Input
    +----------------------------+
    |message |
    +----------------------------+
    |{"name": "ricky", "age": 24}|
    |{"name": "gary", "age": 28} |
    +----------------------------+
    • Output
    +----------------------------+----------+
    |message |info |
    +----------------------------+----------+
    |{"name": "gary", "age": 28} |[28,gary] |
    |{"name": "ricky", "age": 23}|[23,ricky]|
    +----------------------------+----------+

  3. With schema_file

    json {
    source_field = "message"
    schema_file = "demo.json"
    }
    • Schema

    Make the content of /opt/seatunnel/plugins/json/files/schemas/demo.json on Driver node as below:

    {
    "name": "demo",
    "age": 24,
    "city": "LA"
    }
    • Input
    +----------------------------+
    |message |
    +----------------------------+
    |{"name": "ricky", "age": 24}|
    |{"name": "gary", "age": 28} |
    +----------------------------+
    • Output
    +----------------------------+---+-----+-----+
    |message |age|name |city |
    +----------------------------+---+-----+-----+
    |{"name": "gary", "age": 28} |28 |gary |null |
    |{"name": "ricky", "age": 23}|23 |ricky|null |
    +----------------------------+---+-----+-----+

    If deploy in cluster mode,make sure json schemas directory is packed in plugins.tar.gz