跳到主要内容
版本:Next

FtpFile

Ftp 文件 Source 连接器

支持的引擎

Spark
Flink
SeaTunnel Zeta

关键特性

描述

从 FTP 文件服务器读取数据。

:::提示

如果您使用 Spark/Flink,为了使用此连接器,您必须确保您的 Spark/Flink 集群已经集成了 Hadoop。测试的 Hadoop 版本为 2.x。 如果您使用 SeaTunnel Engine,当您下载并安装 SeaTunnel Engine 时,它会自动集成 Hadoop 的 jar 包。您可以在 ${SEATUNNEL_HOME}/lib 目录下检查 jar 包以确认这一点。

:::

配置项

名称类型是否必填默认值
hoststring-
portint-
userstring-
passwordstring-
pathstring-
file_format_typestring-
connection_modestringactive_local
delimiter/field_delimiterstring\001
read_columnslist-
parse_partition_from_pathbooleantrue
date_formatstringyyyy-MM-dd
datetime_formatstringyyyy-MM-dd HH:mm:ss
time_formatstringHH:mm:ss
skip_header_row_numberlong0
schemaconfig-
sheet_namestring-
xml_row_tagstring-
xml_use_attr_formatboolean-
file_filter_patternstring-
compress_codecstringnone
archive_compress_codecstringnone
encodingstringUTF-8
null_formatstring-
common-options-

host [string]

目标 FTP 主机地址,必填项。

port [int]

目标 FTP 端口,必填项。

user [string]

目标 FTP 用户名,必填项。

password [string]

目标 FTP 密码,必填项。

path [string]

源文件路径。

file_filter_pattern [string]

文件过滤模式,用于过滤文件。

该模式遵循标准正则表达式。详情请参考:https://en.wikipedia.org/wiki/Regular_expression. 以下是一些示例。

文件结构示例:

/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png

匹配规则示例:

示例 1匹配所有 .txt 文件,正则表达式:

/data/seatunnel/20241001/.*\.txt

该示例匹配结果为:

/data/seatunnel/20241001/report.txt

示例 2匹配所有以 abc 开头的文件,正则表达式:

/data/seatunnel/20241002/abc.*

该示例匹配结果为:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv

示例 3匹配所有以 abc 开头的文件,且第四个字符为 h 或 g,正则表达式:

/data/seatunnel/20241007/abc[h,g].*

该示例匹配结果为:

/data/seatunnel/20241007/abch202410.csv

示例 4匹配第三级文件夹以 202410 开头且文件以 .csv 结尾的文件,正则表达式:

/data/seatunnel/202410\d*/.*\.csv

该示例匹配结果为:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv

file_format_type [string]

文件类型,支持以下文件类型:

text csv parquet orc json excel xml binary

如果您将文件类型指定为 json,您还需要指定 schema 选项以告诉连接器如何将数据解析为您所需的行。

例如:

上游数据如下:


{"code": 200, "data": "get success", "success": true}

您应按如下方式指定 schema:


schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将生成如下数据:

codedatasuccess
200get successtrue

如果您将文件类型指定为 textcsv,您可以选择是否指定 schema 信息。

例如,上游数据如下:


tyrantlucifer#26#male

如果您不指定数据 schema,连接器将按如下方式处理上游数据:

content
tyrantlucifer#26#male

如果您指定数据 schema,您还需要指定 field_delimiter 选项(CSV 文件类型除外)。

您应按如下方式指定 schema 和分隔符:


field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

连接器将生成如下数据:

nameagegender
tyrantlucifer26male

如果您将文件类型指定为 binary,SeaTunnel 可以同步任何格式的文件, 例如压缩包、图片等。简而言之,任何文件都可以同步到目标位置。 在这种情况下,您需要确保源和接收端同时使用 binary 格式进行文件同步。 您可以在下面的示例中找到具体用法。

connection_mode [string]

目标 FTP 连接模式,默认为主动模式,支持以下模式:

active_local passive_local

delimiter/field_delimiter [string]

delimiter 参数将在 2.3.5 版本后弃用,请使用 field_delimiter 代替。

仅在文件格式为 text 时需要配置。

字段分隔符,用于告诉连接器如何切分字段。

默认值为 \001,与 Hive 的默认分隔符相同。

parse_partition_from_path [boolean]

控制是否从文件路径中解析分区键和值。

例如,如果您从路径 ftp://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26 读取文件,

文件中的每条记录数据将添加以下两个字段:

nameage
tyrantlucifer26

提示:不要在 schema 选项中定义分区字段

date_format [string]

日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:

yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd

默认值为 yyyy-MM-dd

datetime_format [string]

日期时间类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:

yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss

默认值为 yyyy-MM-dd HH:mm:ss

time_format [string]

时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:

HH:mm:ss HH:mm:ss.SSS

默认值为 HH:mm:ss

skip_header_row_number [long]

跳过前几行,仅适用于 txt 和 csv 文件。

例如,设置如下:

skip_header_row_number = 2

SeaTunnel 将从源文件中跳过前 2 行。

schema [config]

仅在文件格式类型为 text、json、excel、xml 或 csv(或其他无法从元数据中读取 schema 的格式)时需要配置。

上游数据的 schema 信息。

read_columns [list]

数据源的读取列列表,用户可以使用它来实现字段投影。

sheet_name [string]

读取工作簿中的工作表,仅在文件格式类型为 excel 时使用。

xml_row_tag [string]

仅在文件格式为 xml 时需要配置。

指定 XML 文件中数据行的标签名称。

xml_use_attr_format [boolean]

仅在文件格式为 xml 时需要配置。

指定是否使用标签属性格式处理数据。

compress_codec [string]

文件的压缩编解码器,支持的详细信息如下:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc/parquet:
    自动识别压缩类型,无需额外设置。

archive_compress_codec [string]

归档文件的压缩编解码器,支持的详细信息如下:

archive_compress_codec文件格式归档压缩后缀
ZIPtxt,json,excel,xml.zip
TARtxt,json,excel,xml.tar
TAR_GZtxt,json,excel,xml.tar.gz
GZtxt,json,excel,xml.gz
NONEall.*

注意:gz 压缩的 excel 文件需要压缩原始文件或指定文件后缀,例如 e2e.xls ->e2e_test.xls.gz

encoding [string]

仅在文件格式类型为 json、text、csv、xml 时使用。 读取文件的编码。此参数将通过 Charset.forName(encoding) 解析。

null_format [string]

仅在文件格式类型为 text 时使用。 用于定义哪些字符串可以表示为 null。

例如:\N

通用选项

源插件的通用参数,详情请参考 源通用选项

示例


FtpFile {
path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
file_format_type = "text"
schema = {
name = string
age = int
}
field_delimiter = "#"
}

多表配置


FtpFile {
tables_configs = [
{
schema {
table = "student"
}
path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
file_format_type = "parquet"
},
{
schema {
table = "teacher"
}
path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
file_format_type = "parquet"
}
]
}


FtpFile {
tables_configs = [
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/student"
file_format_type = "json"
},
{
schema {
fields {
name = string
age = int
}
}
path = "/apps/hive/demo/teacher"
file_format_type = "json"
}
}

传输二进制文件


env {
parallelism = 1
job.mode = "BATCH"
}

source {
FtpFile {
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
path = "/seatunnel/read/binary/"
file_format_type = "binary"
}
}
sink {
// 您可以将本地文件传输到 s3/hdfs/oss 等。
FtpFile {
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
path = "/seatunnel/read/binary2/"
file_format_type = "binary"
}
}

过滤文件

env {
parallelism = 1
job.mode = "BATCH"
}

source {
FtpFile {
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
path = "/seatunnel/read/binary/"
file_format_type = "binary"
// 文件示例 abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
}
}

sink {
Console {
}
}

变更日志

Change Log
ChangeCommitVersion
[Feature][Connector-V2] Add filename_extension parameter for read/write file (#8769)https://github.com/apache/seatunnel/commit/78b23c0ef5dev
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eebdev
[Feature][Connector-V2] Support create emtpy file when no data (#8543)https://github.com/apache/seatunnel/commit/275db78918dev
[Feature][Connector-V2] Support single file mode in file sink (#8518)https://github.com/apache/seatunnel/commit/e893deed50dev
[Improve][Connector-V2] Add some debug log when create dir in (S)FTP (#8286)https://github.com/apache/seatunnel/commit/8687bb8e912.3.9
[Feature][File] Support config null format for text file read (#8109)https://github.com/apache/seatunnel/commit/2dbf02df472.3.9
[Fix][Connector-V2][FTP] Fix FTP connector connection_mode is not effective (#7865)https://github.com/apache/seatunnel/commit/26c528a5ed2.3.9
[Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786)https://github.com/apache/seatunnel/commit/6b7c53d03c2.3.9
[Feature][Connector-V2]Ftp file source support multiple table (#7795)https://github.com/apache/seatunnel/commit/22fe27a3d62.3.9
[Improve][Connector-V2] Support read archive compress file (#7633)https://github.com/apache/seatunnel/commit/3f98cd8a162.3.8
[Feature][Connector-V2] Ftp file sink suport multiple table and save mode (#7665)https://github.com/apache/seatunnel/commit/4f812e12ae2.3.8
[Improve][Files] Support write fixed/timestamp as int96 of parquet (#6971)https://github.com/apache/seatunnel/commit/1a48a9c4932.3.6
[Feature][Connector-V2] Supports the transfer of any file (#6826)https://github.com/apache/seatunnel/commit/c1401787b32.3.6
Add support for XML file type to various file connectors such as SFTP, FTP, LocalFile, HdfsFile, and more. (#6327)https://github.com/apache/seatunnel/commit/ec533ecd9a2.3.5
[Feature][Connectors-v2-file-ftp] FTP source/sink add ftp connection mode (#6077) (#6099)https://github.com/apache/seatunnel/commit/f6bcc4d59d2.3.4
[Refactor][File Connector] Put Multiple Table File API to File Base Module (#6033)https://github.com/apache/seatunnel/commit/c324d663b42.3.4
Support using multiple hadoop account (#5903)https://github.com/apache/seatunnel/commit/d69d88d1aa2.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Improve][connector-file] unifiy option between file source/sink and update document (#5680)https://github.com/apache/seatunnel/commit/8d87cf8fc42.3.4
[Feature] Support LZO compress on File Read (#5083)https://github.com/apache/seatunnel/commit/a4a19010962.3.4
[Feature][Connector-V2][File] Support read empty directory (#5591)https://github.com/apache/seatunnel/commit/1f58f224a02.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Feature][File Connector]optionrule FILE_FORMAT_TYPE is text/csv ,add parameter BaseSinkConfig.ENABLE_HEADER_WRITE: #5566 (#5567)https://github.com/apache/seatunnel/commit/0e02db768d2.3.4
[Feature][Connector V2][File] Add config of 'file_filter_pattern', which used for filtering files. (#5153)https://github.com/apache/seatunnel/commit/a3c13e59eb2.3.3
[Feature][ConnectorV2]add file excel sink and source (#4164)https://github.com/apache/seatunnel/commit/e3b97ae5d22.3.2
Change file type to file_format_type in file source/sink (#4249)https://github.com/apache/seatunnel/commit/973a2fae3c2.3.1
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Feature][Connector-V2][File] Support compress (#3899)https://github.com/apache/seatunnel/commit/55602f6b1c2.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Improve][Connector-V2][File] Improve file connector option rule and document (#3812)https://github.com/apache/seatunnel/commit/bd760776692.3.1
[Feature][Shade] Add seatunnel hadoop3 uber (#3755)https://github.com/apache/seatunnel/commit/5a024bdf8f2.3.0
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Improve][Connector-V2][File] Unified excetion for file source & sink connectors (#3525)https://github.com/apache/seatunnel/commit/031e8e263c2.3.0
[Feature][Connector-V2][File] Add option and factory for file connectors (#3375)https://github.com/apache/seatunnel/commit/db286e86312.3.0
[Improve][Connector-V2][File] Improve code structure (#3238)https://github.com/apache/seatunnel/commit/dd5c3538812.3.0
[Connector-V2][ElasticSearch] Add ElasticSearch Source/Sink Factory (#3325)https://github.com/apache/seatunnel/commit/38254e3f262.3.0
[Core][Improve] Fix some sonar check error (#3240)https://github.com/apache/seatunnel/commit/8664bb53a52.3.0
[Improve][Connector-V2][File] Support parse field from file path (#2985)https://github.com/apache/seatunnel/commit/0bc12085c22.3.0-beta
[Improve][connector][file] Support user-defined schema for reading text file (#2976)https://github.com/apache/seatunnel/commit/1c05ee0d7e2.3.0-beta
[Improve][Connector] Improve write parquet (#2943)https://github.com/apache/seatunnel/commit/8fd966394b2.3.0-beta
[Fix][Connector-V2] Fix HiveSource Connector read orc table error (#2845)https://github.com/apache/seatunnel/commit/61720306e72.2.0-beta
[Improve][Connector-V2] Improve read parquet (#2841)https://github.com/apache/seatunnel/commit/e19bc82f9b2.2.0-beta
[Imporve][Connector-V2] Refactor ftp sink & Add ftp file source (#2774)https://github.com/apache/seatunnel/commit/4aacbcdd1f2.2.0-beta
[Feature][File connector] Support ftp file sink (#2483)https://github.com/apache/seatunnel/commit/a87e5de80a2.2.0-beta