
GraphQL

GraphQL sink connector

Supported Engines

Spark
Flink
SeaTunnel Zeta

Key Features

Description

Sends upstream data to a GraphQL endpoint as webhook-style HTTP requests.

For example, if the upstream record is [label: {"__name__": "test1"}, value: 1.23, time: 2024-08-15T17:00:00], the request body is: {"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}

Tips: The GraphQL sink only supports POST webhooks with a JSON body, and the data from the source is used as the body content of the webhook. It does not support passing past data.

Supported DataSource Info

To use the HTTP connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

| Datasource | Supported Versions | Dependency |
| --- | --- | --- |
| Http | universal | Download |

Sink Options

| Name | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| url | String | Yes | - | HTTP request URL |
| query | String | Yes | - | GraphQL query |
| variables | String | No | - | GraphQL variables |
| valueCover | Boolean | No | - | Whether the data overwrites the variable value |
| headers | Map | No | - | HTTP headers |
| retry | Int | No | - | The maximum number of retries if the HTTP request throws an IOException |
| retry_backoff_multiplier_ms | Int | No | 100 | The retry backoff multiplier (in milliseconds) if the HTTP request fails |
| retry_backoff_max_ms | Int | No | 10000 | The maximum retry backoff time (in milliseconds) if the HTTP request fails |
| connect_timeout_ms | Int | No | 12000 | Connection timeout in milliseconds (default 12s) |
| socket_timeout_ms | Int | No | 60000 | Socket timeout in milliseconds (default 60s) |
| key_timestamp | Int | No | - | Prometheus timestamp key |
| key_label | String | Yes | - | Prometheus label key |
| key_value | Double | Yes | - | Prometheus value |
| batch_size | Int | No | 1024 | Prometheus batch size for writes |
| flush_interval | Long | No | 300000L | Prometheus flush commit interval |
| common-options | | No | - | Sink plugin common parameters, please refer to Sink Common Options for details |
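
The retry options listed above can be easier to reason about with numbers. The following is a minimal Python sketch, assuming the backoff doubles after each failed attempt and is capped at retry_backoff_max_ms. The exact schedule used by the connector is not stated in this page, so treat the doubling rule and the helper name `backoff_delays_ms` as illustrative assumptions.

```python
def backoff_delays_ms(retry, multiplier_ms=100, max_ms=10000):
    """Return the assumed wait (in ms) before each retry.

    Assumption: the wait doubles on every attempt, starting at
    multiplier_ms, and never exceeds max_ms. The defaults mirror
    retry_backoff_multiplier_ms and retry_backoff_max_ms.
    """
    return [min(multiplier_ms * 2 ** attempt, max_ms) for attempt in range(retry)]


# With the default settings and retry = 8, the last wait hits the cap.
print(backoff_delays_ms(8))
# [100, 200, 400, 800, 1600, 3200, 6400, 10000]
```

Under this assumption, raising retry_backoff_max_ms only matters once the doubled delay would exceed the cap, which with the defaults happens on the eighth retry.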

Example

simple:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
      {
        schema = {
          table = "graphql_sink_1"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      },
      {
        schema = {
          table = "graphql_sink_2"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    ]
  }
}

sink {
  GraphQL {
    url = "http://192.168.1.103:9081/v1/graphql"
    query = """
      mutation MyMutation(
        $id: Int!
        $val_bool: Boolean!
        $val_int8: smallint!
        $val_int16: smallint!
        $val_int32: Int!
        $val_int64: bigint!
        $val_float: Float!
        $val_double: Float!
        $val_decimal: numeric!
        $val_string: String!
        $val_unixtime_micros: timestamp!
      ) {
        insert_sink(objects: {
          id: $id,
          val_bool: $val_bool,
          val_int8: $val_int8,
          val_int16: $val_int16,
          val_int32: $val_int32,
          val_int64: $val_int64,
          val_float: $val_float,
          val_double: $val_double,
          val_decimal: $val_decimal,
          val_string: $val_string,
          val_unixtime_micros: $val_unixtime_micros
        }) {
          affected_rows
          returning {
            id
            val_bool
            val_decimal
            val_double
            val_float
            val_int16
            val_int32
            val_int64
            val_int8
            val_string
            val_unixtime_micros
          }
        }
      }
    """
    variables = {
      "val_bool": True
    }
  }
}
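
To make the relationship between query, variables, and valueCover more concrete, here is a minimal Python sketch of how one row might be combined with the configured variables into a GraphQL POST body. It relies on the common GraphQL-over-HTTP convention of a JSON object with `query` and `variables` keys. The merge rule (row values fill in missing variables, and replace configured ones only when valueCover is true) is an assumption based on the option description, and `build_graphql_payload` is a hypothetical helper, not part of the connector.

```python
import json


def build_graphql_payload(query, variables, row, value_cover=False):
    """Build the JSON body for one row (illustrative only).

    Assumption: row fields are added as GraphQL variables; a field that
    is already present in the configured variables is replaced only when
    value_cover is true.
    """
    merged = dict(variables)
    for name, value in row.items():
        if value_cover or name not in merged:
            merged[name] = value
    return json.dumps({"query": query, "variables": merged})


query = "mutation MyMutation($id: Int!, $val_bool: Boolean!) { ... }"  # shortened
configured = {"val_bool": True}
row = {"id": 1, "val_bool": False}

# Configured variable wins when value_cover is false.
print(build_graphql_payload(query, configured, row))
# Row value wins when value_cover is true.
print(build_graphql_payload(query, configured, row, value_cover=True))
```

In this sketch the configured `val_bool` acts as a fixed value for every row unless valueCover is enabled, in which case each row supplies its own.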

Changelog

| Change | Commit | Version |
| --- | --- | --- |
| [Feature][Connector-V2] Add GraphQL source and sink (#7265) | https://github.com/apache/seatunnel/commit/dde6f9fcbd | 2.3.9 |