LocalFile
本地文件接收器
描述
将数据输出到本地文件。
如果你使用的是 spark/flink,为了使用此连接器,你必须确保你的 spark/flink 集群已集成 hadoop。已测试的 hadoop 版本是 2.x。
如果你使用 SeaTunnel Engine,它会在下载和安装 SeaTunnel Engine 时自动集成 hadoop jar。你可以在 ${SEATUNNEL_HOME}/lib 下检查 jar 包以确认这一点。
主要特性
使用二进制文件格式读取和写入任何格式的文件,例如视频、图片等。简而言之,任何文件都可以同步到目标位置。
默认情况下,我们使用 2PC 提交以确保
精确一次。文件格式类型
- 文本
- csv
- parquet
- orc
- json
- excel
- xml
- 二进制
- canal_json
- debezium_json
- maxwell_json
选项
| 名称 | 类型 | 是否必需 | 默认值 | 描述 |
|---|---|---|---|---|
| path | string | 是 | - | 目标目录路径 |
| tmp_path | string | 否 | /tmp/seatunnel | 结果文件将首先写入临时路径,然后使用 mv 将临时目录提交到目标目录。 |
| custom_filename | boolean | 否 | false | 是否需要自定义文件名 |
| file_name_expression | string | 否 | "${transactionId}" | 仅在 custom_filename 为 true 时使用 |
| filename_time_format | string | 否 | "yyyy.MM.dd" | 仅在 custom_filename 为 true 时使用 |
| file_format_type | string | 否 | "csv" | 文件格式类型 |
| filename_extension | string | 否 | - | 使用自定义的文件扩展名覆盖默认的文件扩展名。 例如:.xml, .json, dat, .customtype |
| field_delimiter | string | 否 | '\001' | 仅在 file_format_type 为 text 时使用 |
| row_delimiter | string | 否 | "\n" | 仅在 file_format_type 为 text、csv、json 时使用 |
| have_partition | boolean | 否 | false | 是否需要处理分区 |
| partition_by | array | 否 | - | 仅在 have_partition 为 true 时使用 |
| partition_dir_expression | string | 否 | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | 仅在 have_partition 为 true 时使用 |
| is_partition_field_write_in_file | boolean | 否 | false | 仅在 have_partition 为 true 时使用 |
| sink_columns | array | 否 | 当此参数为空时,所有字段都是 sink 列 | |
| is_enable_transaction | boolean | 否 | true | 是否启用事务 |
| batch_size | int | 否 | 1000000 | 批量大小 |
| single_file_mode | boolean | 否 | false | 每个并行度只会输出一个文件,当此参数开启时,batch_size就不会生效。输出的文件名没有文件块后缀。 |
| create_empty_file_when_no_data | boolean | 否 | false | 当上游没有数据同步时,依然生成对应的数据文件。 |
| compress_codec | string | 否 | none | 压缩编码 |
| common-options | object | 否 | - | 常见选项 |
| max_rows_in_memory | int | 否 | - | 仅在 file_format_type 为 excel 时使用 |
| sheet_name | string | 否 | Sheet${随机数} | 仅在 file_format_type 为 excel 时使用 |
| csv_string_quote_mode | enum | 否 | MINIMAL | 仅在文件格式为 CSV 时使用。 |
| xml_root_tag | string | 否 | RECORDS | 仅在 file_format 为 xml 时使用 |
| xml_row_tag | string | 否 | RECORD | 仅在 file_format 为 xml 时使用 |
| xml_use_attr_format | boolean | 否 | - | 仅在 file_format 为 xml 时使用 |
| parquet_avro_write_timestamp_as_int96 | boolean | 否 | false | 仅在 file_format 为 parquet 时使用 |
| parquet_avro_write_fixed_as_int96 | array | 否 | - | 仅在 file_format 为 parquet 时使用 |
| enable_header_write | boolean | 否 | false | 仅在 file_format_type 为 text,csv 时使用。 false:不写入表头,true:写入表头。 |
| encoding | string | 否 | "UTF-8" | 仅在 file_format_type 为 json,text,csv,xml 时使用 |
path [string]
目标目录路径是必需的,你可以通过使用 ${database_name}、${table_name} 和 ${schema_name} 将上游的 CatalogTable 注入到路径中。
custom_filename [boolean]
是否自定义文件名
file_name_expression [string]
仅在 custom_filename 为 true 时使用
file_name_expression 描述将创建到 path 中的文件表达式。我们可以在 file_name_expression 中添加变量 ${now} 或 ${uuid},例如 test_${uuid}_${now},${now} 表示当前时间,其格式可以通过指定 filename_time_format 选项来定义。
请注意,如果 is_enable_transaction 为 true,我们将自动在文件名的头部添加 ${transactionId}_。
filename_time_format [string]
仅在 custom_filename 为 true 时使用
当 file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式如下所示:
| 符号 | 描述 |
|---|---|
| y | 年 |
| M | 月 |
| d | 日 |
| H | 小时 (0-23) |
| m | 分钟 |
| s | 秒 |
file_format_type [string]
我们支持以下文件类型:
text csv parquet orc json excel xml binary canal_json debezium_json maxwell_json
请注意,最终的文件名将以 file_format_type 的后缀结尾,文本文件的后缀是 txt。
field_delimiter [string]
数据行中列之间的分隔符。仅在 text 文件格式下需要。
row_delimiter [string]
文件中行之间的分隔符。仅在 text、csv、json 文件格式下需要。
have_partition [boolean]
是否需要处理分区。
partition_by [array]
仅在 have_partition 为 true 时使用。
基于选定字段进行数据分区。
partition_dir_expression [string]
仅在 have_partition 为 true 时使用。
如果指定了 partition_by,我们将基于分区信息生成相应的分区目录,最终文件将放置在分区目录中。
默认的 partition_dir_expression 是 ${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/。k0 是第一个分区字段,v0 是第一个分区字段的值。
is_partition_field_write_in_file [boolean]
仅在 have_partition 为 true 时使用。
如果 is_partition_field_write_in_file 为 true,分区字段及其值将写入数据文件。
例如,如果你想写入一个 Hive 数据文件,其值应该为 false。
sink_columns [array]
需要写入文件的列,默认值为从 Transform 或 Source 获取的所有列。字段的顺序决定了实际写入文件的顺序。
is_enable_transaction [boolean]
如果 is_enable_transaction 为 true,我们将确保数据在写入目标目录时不会丢失或重复。
请注意,如果 is_enable_transaction 为 true,我们将自动在文件名前添加 ${transactionId}_。
目前仅支持 true。
batch_size [int]
文件中的最大行数。对于 SeaTunnel Engine,文件中的行数由 batch_size 和 checkpoint.interval 共同决定。如果 checkpoint.interval 的值足够大,sink writer 将在文件中的行数超过 batch_size 时写入文件。如果 checkpoint.interval 很小,当触发新检查点时,sink writer 将创建一个新文件。
compress_codec [string]
文件的压缩编码,支持的压缩编码如下所示:
- txt:
lzonone - json:
lzonone - csv:
lzonone - orc:
lzosnappylz4zlibnone - parquet:
lzosnappylz4gzipbrotlizstdnone
提示:excel 类型不支持任何压缩格式
常见选项
Sink 插件的常见参数,请参阅 Sink 常见选项 获取详细信息。
max_rows_in_memory [int]
当文件格式为 Excel 时,内存中可以缓存的数据项最大数量。
sheet_name [string]
工作簿的表名。
csv_string_quote_mode [string]
当文件格式为 CSV 时,CSV 的字符串引号模式。
- ALL:所有字符串字段都会加引号。
- MINIMAL:仅为包含特殊字符(如字段分隔符、引号字符或行分隔符字符串中的任何字符)的字段加引号。
- NONE:从不为字段加引号。当数据中包含分隔符时,输出会在前面加上转义字符。如果未设置转义字符,则格式验证会抛出异常。
xml
_root_tag [string]
指定 XML 文件中根元素的标签名。
xml_row_tag [string]
指定 XML 文件中数据行的标签名。
xml_use_attr_format [boolean]
指定是否使用标签属性格式处理数据。
parquet_avro_write_timestamp_as_int96 [boolean]
支持从时间戳写入 Parquet INT96,仅对 parquet 文件有效。
parquet_avro_write_fixed_as_int96 [array]
支持从 12 字节字段写入 Parquet INT96,仅对 parquet 文件有效。
enable_header_write [boolean]
仅在 file_format_type 为 text,csv 时使用。false:不写入表头,true:写入表头。
encoding [string]
仅在 file_format_type 为 json,text,csv,xml 时使用。文件写入的编码。该参数将通过 Charset.forName(encoding) 解析。
示例
对于 orc 文件格式的简单配置
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "orc"
}
对于带有 encoding 的 json、text、csv 或 xml 文件格式
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "text"
encoding = "gbk"
}
对于带有 sink_columns 的 parquet 文件格式
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "parquet"
sink_columns = ["name","age"]
}
对于带有 have_partition、custom_filename 和 sink_columns 的 text 文件格式
LocalFile {
path = "/tmp/hive/warehouse/test2"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
}
对于带有 sheet_name 和 max_rows_in_memory 的 excel 文件格式
LocalFile {
path="/tmp/seatunnel/excel"
sheet_name = "Sheet1"
max_rows_in_memory = 1024
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type="excel"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
}
对于从上游提取源元数据,可以在路径中使用 ${database_name}、${table_name} 和 ${schema_name}。
LocalFile {
path = "/tmp/hive/warehouse/${table_name}"
file_format_type = "parquet"
sink_columns = ["name","age"]
}
变更日志
Change Log
| Change | Commit | Version |
|---|---|---|
| [Improve][Connector-V2] File Source Support filtering files by last modified time. (#9526) | https://github.com/apache/seatunnel/commit/cde4c3d410 | 2.3.12 |
| [Feature][Format] Improve maxwell_json,canal_json,debezium_json format add ts_ms and table (#9701) | https://github.com/apache/seatunnel/commit/fb8444b946 | 2.3.12 |
| [Improve][Connector-V2] Add customizable row delimiter support for text file processing (#9608) | https://github.com/apache/seatunnel/commit/7898e62e01 | 2.3.12 |
| [Feature][Sink] File support new format: maxwell_json,canal_json,debezium_json (#9278) (#9336) | https://github.com/apache/seatunnel/commit/a1bfbb20dd | 2.3.12 |
| [Improve][Connector-V2] Support maxcompute sink writer with timestamp field type (#9234) | https://github.com/apache/seatunnel/commit/a513c495e3 | 2.3.12 |
| [improve] update file connectors config (#9034) | https://github.com/apache/seatunnel/commit/8041d59dc2 | 2.3.11 |
| [Improve] Refactor file enumerator to prevent duplicate put split (#8989) | https://github.com/apache/seatunnel/commit/fdf1beae9c | 2.3.11 |
| [Improve][File] Add row_delimiter options into text file sink (#9017) | https://github.com/apache/seatunnel/commit/92aa855a34 | 2.3.11 |
| Revert " [improve] update localfile connector config" (#9018) | https://github.com/apache/seatunnel/commit/cdc79e13ad | 2.3.10 |
| [improve] update localfile connector config (#8765) | https://github.com/apache/seatunnel/commit/def369a85f | 2.3.10 |
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769) | https://github.com/apache/seatunnel/commit/78b23c0ef5 | 2.3.10 |
| [Improve] restruct connector common options (#8634) | https://github.com/apache/seatunnel/commit/f3499a6eeb | 2.3.10 |
| [Feature][Connector-V2] Support create emtpy file when no data (#8543) | https://github.com/apache/seatunnel/commit/275db78918 | 2.3.10 |
| [Feature][Connector-V2] Support single file mode in file sink (#8518) | https://github.com/apache/seatunnel/commit/e893deed50 | 2.3.10 |
| [Feature][File] Support config null format for text file read (#8109) | https://github.com/apache/seatunnel/commit/2dbf02df47 | 2.3.9 |
| [Improve][API] Unified tables_configs and table_list (#8100) | https://github.com/apache/seatunnel/commit/84c0b8d660 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Improve][Connector-V2] Support read archive compress file (#7633) | https://github.com/apache/seatunnel/commit/3f98cd8a16 | 2.3.8 |
| [Improve][Connector] Add multi-table sink option check (#7360) | https://github.com/apache/seatunnel/commit/2489f6446b | 2.3.7 |
| [Feature][Core] Support using upstream table placeholders in sink options and auto replacement (#7131) | https://github.com/apache/seatunnel/commit/c4ca74122c | 2.3.6 |
| [feature][connector-file-local] add save mode function for localfile (#7080) | https://github.com/apache/seatunnel/commit/7b2f538310 | 2.3.6 |
| [Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971) | https://github.com/apache/seatunnel/commit/1a48a9c493 | 2.3.6 |
[Chore] Fix file spell errors (#6606) | https://github.com/apache/seatunnel/commit/2599d3b736 | 2.3.5 |
| [Feature][Connectors-V2][File]support assign encoding for file source/sink (#6489) | https://github.com/apache/seatunnel/commit/d159fbe086 | 2.3.5 |
| Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327) | https://github.com/apache/seatunnel/commit/ec533ecd9a | 2.3.5 |
| [Feature][OssFile Connector] Make Oss implement source factory and sink factory (#6062) | https://github.com/apache/seatunnel/commit/1a8e9b4554 | 2.3.4 |
| Add multiple table file sink to base (#6049) | https://github.com/apache/seatunnel/commit/085e0e5fc3 | 2.3.4 |
| [Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033) | https://github.com/apache/seatunnel/commit/c324d663b4 | 2.3.4 |
| Support using multiple hadoop account (#5903) | https://github.com/apache/seatunnel/commit/d69d88d1aa | 2.3.4 |
| [Feature] LocalFile sink support multiple table (#5931) | https://github.com/apache/seatunnel/commit/0fdf45f94d | 2.3.4 |
| [Feature] LocalFileSource support multiple table | https://github.com/apache/seatunnel/commit/72be6663ad | 2.3.4 |
| [Improve][Common] Introduce new error define rule (#5793) | https://github.com/apache/seatunnel/commit/9d1b2582b2 | 2.3.4 |
| [Improve][connector-file] unifiy option between file source/sink and update document (#5680) | https://github.com/apache/seatunnel/commit/8d87cf8fc4 | 2.3.4 |
| [Feature][Connector-V2][File] Support read empty directory (#5591) | https://github.com/apache/seatunnel/commit/1f58f224a0 | 2.3.4 |
| Support config column/primaryKey/constraintKey in schema (#5564) | https://github.com/apache/seatunnel/commit/eac76b4e50 | 2.3.4 |
| [Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567) | https://github.com/apache/seatunnel/commit/0e02db768d | 2.3.4 |
| [Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153) | https://github.com/apache/seatunnel/commit/a3c13e59eb | 2.3.3 |
| [Feature][ConnectorV2]add file excel sink and source (#4164) | https://github.com/apache/seatunnel/commit/e3b97ae5d2 | 2.3.2 |
| Change file type to file_format_type in file source/sink (#4249) | https://github.com/apache/seatunnel/commit/973a2fae3c | 2.3.1 |
| Merge branch 'dev' into merge/cdc | https://github.com/apache/seatunnel/commit/4324ee1912 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. | https://github.com/apache/seatunnel/commit/423b583038 | 2.3.1 |
| [improve][api] Refactoring schema parse (#4157) | https://github.com/apache/seatunnel/commit/b2f573a13e | 2.3.1 |
| [Improve][build] Give the maven module a human readable name (#4114) | https://github.com/apache/seatunnel/commit/d7cd601051 | 2.3.1 |
| [Improve][Project] Code format with spotless plugin. (#4101) | https://github.com/apache/seatunnel/commit/a2ab166561 | 2.3.1 |
| [Feature][Connector-V2][File] Support compress (#3899) | https://github.com/apache/seatunnel/commit/55602f6b1c | 2.3.1 |
| [Feature][Connector] add get source method to all source connector (#3846) | https://github.com/apache/seatunnel/commit/417178fb84 | 2.3.1 |
| [Improve][Connector-V2][File] Improve file connector option rule and document (#3812) | https://github.com/apache/seatunnel/commit/bd76077669 | 2.3.1 |
| [Feature][Shade] Add seatunnel hadoop3 uber (#3755) | https://github.com/apache/seatunnel/commit/5a024bdf8f | 2.3.0 |
| [Hotfix][OptionRule] Fix option rule about all connectors (#3592) | https://github.com/apache/seatunnel/commit/226dc6a119 | 2.3.0 |
| [Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525) | https://github.com/apache/seatunnel/commit/031e8e263c | 2.3.0 |
| [Feature][Connector-V2][File] Add option and factory for file connectors (#3375) | https://github.com/apache/seatunnel/commit/db286e8631 | 2.3.0 |
| [Improve][Connector-V2][File] Improve code structure (#3238) | https://github.com/apache/seatunnel/commit/dd5c353881 | 2.3.0 |
| [Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325) | https://github.com/apache/seatunnel/commit/38254e3f26 | 2.3.0 |
| [Improve][Connector-V2][File] Support parse field from file path (#2985) | https://github.com/apache/seatunnel/commit/0bc12085c2 | 2.3.0-beta |
| [Improve][connector][file] Support user-defined schema for reading text file (#2976) | https://github.com/apache/seatunnel/commit/1c05ee0d7e | 2.3.0-beta |
| [Improve][Connector] Improve write parquet (#2943) | https://github.com/apache/seatunnel/commit/8fd966394b | 2.3.0-beta |
| [Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845) | https://github.com/apache/seatunnel/commit/61720306e7 | 2.2.0-beta |
| [Improve][Connector-V2] Improve read parquet (#2841) | https://github.com/apache/seatunnel/commit/e19bc82f9b | 2.2.0-beta |
| [Bug][Connector-V2] Fix error option (#2775) | https://github.com/apache/seatunnel/commit/488e561eef | 2.2.0-beta |
| [Improve][Connector-V2] Refactor local file sink connector code structure (#2655) | https://github.com/apache/seatunnel/commit/6befd599a1 | 2.2.0-beta |
| [#2606]Dependency management split (#2630) | https://github.com/apache/seatunnel/commit/fc047be69b | 2.2.0-beta |
| [chore][connector-common] Rename SeatunnelSchema to SeaTunnelSchema (#2538) | https://github.com/apache/seatunnel/commit/7dc2a27388 | 2.2.0-beta |
| [Feature][Connector-V2] Local file json support (#2465) | https://github.com/apache/seatunnel/commit/65a92f2496 | 2.2.0-beta |
| [Feature][Connector-V2] Add local file connector source (#2419) | https://github.com/apache/seatunnel/commit/eff595c452 | 2.2.0-beta |
| [Improve][Connector-V2] Refactor the package of local file connector (#2403) | https://github.com/apache/seatunnel/commit/a538daed5c | 2.2.0-beta |
| [Feature][Connector-V2] Add json file sink & json format (#2385) | https://github.com/apache/seatunnel/commit/dd68c06b0a | 2.2.0-beta |
| [Imporve][Connector-V2] Remove redundant type judge logic because of pr #2315 (#2370) | https://github.com/apache/seatunnel/commit/42e8c25e50 | 2.2.0-beta |
| [Feature][Connector-V2] Support orc file format in file connector (#2369) | https://github.com/apache/seatunnel/commit/f44fe1e033 | 2.2.0-beta |
| [improve][UT] Upgrade junit to 5.+ (#2305) | https://github.com/apache/seatunnel/commit/362319ff3e | 2.2.0-beta |
| [Connector-V2] Add parquet writer in file connector (#2273) | https://github.com/apache/seatunnel/commit/c95cc72cfa | 2.2.0-beta |
| [checkstyle] Improved validation scope of MagicNumber (#2194) | https://github.com/apache/seatunnel/commit/6d08b5f369 | 2.2.0-beta |
| [Connector-V2] Add Hive sink connector v2 (#2158) | https://github.com/apache/seatunnel/commit/23ad4ee735 | 2.2.0-beta |
| [Connector-V2] Add File Sink Connector (#2117) | https://github.com/apache/seatunnel/commit/e2283da64f | 2.2.0-beta |