Version: Next

GraphQL

GraphQL sink connector

Supported engines

Spark
Flink
SeaTunnel Zeta

Key features

Description

Receives data from the source and uses it to trigger webhooks.

For example, if the upstream data is [label: {"__name__": "test1"}, value: 1.23, time: 2024-08-15T17:00:00], the body is: {"label":{"__name__": "test1"}, "value":"1.23","time":"2024-08-15T17:00:00"}

Tips: The GraphQL sink only supports webhooks that POST JSON; the source data is used as the webhook body. Data whose timestamp is too far in the past is not supported.
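
As a rough illustration of the tip above, the following Python sketch prepares (but does not send) a POST request whose JSON body is one upstream row. The helper name `build_post_request` and the endpoint URL are hypothetical and exist only for this example; this is not the connector's actual implementation.

```python
import json
import urllib.request


def build_post_request(url: str, row: dict) -> urllib.request.Request:
    """Prepare a POST request carrying one row as its JSON body (not sent here)."""
    body = json.dumps(row).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Row taken from the example in the description above.
row = {"label": {"__name__": "test1"}, "value": "1.23", "time": "2024-08-15T17:00:00"}

# Hypothetical endpoint, used only for illustration.
request = build_post_request("http://localhost:8080/hook", row)
print(request.get_method(), request.data.decode("utf-8"))
```

The request object is only constructed, so the sketch can be run without a server listening at the URL.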

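Supported data source info

To use the GraphQL connector, install the following dependencies. You can install them by running the install-plugin.sh script or by downloading them from the Maven central repository.

| Data source | Supported versions | Dependency |
|-------------|--------------------|------------|
| Http        | universal          | Download   |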

Sink options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| url | String | Yes | - | HTTP request URL |
| query | String | Yes | - | GraphQL query |
| variables | String | No | - | GraphQL variables |
| valueCover | Boolean | No | - | Whether the data overwrites the variable value |
| headers | Map | No | - | HTTP headers |
| retry | Int | No | - | Maximum number of retries if the HTTP request throws an IOException |
| retry_backoff_multiplier_ms | Int | No | 100 | Retry backoff multiplier (milliseconds) if the HTTP request fails |
| retry_backoff_max_ms | Int | No | 10000 | Maximum retry backoff time (milliseconds) if the HTTP request fails |
| connect_timeout_ms | Int | No | 12000 | Connection timeout, default 12s |
| socket_timeout_ms | Int | No | 60000 | Socket timeout, default 60s |
| key_timestamp | Int | No | - | Prometheus timestamp key |
| key_label | String | Yes | - | Prometheus label key |
| key_value | Double | Yes | - | Prometheus value |
| batch_size | Int | No | 1024 | Prometheus write batch size |
| flush_interval | Long | No | 300000 | Prometheus flush commit interval |
| common-options | | No | - | Sink plugin common parameters; see Sink Common Options for details |
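
To make the interplay of `retry`, `retry_backoff_multiplier_ms`, and `retry_backoff_max_ms` more concrete, here is a minimal Python sketch of a capped exponential backoff schedule. The formula (multiplier times a power of two, capped at the maximum) is an assumption made for illustration; the options table does not state how the connector actually computes the delay, and `backoff_delays_ms` is a hypothetical helper.

```python
from typing import List


def backoff_delays_ms(retry: int, multiplier_ms: int = 100, max_ms: int = 10000) -> List[int]:
    """Return the wait before each retry, in milliseconds.

    Assumed formula: multiplier_ms * 2**attempt, capped at max_ms.
    The defaults mirror the table (100 ms multiplier, 10000 ms cap).
    """
    return [min(multiplier_ms * (2 ** attempt), max_ms) for attempt in range(retry)]


# With 8 retries the delay doubles each time until it reaches the cap.
print(backoff_delays_ms(8))
# → [100, 200, 400, 800, 1600, 3200, 6400, 10000]
```

Under this assumption the cap only takes effect from the eighth retry onward when the defaults are used.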

Example

Simple example:

env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  FakeSource {
    tables_configs = [
      {
        schema = {
          table = "graphql_sink_1"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [1, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      },
      {
        schema = {
          table = "graphql_sink_2"
          fields {
            id = int
            val_bool = boolean
            val_int8 = tinyint
            val_int16 = smallint
            val_int32 = int
            val_int64 = bigint
            val_float = float
            val_double = double
            val_decimal = "decimal(16, 1)"
            val_string = string
            val_unixtime_micros = timestamp
          }
        }
        rows = [
          {
            kind = INSERT
            fields = [2, true, 1, 2, 3, 4, 4.3, 5.3, 6.3, "NEW", "2020-02-02T02:02:02"]
          }
        ]
      }
    ]
  }
}

sink {
  GraphQL {
    url = "http://192.168.1.103:9081/v1/graphql"
    query = """
      mutation MyMutation(
        $id: Int!
        $val_bool: Boolean!
        $val_int8: smallint!
        $val_int16: smallint!
        $val_int32: Int!
        $val_int64: bigint!
        $val_float: Float!
        $val_double: Float!
        $val_decimal: numeric!
        $val_string: String!
        $val_unixtime_micros: timestamp!
      ) {
        insert_sink(objects: {
          id: $id,
          val_bool: $val_bool,
          val_int8: $val_int8,
          val_int16: $val_int16,
          val_int32: $val_int32,
          val_int64: $val_int64,
          val_float: $val_float,
          val_double: $val_double,
          val_decimal: $val_decimal,
          val_string: $val_string,
          val_unixtime_micros: $val_unixtime_micros
        }) {
          affected_rows
          returning {
            id
            val_bool
            val_decimal
            val_double
            val_float
            val_int16
            val_int32
            val_int64
            val_int8
            val_string
            val_unixtime_micros
          }
        }
      }
    """
    variables = {
      "val_bool": true
    }
  }
}
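
For readers unfamiliar with how a GraphQL mutation travels over HTTP, the sketch below builds the usual JSON payload with a `query` string and a `variables` object, filling the variables from one row. How the connector merges row data with the preset `variables` is not spelled out here, so the merge rule below (row values replace preset values only when `valueCover` is enabled) is an assumed reading, and `build_graphql_payload` is a hypothetical helper.

```python
import json


def build_graphql_payload(query: str, row: dict, variables: dict, value_cover: bool = True) -> str:
    """Build a JSON body of the form {"query": ..., "variables": ...}."""
    merged = dict(variables)  # start from the preset variables
    for name, value in row.items():
        # Assumed reading of `valueCover`: a row value replaces a preset
        # variable only when the flag is enabled; new names are always added.
        if value_cover or name not in merged:
            merged[name] = value
    return json.dumps({"query": query, "variables": merged})


# Shortened mutation for illustration; the full one appears in the config above.
query = (
    "mutation MyMutation($id: Int!, $val_bool: Boolean!) "
    "{ insert_sink(objects: {id: $id, val_bool: $val_bool}) { affected_rows } }"
)
row = {"id": 1, "val_bool": False}
preset = {"val_bool": True}

print(build_graphql_payload(query, row, preset, value_cover=True))
print(build_graphql_payload(query, row, preset, value_cover=False))
```

In the first call the row's `val_bool` wins; in the second the preset value is kept and only `id` is taken from the row.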

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| [Feature][Connector-V2] Add GraphQL source and sink (#7265) | https://github.com/apache/seatunnel/commit/dde6f9fcbd | 2.3.9 |