Version: 2.3.10

LocalFile

Local file sink connector

Description

Output data to local files.

Tip

If you use spark/flink, in order to use this connector you must ensure your spark/flink cluster already integrates hadoop. The tested hadoop version is 2.x.

If you use SeaTunnel Engine, the hadoop jar is integrated automatically when you download and install SeaTunnel Engine. You can check the jars under ${SEATUNNEL_HOME}/lib to confirm this.

Key Features

By default, we use 2PC commit to ensure exactly-once.

  • File format type
    • text
    • csv
    • parquet
    • orc
    • json
    • excel
    • xml
    • binary

Options

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| path | string | yes | - | The target directory path |
| tmp_path | string | no | /tmp/seatunnel | The result file is written to this temporary path first and then committed to the target directory with mv. |
| custom_filename | boolean | no | false | Whether you need a custom file name |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | File format type |
| filename_extension | string | no | - | Override the default file extension with a custom one, e.g. .xml, .json, dat, .customtype |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need to process partitions |
| partition_by | array | no | - | Only used when have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used when have_partition is true |
| sink_columns | array | no | - | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | Whether to enable the transaction |
| batch_size | int | no | 1000000 | Batch size |
| single_file_mode | boolean | no | false | Each parallelism outputs only one file. When this parameter is enabled, batch_size no longer takes effect, and the output file name has no file-block suffix. |
| create_empty_file_when_no_data | boolean | no | false | Still generate the corresponding data file when no data is synchronized from upstream. |
| compress_codec | string | no | none | Compress codec |
| common-options | object | no | - | Common options |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel |
| csv_string_quote_mode | enum | no | MINIMAL | Only used when the file format is CSV |
| xml_root_tag | string | no | RECORDS | Only used when file_format is xml |
| xml_row_tag | string | no | RECORD | Only used when file_format is xml |
| xml_use_attr_format | boolean | no | - | Only used when file_format is xml |
| parquet_avro_write_timestamp_as_int96 | boolean | no | false | Only used when file_format is parquet |
| parquet_avro_write_fixed_as_int96 | array | no | - | Only used when file_format is parquet |
| enable_header_write | boolean | no | false | Only used when file_format_type is text or csv. false: don't write the header; true: write the header. |
| encoding | string | no | "UTF-8" | Only used when file_format_type is json, text, csv, or xml |

path [string]

The target directory path is required. You can inject the upstream CatalogTable into the path by using ${database_name}, ${table_name} or ${schema_name}.

custom_filename [boolean]

Whether to customize the file name.

file_name_expression [string]

Only used when custom_filename is true.

file_name_expression describes the file expression which will be created into the path. We can add the variables ${now} or ${uuid} in file_name_expression, e.g. test_${uuid}_${now}. ${now} represents the current time, and its format can be defined by specifying the option filename_time_format.

Please note that, if is_enable_transaction is true, we will auto add ${transactionId}_ to the head of the file name.

filename_time_format [string]

Only used when custom_filename is true.

When the format in the file_name_expression parameter is xxxx-${now}, filename_time_format can specify the time format of the path, and the default value is yyyy.MM.dd. The commonly used time formats are listed below:

| Symbol | Description |
|--------|-------------|
| y | Year |
| M | Month |
| d | Day of month |
| H | Hour in day (0-23) |
| m | Minute in hour |
| s | Second in minute |

file_format_type [string]

We support the following file types:

text csv parquet orc json excel xml binary

Please note that the final file name will end with the suffix of the file_format_type; the suffix of a text file is txt.

field_delimiter [string]

The separator between columns in a row of data. Only needed by the text file format.

row_delimiter [string]

The separator between rows in a file. Only needed by the text file format.

have_partition [boolean]

Whether you need to process partitions.

partition_by [array]

Only used when have_partition is true.

Partition the data based on the selected fields.

partition_dir_expression [string]

Only used when have_partition is true.

If partition_by is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.

The default partition_dir_expression is ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/. k0 is the first partition field and v0 is the value of the first partition field.

is_partition_field_write_in_file [boolean]

Only used when have_partition is true.

If is_partition_field_write_in_file is true, the partition field and its value will be written into the data file.

For example, if you want to write a Hive data file, its value should be false.

sink_columns [array]

Which columns need to be written to the file; the default value is all of the columns obtained from the Transform or Source. The order of the fields determines the order in which the file is actually written.

is_enable_transaction [boolean]

If is_enable_transaction is true, we will ensure that data is not lost or duplicated when it is written to the target directory.

Please note that, if is_enable_transaction is true, we will auto add ${transactionId}_ to the head of the file name.

Only true is supported for now.

batch_size [int]

The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined by batch_size and checkpoint.interval jointly. If the value of checkpoint.interval is large enough, the sink writer keeps writing rows into a file until the rows in the file exceed batch_size. If checkpoint.interval is small, the sink writer creates a new file when a new checkpoint is triggered.
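
A sketch of how batch_size interacts with the engine checkpoint interval; the env block and values below are illustrative, assuming a SeaTunnel Engine job that sets checkpoint.interval:

env {
  # a new file is also rolled whenever a checkpoint fires
  checkpoint.interval = 60000
}

sink {
  LocalFile {
    path = "/tmp/seatunnel/text"
    file_format_type = "text"
    # roll to a new file after 500000 rows, unless a checkpoint fires first
    batch_size = 500000
  }
}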

compress_codec [string]

The compress codec of the file; the supported codecs are listed below:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc: lzo snappy lz4 zlib none
  • parquet: lzo snappy lz4 gzip brotli zstd none

Tip: the excel type does not support any compression format.
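
For example, a sketch of a parquet sink using one of the codecs listed above (path is illustrative):

LocalFile {
  path = "/tmp/seatunnel/parquet"
  file_format_type = "parquet"
  # zstd is one of the codecs supported for parquet
  compress_codec = "zstd"
}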

Common options

Common parameters of the Sink plugin; please refer to Sink Common Options for details.

max_rows_in_memory [int]

When the file format is Excel, the maximum number of data items that can be cached in memory.

sheet_name [string]

The sheet name of the workbook.

csv_string_quote_mode [string]

When the file format is CSV, the string quote mode of CSV.

  • ALL: All string fields will be quoted.
  • MINIMAL: Quotes fields which contain special characters such as the field delimiter, the quote character, or any of the characters in the line separator string.
  • NONE: Never quotes fields. When a delimiter occurs in the data, the output prefixes it with an escape character. If no escape character is set, format validation throws an exception.
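
A sketch of a CSV sink that quotes every string field (path is illustrative):

LocalFile {
  path = "/tmp/seatunnel/csv"
  file_format_type = "csv"
  # quote all string fields instead of only those containing special characters
  csv_string_quote_mode = "ALL"
}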

xml_root_tag [string]

Specifies the tag name of the root element within the XML file.

xml_row_tag [string]

Specifies the tag name of the data rows within the XML file.

xml_use_attr_format [boolean]

Specifies whether to process data using the tag attribute format.
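
A sketch of an XML sink using the three options above (path and tag names are illustrative):

LocalFile {
  path = "/tmp/seatunnel/xml"
  file_format_type = "xml"
  xml_root_tag = "USERS"
  xml_row_tag = "USER"
  # write each field as an attribute of the row tag instead of a child element
  xml_use_attr_format = true
}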

parquet_avro_write_timestamp_as_int96 [boolean]

Support writing Parquet INT96 from a timestamp; only valid for parquet files.

parquet_avro_write_fixed_as_int96 [array]

Support writing Parquet INT96 from a 12-byte field; only valid for parquet files.
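
A sketch of a parquet sink that writes INT96; the column name in parquet_avro_write_fixed_as_int96 is hypothetical:

LocalFile {
  path = "/tmp/seatunnel/parquet"
  file_format_type = "parquet"
  # write timestamp columns as INT96
  parquet_avro_write_timestamp_as_int96 = true
  # also write the listed 12-byte fixed field as INT96; "raw_ts" is a hypothetical column name
  parquet_avro_write_fixed_as_int96 = ["raw_ts"]
}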

enable_header_write [boolean]

Only used when file_format_type is text or csv. false: don't write the header; true: write the header.
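
A sketch of a CSV sink that writes a header row (path is illustrative):

LocalFile {
  path = "/tmp/seatunnel/csv"
  file_format_type = "csv"
  # write the column names as the first line of each output file
  enable_header_write = true
}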

encoding [string]

Only used when file_format_type is json, text, csv, or xml. The encoding of the file to write. This parameter is parsed by Charset.forName(encoding).

Examples

A simple config for the orc file format:

LocalFile {
  path = "/tmp/hive/warehouse/test2"
  file_format_type = "orc"
}

For the json, text, csv or xml file format with encoding:

LocalFile {
  path = "/tmp/hive/warehouse/test2"
  file_format_type = "text"
  encoding = "gbk"
}

For the parquet file format with sink_columns:

LocalFile {
  path = "/tmp/hive/warehouse/test2"
  file_format_type = "parquet"
  sink_columns = ["name","age"]
}

For the text file format with have_partition, custom_filename and sink_columns:

LocalFile {
  path = "/tmp/hive/warehouse/test2"
  file_format_type = "text"
  field_delimiter = "\t"
  row_delimiter = "\n"
  have_partition = true
  partition_by = ["age"]
  partition_dir_expression = "${k0}=${v0}"
  is_partition_field_write_in_file = true
  custom_filename = true
  file_name_expression = "${transactionId}_${now}"
  filename_time_format = "yyyy.MM.dd"
  sink_columns = ["name","age"]
  is_enable_transaction = true
}

For the excel file format with sheet_name and max_rows_in_memory:

LocalFile {
  path = "/tmp/seatunnel/excel"
  sheet_name = "Sheet1"
  max_rows_in_memory = 1024
  partition_dir_expression = "${k0}=${v0}"
  is_partition_field_write_in_file = true
  file_name_expression = "${transactionId}_${now}"
  file_format_type = "excel"
  filename_time_format = "yyyy.MM.dd"
  is_enable_transaction = true
}

To extract source metadata from upstream, you can use ${database_name}, ${table_name} or ${schema_name} in the path:

LocalFile {
  path = "/tmp/hive/warehouse/${table_name}"
  file_format_type = "parquet"
  sink_columns = ["name","age"]
}

Changelog

| Change | Commit | Version |
|--------|--------|---------|
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13a | 2.3.10 |
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85 | 2.3.10 |
| [Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6ee | 2.3.10 |
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db7891 | 2.3.10 |
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed5 | 2.3.10 |
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df4 | 2.3.9 |
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d66 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03 | 2.3.9 |
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a1 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446 | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122 | 2.3.6 |
| [feature][connector-file-local] add save mode function for localfile (#7080) | https://github.com/apache/seatunnel/commit/7b2f53831 | 2.3.6 |
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c49 | 2.3.6 |
| [Chore] Fix file spell errors (#6606) | https://github.com/apache/seatunnel/commit/2599d3b73 | 2.3.5 |
| [Feature][Connectors-V2][File]support assign encoding for file source/sink (#6489) | https://github.com/apache/seatunnel/commit/d159fbe08 | 2.3.5 |
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9 | 2.3.5 |
| [Feature][OssFile Connector] Make Oss implement source factory and sink factory (#6062) | https://github.com/apache/seatunnel/commit/1a8e9b455 | 2.3.4 |
| Add multiple table file sink to base (#6049) | https://github.com/apache/seatunnel/commit/085e0e5fc | 2.3.4 |
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b | 2.3.4 |
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1a | 2.3.4 |
| [Feature] LocalFile sink support multiple table (#5931) | https://github.com/apache/seatunnel/commit/0fdf45f94 | 2.3.4 |
| [Feature] LocalFileSource support multiple table | https://github.com/apache/seatunnel/commit/72be6663a | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b | 2.3.4 |
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc | 2.3.4 |
| [Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e5 | 2.3.4 |
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768 | 2.3.4 |
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59e | 2.3.3 |
| [Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d | 2.3.2 |
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3 | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee191 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b58303 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13 | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd60105 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab16656 | 2.3.1 |
| [Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1 | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb8 | 2.3.1 |
| [Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd7607766 | 2.3.1 |
| [Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8 | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a11 | 2.3.0 |
| [Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263 | 2.3.0 |
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e863 | 2.3.0 |
| [Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c35388 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f2 | 2.3.0 |
| [Improve][Connector-V2][File] Support parse field from file path (#2985) | https://github.com/apache/seatunnel/commit/0bc12085c | 2.3.0-beta |
| [Improve][connector][file] Support user-defined schema for reading text file (#2976) | https://github.com/apache/seatunnel/commit/1c05ee0d7 | 2.3.0-beta |
| [Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394 | 2.3.0-beta |
| [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e | 2.2.0-beta |
| [Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9 | 2.2.0-beta |
| [Bug][Connector-V2] Fix error option (#2775) | https://github.com/apache/seatunnel/commit/488e561ee | 2.2.0-beta |
| [Improve][Connector-V2] Refactor local file sink connector code structure (#2655) | https://github.com/apache/seatunnel/commit/6befd599a | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69 | 2.2.0-beta |
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a2738 | 2.2.0-beta |
| [Feature][Connector-V2] Local file json support (#2465) | https://github.com/apache/seatunnel/commit/65a92f249 | 2.2.0-beta |
| [Feature][Connector-V2] Add local file connector source (#2419) | https://github.com/apache/seatunnel/commit/eff595c45 | 2.2.0-beta |
| [Improve][Connector-V2] Refactor the package of local file connector (#2403) | https://github.com/apache/seatunnel/commit/a538daed5 | 2.2.0-beta |
| [Feature][Connector-V2] Add json file sink & json format (#2385) | https://github.com/apache/seatunnel/commit/dd68c06b0 | 2.2.0-beta |
| [Imporve][Connector-V2] Remove redundant type judge logic because of pr #2315 (#2370) | https://github.com/apache/seatunnel/commit/42e8c25e5 | 2.2.0-beta |
| [Feature][Connector-V2] Support orc file format in file connector (#2369) | https://github.com/apache/seatunnel/commit/f44fe1e03 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3 | 2.2.0-beta |
| [Connector-V2] Add parquet writer in file connector (#2273) | https://github.com/apache/seatunnel/commit/c95cc72cf | 2.2.0-beta |
| [checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f36 | 2.2.0-beta |
| [Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee73 | 2.2.0-beta |
| [Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64 | 2.2.0-beta |