跳到主要内容
版本:Next

CosFile

CosFile source 连接器

支持引擎

Spark
Flink
SeaTunnel Zeta

关键特性

在pollNext调用中读取拆分的所有数据。读取的拆分内容将保存在快照中。

描述

从阿里云Cos文件系统读取数据。

:::提示

如果你使用spark/flink,为了使用这个连接器,你必须确保你的spark/flilk集群已经集成了hadoop。测试的hadoop版本是2.x

如果你使用SeaTunnel Engine,当你下载并安装SeaTunnel引擎时,它会自动集成hadoop jar。您可以在${SEATUNNEL_HOME}/lib下检查jar包以确认这一点.

要使用此连接器,您需要将hadoop-cos-{hadoop.version}-{version}.jar和cos_api-bundle-{version}.jar位于${SEATUNNEL_HOME}/lib目录中,下载:Hadoop-Cos-release. 它只支持hadoop 2.6.5+和8.0.2版本+.

:::

选项

名称类型必需默认值
pathstring-
file_format_typestring-
bucketstring-
secret_idstring-
secret_keystring-
regionstring-
read_columnslist-
delimiter/field_delimiterstring\001
parse_partition_from_pathbooleantrue
skip_header_row_numberlong0
date_formatstringyyyy-MM-dd
datetime_formatstringyyyy-MM-dd HH:mm:ss
time_formatstringHH:mm:ss
schemaconfig-
sheet_namestring-
xml_row_tagstring-
xml_use_attr_formatboolean-
file_filter_patternstring
compress_codecstringnone
archive_compress_codecstringnone
encodingstringUTF-8
common-options-

path [string]

源文件路径。

file_format_type [string]

文件类型,支持以下文件类型:

text csv parquet orc json excel xml binary

如果您将文件类型设置为“json”,您还应该分配模式选项,告诉连接器如何将数据解析到所需的行。

例如:

上游数据如下:


{"code": 200, "data": "get success", "success": true}

您还可以将多条数据保存在一个文件中,并按换行符拆分它们:


{"code": 200, "data": "get success", "success": true}
{"code": 300, "data": "get failed", "success": false}

您应该按如下方式设置schema架构:


schema {
fields {
code = int
data = string
success = boolean
}
}

连接器将按如下方式生成数据:

codedatasuccess
200get successtrue

如果您将文件类型指定为“parquet” “orc”,则不需要模式选项,连接器可以自动找到上游数据的模式。

如果将文件类型指定为“text” “csv”,则可以选择是否指定schema架构信息。

例如,上游数据如下:


tyrantlucifer#26#male

如果不指定数据schema模式,连接器将按如下方式处理上游数据:

content
tyrantlucifer#26#male

如果指定数据模式,除了CSV文件类型外,还应指定“field_delimiter”选项

您应该按如下方式分配模式和分隔符:


field_delimiter = "#"
schema {
fields {
name = string
age = int
gender = string
}
}

连接器将按如下方式生成数据:

nameagegender
tyrantlucifer26male

如果将文件类型指定为“二进制”,SeaTunnel可以同步任何格式的文件, 例如压缩包、图片等。简而言之,任何文件都可以同步到目标位置。 根据此要求,您需要确保源端和目标端使用“二进制”格式进行文件同步同时。您可以在下面的示例中找到具体用法。

bucket [string]

Cos文件系统的bucket地址,例如: cos://tyrantlucifer-image-bed

secret_id [string]

Cos文件系统的秘密id。

secret_key [string]

Cos文件系统的密钥。

region [string]

cos文件系统的region。

read_columns [list]

读取数据源的列的列表,用户可以使用它来实现字段映射。

delimiter/field_delimiter [string]

delimiter 参数在2.3.5版本后将弃用,请改用field_delimiter

仅当file_format为文本时才需要配置。

字段分隔符,用于告诉连接器如何对字段进行切片和切块

默认值“\001”,与配置单元的默认分隔符相同

parse_partition_from_path [boolean]

控制是否从文件路径解析分区键和值

例如,如果从路径读取文件cosn://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26

文件中的每个记录数据都将添加这两个字段:

nameage
tyrantlucifer26

提示:不要在schema选项中定义分区字段

skip_header_row_number [long]

跳过前几行,但仅限于txt和csv。

例如,设置如下:

skip_header_row_number = 2

那么SeaTunnel将跳过源文件的前两行

date_format [string]

日期类型格式,用于告诉连接器如何将字符串转换为日期,支持以下格式:

yyyy-MM-dd yyyy.MM.dd yyyy/MM/dd

default yyyy-MM-dd

datetime_format [string]

Datetime类型格式,用于告诉连接器如何将字符串转换为日期时间,支持以下格式:

yyyy-MM-dd HH:mm:ss yyyy.MM.dd HH:mm:ss yyyy/MM/dd HH:mm:ss yyyyMMddHHmmss

default yyyy-MM-dd HH:mm:ss

time_format [string]

时间类型格式,用于告诉连接器如何将字符串转换为时间,支持以下格式:

HH:mm:ss HH:mm:ss.SSS

default HH:mm:ss

schema [config]

仅当file_format_type为文本、json、excel、xml或csv(或我们无法从元数据中读取模式的其他格式)时才需要配置。

fields [Config]

上游数据的模式。

sheet_name [string]

仅当file_format为excel时才需要配置。

阅读工作簿的纸张。

xml_row_tag [string]

仅当file_format为xml时才需要配置。

指定XML文件中数据行的标记名称。

xml_use_attr_format [boolean]

仅当file_format为xml时才需要配置。 指定是否使用标记属性格式处理数据。

file_filter_pattern [string]

过滤模式,用于过滤文件。

该模式遵循标准正则表达式。详情请参阅https://en.wikipedia.org/wiki/Regular_expression. 有一些例子。

文件结构示例:

/data/seatunnel/20241001/report.txt
/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv
/data/seatunnel/20241012/logo.png

匹配规则示例:

示例1匹配所有.txt文件,正则表达式:

/data/seatunnel/20241001/.*\.txt

此示例匹配的结果为:

/data/seatunnel/20241001/report.txt

示例2:匹配所有以abc开头的文件,正则表达式:

/data/seatunnel/20241002/abc.*

此示例匹配的结果为:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv

示例3匹配所有以abc开头的文件,第四个字符是h或g,正则表达式:

/data/seatunnel/20241007/abc[h,g].*

此示例匹配的结果为:

/data/seatunnel/20241007/abch202410.csv

示例4:匹配以202410开头的三级文件夹和以.csv结尾的文件,正则表达式:

/data/seatunnel/202410\d*/.*\.csv

此示例匹配的结果为:

/data/seatunnel/20241007/abch202410.csv
/data/seatunnel/20241002/abcg202410.csv
/data/seatunnel/20241005/old_data.csv

compress_codec [string]

文件的压缩编解码器和支持的详细信息如下所示:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc/parquet:
    自动识别压缩类型,无需额外设置。

archive_compress_codec [string]

归档文件的压缩编解码器和支持的详细信息如下所示:

archive_compress_codecfile_formatarchive_compress_suffix
ZIPtxt,json,excel,xml.zip
TARtxt,json,excel,xml.tar
TAR_GZtxt,json,excel,xml.tar.gz
GZtxt,json,excel,xml.gz
NONEall.*

注意:gz压缩的excel文件需要压缩原始文件或指定文件后缀,如e2e.xls->e2e_test.xls.gz

encoding [string]

仅当file_format_type为json、text、csv、xml时使用。 要读取的文件的编码。此参数将由Charset.forName(encoding)解析。

common options

源插件常用参数,详见[源端通用选项](../Source-common-Options.md)。

例如


CosFile {
path = "/seatunnel/orc"
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
file_format_type = "orc"
}


CosFile {
path = "/seatunnel/json"
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
file_format_type = "json"
schema {
fields {
id = int
name = string
}
}
}

传输二进制文件


env {
parallelism = 1
job.mode = "BATCH"
}

source {
CosFile {
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
path = "/seatunnel/read/binary/"
file_format_type = "binary"
}
}
sink {
// 您可以将本地文件传输到s3/hdfs/oss等。
CosFile {
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
path = "/seatunnel/read/binary2/"
file_format_type = "binary"
}
}

Filter File

env {
parallelism = 1
job.mode = "BATCH"
}

source {
CosFile {
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
path = "/seatunnel/read/binary/"
file_format_type = "binary"
// file example abcD2024.csv
file_filter_pattern = "abc[DX]*.*"
}
}

sink {
Console {
}
}

Changelog

下一个版本

-添加文件cos源连接器(4979)