跳到主要内容
版本:2.3.10

FakeSource

FakeSource 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

描述

FakeSource 是一个虚拟数据源,它根据用户定义的 schema 数据结构随机生成指定数量的行数据,主要用于类型转换或连接器新功能测试等测试场景。

主要特性

数据源选项

名称类型必填默认值描述
tables_configslist-定义多个 FakeSource,每个项可以包含完整的 FakeSource 配置描述
schemaconfig-定义 Schema 信息
rowsconfig-每个并行度输出的伪数据行列表,详见标题 Options rows Case
row.numint5每个并行度生成的数据总行数
split.numint1枚举器为每个并行度生成的分片数量
split.read-intervallong1读取器在两个分片读取之间的间隔时间(毫秒)
map.sizeint5连接器生成的 map 类型的大小
array.sizeint5连接器生成的 array 类型的大小
bytes.lengthint5连接器生成的 bytes 类型的长度
string.lengthint5连接器生成的 string 类型的长度
string.fake.modestringrange生成字符串数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 string.template 选项
string.templatelist-连接器生成的字符串类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
tinyint.fake.modestringrange生成 tinyint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 tinyint.template 选项
tinyint.mintinyint0连接器生成的 tinyint 数据的最小值
tinyint.maxtinyint127连接器生成的 tinyint 数据的最大值
tinyint.templatelist-连接器生成的 tinyint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
smallint.fake.modestringrange生成 smallint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 smallint.template 选项
smallint.minsmallint0连接器生成的 smallint 数据的最小值
smallint.maxsmallint32767连接器生成的 smallint 数据的最大值
smallint.templatelist-连接器生成的 smallint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
int.fake.templatestringrange生成 int 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 int.template 选项
int.minsmallint0连接器生成的 int 数据的最小值
int.maxsmallint0x7fffffff连接器生成的 int 数据的最大值
int.templatelist-连接器生成的 int 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
bigint.fake.modestringrange生成 bigint 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 bigint.template 选项
bigint.minbigint0连接器生成的 bigint 数据的最小值
bigint.maxbigint0x7fffffffffffffff连接器生成的 bigint 数据的最大值
bigint.templatelist-连接器生成的 bigint 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
float.fake.modestringrange生成 float 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 float.template 选项
float.minfloat0连接器生成的 float 数据的最小值
float.maxfloat0x1.fffffeP+127连接器生成的 float 数据的最大值
float.templatelist-连接器生成的 float 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
double.fake.modestringrange生成 double 数据的伪数据模式,支持 rangetemplate,默认为 range,如果配置为 template,用户还需配置 double.template 选项
double.mindouble0连接器生成的 double 数据的最小值
double.maxdouble0x1.fffffffffffffP+1023连接器生成的 double 数据的最大值
double.templatelist-连接器生成的 double 类型的模板列表,如果用户配置了此选项,连接器将从模板列表中随机选择一个项
vector.dimensionint4生成的向量的维度,不包括二进制向量
binary.vector.dimensionint8生成的二进制向量的维度
vector.float.minfloat0连接器生成的向量中 float 数据的最小值
vector.float.maxfloat0x1.fffffeP+127连接器生成的向量中 float 数据的最大值
common-options-数据源插件通用参数,详情请参考 Source Common Options

任务示例

简单示例:

此示例随机生成指定类型的数据。如果您想了解如何声明字段类型,请点击 这里

schema = {
fields {
c_map = "map<string, array<int>>"
c_map_nest = "map<string, {c_int = int, c_string = string}>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
c_row = {
c_map = "map<string, map<string, string>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}

随机生成

随机生成 16 条符合类型的数据

source {
# 这是一个示例输入插件,**仅用于测试和演示功能输入插件**
FakeSource {
row.num = 16
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
plugin_output = "fake"
}
}

自定义数据内容简单示例:

这是一个自定义数据源信息的示例,定义每条数据是添加还是删除修改操作,并定义每个字段存储的内容

source {
FakeSource {
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
rows = [
{
kind = INSERT
fields = [{"a": "b"}, [101], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
}
{
kind = UPDATE_BEFORE
fields = [{"a": "c"}, [102], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
}
{
kind = UPDATE_AFTER
fields = [{"a": "e"}, [103], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
}
{
kind = DELETE
fields = [{"a": "f"}, [104], "c_string", true, 117, 15987, 56387395, 7084913402530365000, 1.23, 1.23, "2924137191386439303744.39292216", null, "bWlJWmo=", "2023-04-22", "2023-04-22T23:20:58"]
}
]
}
}

由于 HOCON 规范的限制,用户无法直接创建字节序列对象。FakeSource 使用字符串来分配 bytes 类型的值。在上面的示例中,bytes 类型字段被分配了 "bWlJWmo=",这是通过 base64 编码的 "miIZj"。因此,在为 bytes 类型字段赋值时,请使用 base64 编码的字符串。

指定数据数量简单示例:

此案例指定生成数据的数量以及生成值的长度

FakeSource {
row.num = 10
map.size = 10
array.size = 10
bytes.length = 10
string.length = 10
schema = {
fields {
c_map = "map<string, array<int>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
c_row = {
c_map = "map<string, map<string, string>>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_null = "null"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}
}

模板数据简单示例:

根据指定模板随机生成

使用模板

FakeSource {
row.num = 5
string.fake.mode = "template"
string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"]
tinyint.fake.mode = "template"
tinyint.template = [1, 2, 3, 4, 5, 6, 7, 8, 9]
smalling.fake.mode = "template"
smallint.template = [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
int.fake.mode = "template"
int.template = [20, 21, 22, 23, 24, 25, 26, 27, 28, 29]
bigint.fake.mode = "template"
bigint.template = [30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
float.fake.mode = "template"
float.template = [40.0, 41.0, 42.0, 43.0]
double.fake.mode = "template"
double.template = [44.0, 45.0, 46.0, 47.0]
schema {
fields {
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
}
}
}

范围数据简单示例:

在指定的数据生成范围内随机生成

FakeSource {
row.num = 5
string.template = ["tyrantlucifer", "hailin", "kris", "fanjia", "zongwen", "gaojun"]
tinyint.min = 1
tinyint.max = 9
smallint.min = 10
smallint.max = 19
int.min = 20
int.max = 29
bigint.min = 30
bigint.max = 39
float.min = 40.0
float.max = 43.0
double.min = 44.0
double.max = 47.0
schema {
fields {
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
}
}
}

生成多张表

这是一个生成多数据源测试表 test.table1test.table2 的示例

FakeSource {
tables_configs = [
{
row.num = 16
schema {
table = "test.table1"
fields {
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
}
}
},
{
row.num = 17
schema {
table = "test.table2"
fields {
c_string = string
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
}
}
}
]
}

rows 选项示例

rows = [
{
kind = INSERT
fields = [1, "A", 100]
},
{
kind = UPDATE_BEFORE
fields = [1, "A", 100]
},
{
kind = UPDATE_AFTER
fields = [1, "A_1", 100]
},
{
kind = DELETE
fields = [1, "A_1", 100]
}
]

table-names 选项示例

source {
# 这是一个示例源插件,**仅用于测试和演示源插件功能**
FakeSource {
table-names = ["test.table1", "test.table2", "test.table3"]
parallelism = 1
schema = {
fields {
name = "string"
age = "int"
}
}
}
}

defaultValue 选项示例

可以通过 rowcolumns 生成自定义数据。对于时间类型,可以通过 CURRENT_TIMESTAMPCURRENT_TIMECURRENT_DATE 获取当前时间。

    schema = {
fields {
pk_id = bigint
name = string
score = int
time1 = timestamp
time2 = time
time3 = date
}
}
# 使用 rows
rows = [
{
kind = INSERT
fields = [1, "A", 100, CURRENT_TIMESTAMP, CURRENT_TIME, CURRENT_DATE]
}
]
      schema = {
# 使用 columns
columns = [
{
name = book_publication_time
type = timestamp
defaultValue = "2024-09-12 15:45:30"
comment = "书籍出版时间"
},
{
name = book_publication_time2
type = timestamp
defaultValue = CURRENT_TIMESTAMP
comment = "书籍出版时间2"
},
{
name = book_publication_time3
type = time
defaultValue = "15:45:30"
comment = "书籍出版时间3"
},
{
name = book_publication_time4
type = time
defaultValue = CURRENT_TIME
comment = "书籍出版时间4"
},
{
name = book_publication_time5
type = date
defaultValue = "2024-09-12"
comment = "书籍出版时间5"
},
{
name = book_publication_time6
type = date
defaultValue = CURRENT_DATE
comment = "书籍出版时间6"
}
]
}

使用向量示例

source {
FakeSource {
row.num = 10
# 低优先级
vector.dimension= 4
binary.vector.dimension = 8
# 低优先级
schema = {
table = "simple_example"
columns = [
{
name = book_id
type = bigint
nullable = false
defaultValue = 0
comment = "主键 ID"
},
{
name = book_intro_1
type = binary_vector
columnScale =8
comment = "向量"
},
{
name = book_intro_2
type = float16_vector
columnScale =4
comment = "向量"
},
{
name = book_intro_3
type = bfloat16_vector
columnScale =4
comment = "向量"
},
{
name = book_intro_4
type = sparse_float_vector
columnScale =4
comment = "向量"
}
]
}
}
}

变更日志

Change Log
ChangeCommitVersion
[improve] fake source options (#8950)https://github.com/apache/seatunnel/commit/f8c47fb5f2.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6ee2.3.10
[Feature][API] Support timestamp with timezone offset (#8367)https://github.com/apache/seatunnel/commit/e18bfeabd2.3.9
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef800012.3.9
[Improve][API] Unified tables_configs and table_list (#8100)https://github.com/apache/seatunnel/commit/84c0b8d662.3.9
[Feature][Core] Rename result_table_name/source_table_name to plugin_input/plugin_output (#8072)https://github.com/apache/seatunnel/commit/c7bbd322d2.3.9
[Improve][Fake] Improve memory usage when split size is large (#7821)https://github.com/apache/seatunnel/commit/2d41b024c2.3.9
[Improve][Connector-V2] Time supports default value (#7639)https://github.com/apache/seatunnel/commit/33978689f2.3.8
[Improve][Connector-V2] Fake supports column configuration (#7503)https://github.com/apache/seatunnel/commit/39162a4e02.3.8
[Feature][Core] Add event notify for all connector (#7501)https://github.com/apache/seatunnel/commit/d71337b0e2.3.8
[Improve][Connector-V2] update vectorType (#7446)https://github.com/apache/seatunnel/commit/1bba723852.3.8
[Feature][Connector-V2] Fake Source support produce vector data (#7401)https://github.com/apache/seatunnel/commit/6937d10ac2.3.8
[Feature][Kafka] Support multi-table source read (#5992)https://github.com/apache/seatunnel/commit/60104602d2.3.6
[Feature][Doris] Add Doris type converter (#6354)https://github.com/apache/seatunnel/commit/5189991842.3.6
[Feature][Core] Support event listener for job (#6419)https://github.com/apache/seatunnel/commit/831d0022e2.3.5
[Fix][FakeSource] fix random from template not include the latest value issue (#6438)https://github.com/apache/seatunnel/commit/6ec16ac462.3.5
[Improve][Catalog] Use default tablepath when can not get the tablepath from source config (#6276)https://github.com/apache/seatunnel/commit/f8158bb802.3.4
[Improve][Connector-V2] Replace CommonErrorCodeDeprecated.JSON_OPERATION_FAILED (#5978)https://github.com/apache/seatunnel/commit/456cd17712.3.4
FakeSource support generate different CatalogTable for MultipleTable (#5766)https://github.com/apache/seatunnel/commit/a8b93805e2.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b2.3.4
[Improve] Add default implement for SeaTunnelSource::getProducedType (#5670)https://github.com/apache/seatunnel/commit/a04add6992.3.4
Support config tableIdentifier for schema (#5628)https://github.com/apache/seatunnel/commit/652921fb72.3.4
[Feature] Add table-names from FakeSource/Assert to produce/assert multi-table (#5604)https://github.com/apache/seatunnel/commit/2c67cd8f32.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e52.3.4
[Improve][CheckStyle] Remove useless 'SuppressWarnings' annotation of checkstyle. (#5260)https://github.com/apache/seatunnel/commit/51c0d709b2.3.4
[improve][zeta] fix zeta bugshttps://github.com/apache/seatunnel/commit/3a82e8b392.3.1
[chore] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/291214ad62.3.1
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee1912.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b583032.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a132.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd601052.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab166562.3.1
[Improve][Connector-fake] Optimizing Data Generation Strategies refer to #4004 (#4061)https://github.com/apache/seatunnel/commit/c7c596a6d2.3.1
[Improve][Connector-V2][Fake] Improve fake connector (#3932)https://github.com/apache/seatunnel/commit/31f12431d2.3.1
[Feature][Connector-v2][StarRocks] Support write cdc changelog event(INSERT/UPDATE/DELETE) (#3865)https://github.com/apache/seatunnel/commit/8e3d158c02.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb82.3.1
[Feature][API &amp; Connector &amp; Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba2.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a112.3.0
[Improve][Connector-V2][Fake] Unified exception for fake source connector (#3520)https://github.com/apache/seatunnel/commit/f371ad5822.3.0
[Connector-V2][Fake] Add Fake TableSourceFactory (#3345)https://github.com/apache/seatunnel/commit/74b61c33a2.3.0
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)https://github.com/apache/seatunnel/commit/38254e3f22.3.0
[Improve][Engine] Improve Engine performance. (#3216)https://github.com/apache/seatunnel/commit/7393c47322.3.0
[hotfix][connector][fake] fix FakeSourceSplitEnumerator assigning duplicate splits when restoring (#3112)https://github.com/apache/seatunnel/commit/98b1feda82.3.0-beta
[improve][connector][fake] supports setting the number of split rows and reading interval (#3098)https://github.com/apache/seatunnel/commit/efabe6af72.3.0-beta
[feature][connector][fake] Support mutil splits for fake source connector (#2974)https://github.com/apache/seatunnel/commit/c28c44b7c2.3.0-beta
[E2E][ST-Engine] Add test data consistency in 3 node cluster and fix bug (#3038)https://github.com/apache/seatunnel/commit/97400a6f12.3.0-beta
[Improve][all] change Log to @Slf4j (#3001)https://github.com/apache/seatunnel/commit/6016100f12.3.0-beta
[Improve][Connector-V2] Improve fake source connector (#2944)https://github.com/apache/seatunnel/commit/044f62ef32.3.0-beta
[Improve][Connector-v2-Fake]Supports direct definition of data values(row) (#2839)https://github.com/apache/seatunnel/commit/b7d9dde6c2.3.0-beta
[Connector-V2][ElasticSearch] Fix ElasticSearch Connector V2 Bug (#2817)https://github.com/apache/seatunnel/commit/2fcbbf4642.2.0-beta
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706)https://github.com/apache/seatunnel/commit/cbf82f7552.2.0-beta
[Bug][connector-fake] Fake date calculation error(#2573)https://github.com/apache/seatunnel/commit/9ea01298f2.2.0-beta
[Bug][ConsoleSinkV2]fix fieldToString StackOverflow and add Unit-Test (#2545)https://github.com/apache/seatunnel/commit/6f87094562.2.0-beta
[chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538)https://github.com/apache/seatunnel/commit/7dc2a27382.2.0-beta
[Imporve][Fake-Connector-V2]support user-defined-schmea and random data for fake-table (#2406)https://github.com/apache/seatunnel/commit/a5447528c2.2.0-beta
[api-draft][Optimize] Optimize module name (#2062)https://github.com/apache/seatunnel/commit/f79e3112b2.2.0-beta