Version: Next

Assert

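Assert sink connector

Description

The Assert sink connector asserts that data conforms to user-defined rules. Users configure rules that describe the expected data; if the data violates a rule, an exception is thrown.

Key features

Options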

| Name | Type | Required | Default |
|---|---|---|---|
| rules | ConfigMap | yes | - |
| rules.field_rules | ConfigList | yes | - |
| rules.field_rules.field_name | string | yes | - |
| rules.field_rules.field_type | string or ConfigMap | no | - |
| rules.field_rules.field_value | ConfigList | no | - |
| rules.field_rules.field_value.rule_type | string | no | - |
| rules.field_rules.field_value.rule_value | numeric | no | - |
| rules.field_rules.field_value.equals_to | boolean, numeric, string, ConfigList, or ConfigMap | no | - |
| rules.row_rules | ConfigList | yes | - |
| rules.row_rules.rule_type | string | no | - |
| rules.row_rules.rule_value | string | no | - |
| rules.catalog_table_rule | ConfigMap | no | - |
| rules.catalog_table_rule.primary_key_rule | ConfigMap | no | - |
| rules.catalog_table_rule.primary_key_rule.primary_key_name | string | no | - |
| rules.catalog_table_rule.primary_key_rule.primary_key_columns | ConfigList | no | - |
| rules.catalog_table_rule.constraint_key_rule | ConfigList | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_name | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_type | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns | ConfigList | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_column_name | string | no | - |
| rules.catalog_table_rule.constraint_key_rule.constraint_key_columns.constraint_key_sort_type | string | no | - |
| rules.catalog_table_rule.column_rule | ConfigList | no | - |
| rules.catalog_table_rule.column_rule.name | string | no | - |
| rules.catalog_table_rule.column_rule.type | string | no | - |
| rules.catalog_table_rule.column_rule.column_length | int | no | - |
| rules.catalog_table_rule.column_rule.nullable | boolean | no | - |
| rules.catalog_table_rule.column_rule.default_value | string | no | - |
| rules.catalog_table_rule.column_rule.comment | string | no | - |
| rules.table-names | ConfigList | no | - |
| rules.tables_configs | ConfigList | no | - |
| rules.tables_configs.table_path | String | no | - |
| common-options | | no | - |

rules [ConfigMap]

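Rules define the checks applied to the incoming data. Each rule represents either a field validation or a row-count validation.

field_rules [ConfigList]

Field rules validate individual fields.

field_name [string]

The field name.

field_type [string | ConfigMap]

The field type. Type declarations should follow the schema type declaration guide.

field_value [ConfigList]

Field value rules define how data values are validated.

rule_type [string]

The rule type. The following rules are currently supported: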

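  • NOT_NULL: the value must not be null
  • NULL: the value may be null
  • MIN: the minimum value of the data
  • MAX: the maximum value of the data
  • MIN_LENGTH: the minimum length of string data
  • MAX_LENGTH: the maximum length of string data
  • MIN_ROW: the minimum number of rows
  • MAX_ROW: the maximum number of rows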

rule_value [numeric]

The value associated with the rule type. When rule_type is MIN, MAX, MIN_LENGTH, MAX_LENGTH, MIN_ROW, or MAX_ROW, the user must assign a value to rule_value.
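As a minimal sketch of how rule_type and rule_value pair up, the fragment below declares three value rules for a string field. The field name and the numeric limits are illustrative assumptions, not recommended values; the fragment is meant to sit inside the rules block.

```hocon
# Sketch only: "name" and the limits 3 and 20 are assumed for illustration.
field_rules = [
  {
    field_name = name
    field_type = string
    field_value = [
      # NOT_NULL takes no rule_value
      { rule_type = NOT_NULL },
      # Length rules need a numeric rule_value
      { rule_type = MIN_LENGTH, rule_value = 3 },
      { rule_type = MAX_LENGTH, rule_value = 20 }
    ]
  }
]
```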

equals_to [boolean | numeric | string | ConfigList | ConfigMap]

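equals_to compares the field value against the configured expected value. Values of any supported type can be assigned to equals_to; the types are described in detail in the schema type documentation. For example, if a field is a row with three fields declared as {a = array<string>, b = map<string, decimal(30, 2)>, c={c_0 = int, b = string}}, the user can assign the value [["a", "b"], { k0 = 9999.99, k1 = 111.11 }, [123, "abcd"]] to equals_to.

Field values are defined the same way as in FakeSource.

equals_to cannot be applied to fields of type null. However, users can validate such fields with the rule type NULL, for example {rule_type = NULL}.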
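The row example above could be written as a field rule roughly like this. The field name c_row is a hypothetical placeholder, and the type strings are quoted in the same way the examples further down quote complex types; treat it as a sketch rather than a verified job config.

```hocon
# Sketch only: c_row is an assumed field name.
field_rules = [
  {
    field_name = c_row
    # Row type with three fields: an array, a map, and a nested row
    field_type = {
      a = "array<string>"
      b = "map<string, decimal(30, 2)>"
      c = { c_0 = int, b = string }
    }
    field_value = [
      {
        rule_type = NOT_NULL
        # One entry per row field, in declaration order: a, b, c
        equals_to = [
          ["a", "b"],
          { k0 = 9999.99, k1 = 111.11 },
          [123, "abcd"]
        ]
      }
    ]
  }
]
```

Note that the nested row c is given as a list of values, just like the outer row.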

catalog_table_rule [ConfigMap]

catalog_table_rule asserts that the catalog table matches the table defined by the user.
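A small sketch of a catalog_table_rule that uses the optional column_rule attributes from the options table (column_length, nullable, comment). All names and values here are assumptions chosen for illustration; in a real job they would need to match the upstream table definition for the assertion to pass.

```hocon
# Sketch only: key name, column names, and attribute values are assumed.
catalog_table_rule {
  primary_key_rule = {
    primary_key_name = "pk_id"
    primary_key_columns = ["id"]
  }
  column_rule = [
    {
      name = "id"
      type = bigint
      nullable = false
    },
    {
      name = "name"
      type = string
      column_length = 64
      nullable = true
      comment = "display name"
    }
  ]
}
```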

table-names [ConfigList]

Asserts that the listed tables are present in the data.
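A minimal sketch, assuming the option sits under rules as the options table lists it (rules.table-names). The table names are hypothetical placeholders.

```hocon
# Sketch only: table names are assumed for illustration.
rules = {
  table-names = ["test.table1", "test.table2"]
}
```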

tables_configs [ConfigList]

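Asserts that multiple tables are present in the data, with rules configured per table.

table_path [String]

The path of the table.

common options

Common parameters of sink plugins. See Sink Common Options for details.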

Example

Simple

The whole config follows HOCON style.

Assert {
  rules = {
    # Row-count rules: between 5 and 10 rows
    row_rules = [
      {
        rule_type = MAX_ROW
        rule_value = 10
      },
      {
        rule_type = MIN_ROW
        rule_value = 5
      }
    ],
    field_rules = [{
      field_name = name
      field_type = string
      field_value = [
        {
          rule_type = NOT_NULL
        },
        {
          rule_type = MIN_LENGTH
          rule_value = 5
        },
        {
          rule_type = MAX_LENGTH
          rule_value = 10
        }
      ]
    }, {
      # Note: as written, equals_to = 23 appears to conflict with MIN = 32767;
      # the values only illustrate the syntax, adjust them to your data.
      field_name = age
      field_type = int
      field_value = [
        {
          rule_type = NOT_NULL
          equals_to = 23
        },
        {
          rule_type = MIN
          rule_value = 32767
        },
        {
          rule_type = MAX
          rule_value = 2147483647
        }
      ]
    }
    ]
    # Expected catalog table: primary key, constraint keys, and columns
    catalog_table_rule {
      primary_key_rule = {
        primary_key_name = "primary key"
        primary_key_columns = ["id"]
      }
      constraint_key_rule = [
        {
          constraint_key_name = "unique_name"
          constraint_key_type = UNIQUE_KEY
          constraint_key_columns = [
            {
              constraint_key_column_name = "id"
              constraint_key_sort_type = ASC
            }
          ]
        }
      ]
      column_rule = [
        {
          name = "id"
          type = bigint
        },
        {
          name = "name"
          type = string
        },
        {
          name = "age"
          type = int
        }
      ]
    }
  }
}

Complex

Here is a more complex example that involves equals_to.

source {
  FakeSource {
    row.num = 1
    schema = {
      fields {
        c_null = "null"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_decimal = "decimal(30, 8)"
        c_date = date
        c_timestamp = timestamp
        c_time = time
        c_bytes = bytes
        c_array = "array<int>"
        c_map = "map<time, string>"
        c_map_nest = "map<string, {c_int = int, c_string = string}>"
        c_row = {
          c_null = "null"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_decimal = "decimal(30, 8)"
          c_date = date
          c_timestamp = timestamp
          c_time = time
          c_bytes = bytes
          c_array = "array<int>"
          c_map = "map<string, string>"
        }
      }
    }
    rows = [
      {
        kind = INSERT
        fields = [
          null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
          "bWlJWmo=",
          [0, 1, 2],
          "{ 12:01:26 = v0 }",
          { k1 = [123, "BBB-BB"]},
          [
            null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
            "bWlJWmo=",
            [0, 1, 2],
            { k0 = v0 }
          ]
        ]
      }
    ]
    plugin_output = "fake"
  }
}

sink {
  Assert {
    plugin_input = "fake"
    rules = {
      row_rules = [
        {
          rule_type = MAX_ROW
          rule_value = 1
        },
        {
          rule_type = MIN_ROW
          rule_value = 1
        }
      ],
      field_rules = [
        {
          field_name = c_null
          field_type = "null"
          field_value = [
            {
              rule_type = NULL
            }
          ]
        },
        {
          field_name = c_string
          field_type = string
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "AAA"
            }
          ]
        },
        {
          field_name = c_boolean
          field_type = boolean
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = false
            }
          ]
        },
        {
          field_name = c_tinyint
          field_type = tinyint
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 1
            }
          ]
        },
        {
          field_name = c_smallint
          field_type = smallint
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 1
            }
          ]
        },
        {
          field_name = c_int
          field_type = int
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 333
            }
          ]
        },
        {
          field_name = c_bigint
          field_type = bigint
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 323232
            }
          ]
        },
        {
          field_name = c_float
          field_type = float
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 3.1
            }
          ]
        },
        {
          field_name = c_double
          field_type = double
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 9.33333
            }
          ]
        },
        {
          field_name = c_decimal
          field_type = "decimal(30, 8)"
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = 99999.99999999
            }
          ]
        },
        {
          field_name = c_date
          field_type = date
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "2012-12-21"
            }
          ]
        },
        {
          field_name = c_timestamp
          field_type = timestamp
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "2012-12-21T12:34:56"
            }
          ]
        },
        {
          field_name = c_time
          field_type = time
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "12:34:56"
            }
          ]
        },
        {
          field_name = c_bytes
          field_type = bytes
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "bWlJWmo="
            }
          ]
        },
        {
          field_name = c_array
          field_type = "array<int>"
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = [0, 1, 2]
            }
          ]
        },
        {
          field_name = c_map
          field_type = "map<time, string>"
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = "{ 12:01:26 = v0 }"
            }
          ]
        },
        {
          field_name = c_map_nest
          field_type = "map<string, {c_int = int, c_string = string}>"
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = { k1 = [123, "BBB-BB"] }
            }
          ]
        },
        {
          field_name = c_row
          field_type = {
            c_null = "null"
            c_string = string
            c_boolean = boolean
            c_tinyint = tinyint
            c_smallint = smallint
            c_int = int
            c_bigint = bigint
            c_float = float
            c_double = double
            c_decimal = "decimal(30, 8)"
            c_date = date
            c_timestamp = timestamp
            c_time = time
            c_bytes = bytes
            c_array = "array<int>"
            c_map = "map<string, string>"
          }
          field_value = [
            {
              rule_type = NOT_NULL
              equals_to = [
                null, "AAA", false, 1, 1, 333, 323232, 3.1, 9.33333, 99999.99999999, "2012-12-21", "2012-12-21T12:34:56", "12:34:56",
                "bWlJWmo=",
                [0, 1, 2],
                { k0 = v0 }
              ]
            }
          ]
        }
      ]
    }
  }
}

Multi-table validation

Validate multiple tables in one job.

env {
  parallelism = 1
  job.mode = BATCH
}

source {
  FakeSource {
    tables_configs = [
      {
        row.num = 16
        schema {
          table = "test.table1"
          fields {
            c_int = int
            c_bigint = bigint
          }
        }
      },
      {
        row.num = 17
        schema {
          table = "test.table2"
          fields {
            c_string = string
            c_tinyint = tinyint
          }
        }
      }
    ]
  }
}

transform {
}

sink {
  Assert {
    rules = {
      tables_configs = [
        {
          table_path = "test.table1"
          row_rules = [
            {
              rule_type = MAX_ROW
              rule_value = 16
            },
            {
              rule_type = MIN_ROW
              rule_value = 16
            }
          ],
          field_rules = [{
            field_name = c_int
            field_type = int
            field_value = [
              {
                rule_type = NOT_NULL
              }
            ]
          }, {
            field_name = c_bigint
            field_type = bigint
            field_value = [
              {
                rule_type = NOT_NULL
              }
            ]
          }]
        },
        {
          table_path = "test.table2"
          row_rules = [
            {
              rule_type = MAX_ROW
              rule_value = 17
            },
            {
              rule_type = MIN_ROW
              rule_value = 17
            }
          ],
          field_rules = [{
            field_name = c_string
            field_type = string
            field_value = [
              {
                rule_type = NOT_NULL
              }
            ]
          }, {
            field_name = c_tinyint
            field_type = tinyint
            field_value = [
              {
                rule_type = NOT_NULL
              }
            ]
          }]
        }
      ]
    }
  }
}

Changelog

| Change | Commit | Version |
|---|---|---|
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | dev |
| [improve] add assert options (#8620) | https://github.com/apache/seatunnel/commit/b159cc0c75 | dev |
| [Feature][API] Support timestamp with timezone offset (#8367) | https://github.com/apache/seatunnel/commit/e18bfeabd2 | 2.3.9 |
| [fix][connector-v2][connector-assert] Optimize Assert Sink verification method (#8356) | https://github.com/apache/seatunnel/commit/5c9159d7cd | 2.3.9 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
| [Feature][Transform-V2] Support transform with multi-table (#7628) | https://github.com/apache/seatunnel/commit/72c9c4576d | 2.3.9 |
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d660 | 2.3.9 |
| [Fix][API] Fix column length can not be long (#8039) | https://github.com/apache/seatunnel/commit/16cf632d3e | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Feature][Connector-V2] Assert support multi-table check (#7687) | https://github.com/apache/seatunnel/commit/c4778a2497 | 2.3.8 |
| [Feature][Transform] Add embedding transform (#7534) | https://github.com/apache/seatunnel/commit/3310cfcd34 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [Hotfix] fix http source can not read yyyy-MM-dd HH:mm:ss format bug & Improve DateTime Utils (#6601) | https://github.com/apache/seatunnel/commit/19888e7969 | 2.3.5 |
| [Feature][Connector-V2][Assert] Support field type assert and field value equality assert for full data types (#6275) | https://github.com/apache/seatunnel/commit/576919bfab | 2.3.4 |
| [Feature][Connector-V2][Assert] Support check the precision and scale of Decimal type. (#6110) | https://github.com/apache/seatunnel/commit/dd64ed52d4 | 2.3.4 |
| [Hotfix][SQL Transform] Fix cast to timestamp, date, time bug (#5812) | https://github.com/apache/seatunnel/commit/de181de02a | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755) | https://github.com/apache/seatunnel/commit/8de7408100 | 2.3.4 |
| [Improve] Add default implement for SeaTunnelSink::setTypeInfo (#5682) | https://github.com/apache/seatunnel/commit/86cba87450 | 2.3.4 |
| [Fix] Fix log error when multi-table sink close (#5683) | https://github.com/apache/seatunnel/commit/fea4b6f268 | 2.3.4 |
| Support config tableIdentifier for schema (#5628) | https://github.com/apache/seatunnel/commit/652921fb75 | 2.3.4 |
| [Feature] Add table-names from FakeSource/Assert to produce/assert multi-table (#5604) | https://github.com/apache/seatunnel/commit/2c67cd8f3e | 2.3.4 |
| [Improve] Remove useless ReadonlyConfig flatten feature (#5612) | https://github.com/apache/seatunnel/commit/243edfef3d | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Improve][connector-assert]support 'DECIMAL' type and fix 'Number' type precision issue (#5479) | https://github.com/apache/seatunnel/commit/d308e27733 | 2.3.4 |
| [Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260) | https://github.com/apache/seatunnel/commit/51c0d709ba | 2.3.4 |
| [Feature][Transform] Add SimpleSQL transform plugin (#4148) | https://github.com/apache/seatunnel/commit/b914d49abf | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][Assert] Unified exception for assert connector (#3331) | https://github.com/apache/seatunnel/commit/e74c9bc6fd | 2.3.0 |
| [improve][connector] The Factory#factoryIdentifier must be consistent with PluginIdentifierInterface#getPluginName (#3328) | https://github.com/apache/seatunnel/commit/d9519d696a | 2.3.0 |
| [Improve][Connector-V2] Add Clickhouse and Assert Source/Sink Factory (#3306) | https://github.com/apache/seatunnel/commit/9e4a128381 | 2.3.0 |
| [Feature][Connector-v2] improve assert sink connector (#2844) | https://github.com/apache/seatunnel/commit/967fec0e93 | 2.3.0-beta |
| [DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706) | https://github.com/apache/seatunnel/commit/cbf82f755c | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| [checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f369 | 2.2.0-beta |
| [API-DRAFT][MERGE] update license and pom.xml | https://github.com/apache/seatunnel/commit/5ae8865b7c | 2.2.0-beta |
| add assert sink to Api draft (#2071) | https://github.com/apache/seatunnel/commit/fc640b52bd | 2.2.0-beta |