跳到主要内容
版本:Next

CosFile

Cos 文件接收器连接器

描述

将数据输出到cos文件系统.

:::提示

如果你使用spark/flink,为了使用这个连接器,你必须确保你的spark/flilk集群已经集成了hadoop。测试的hadoop版本是2.x

如果你使用SeaTunnel Engine,当你下载并安装SeaTunnel引擎时,它会自动集成hadoop jar。您可以在${SEATUNNEL_HOME}/lib下检查jar包以确认这一点.

要使用此连接器,您需要将hadoop cos-{hadoop.version}-{version}.jar和cos_api-bundle-{version}.jar位于${SEATUNNEL_HOME}/lib目录中,下载:Hoop cos发布. 它只支持hadoop 2.6.5+和8.0.2版本+.

:::

关键特性

默认情况下,我们使用2PC commit来确保 精确一次

  • 文件格式类型
    • text
    • csv
    • parquet
    • orc
    • json
    • excel
    • xml
    • binary

选项

名称类型必需默认值描述
pathstring-
tmp_pathstring/tmp/seatunnel结果文件将首先写入tmp路径,然后使用“mv”将tmp目录提交到目标目录。需要一个COS目录.
bucketstring-
secret_idstring-
secret_keystring-
regionstring-
custom_filenamebooleanfalse是否需要自定义文件名
file_name_expressionstring"${transactionId}"仅在custom_filename为true时使用
filename_time_formatstring"yyyy.MM.dd"仅在custom_filename为true时使用
file_format_typestring"csv"
field_delimiterstring'\001'仅在file_format为text时使用
row_delimiterstring"\n"仅在file_format为text时使用
have_partitionbooleanfalse是否需要处理分区.
partition_byarray-只有在have_partition为true时才使用
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"只有在have_partition为true时才使用
is_partition_field_write_in_filebooleanfalse只有在have_partition为true时才使用
sink_columnsarray当此参数为空时,所有字段都是接收列
is_enable_transactionbooleantrue
batch_sizeint1000000
compress_codecstringnone
common-optionsobject-
max_rows_in_memoryint-仅在file_format为excel时使用.
sheet_namestringSheet${Random number}仅在file_format为excel时使用.
csv_string_quote_modeenumMINIMAL仅在file_format为csv时使用.
xml_root_tagstringRECORDS仅在file_format为xml时使用.
xml_row_tagstringRECORD仅在file_format为xml时使用.
xml_use_attr_formatboolean-仅在file_format为xml时使用.
single_file_modebooleanfalse每个并行处理只会输出一个文件。启用此参数后,batch_size将不会生效。输出文件名没有文件块后缀.
create_empty_file_when_no_databooleanfalse当上游没有数据同步时,仍然会生成相应的数据文件.
parquet_avro_write_timestamp_as_int96booleanfalse仅在file_format为parquet时使用.
parquet_avro_write_fixed_as_int96array-仅在file_format为parquet时使用.
encodingstring"UTF-8"仅当file_format_type为json、text、csv、xml时使用.

path [string]

目标目录路径是必需的.

bucket [string]

cos文件系统的bucket地址,例如:cosn://seatunnel-test-1259587829

secret_id [string]

cos文件系统的密钥id.

secret_key [string]

cos文件系统的密钥.

region [string]

cos文件系统的分区.

custom_filename [boolean]

是否自定义文件名

file_name_expression [string]

仅在 custom_filenametrue时使用

file_name_expression描述了将在path中创建的文件表达式。我们可以在file_name_expression中添加变量${now}${uuid},类似于test_${uuid}_${now}${now}表示当前时间,其格式可以通过指定选项filename_time_format来定义.

请注意,如果is_enable_transactiontrue,我们将自动添加${transactionId}_在文件的开头

filename_time_format [string]

仅在 custom_filenametrue 时使用`

file_name_expression 参数中的格式为 xxxx-${now} 时,filename_time_format 可以指定路径的时间格式,默认值为 yyyy.MM.dd。常用的时间格式如下:

符号描述
y
M
d
H时 (0-23)
m
s

file_format_type [string]

我们支持以下文件类型:

text csv parquet orc json excel xml binary

请注意,最终文件名将以 file_format 的后缀结尾, 文本文件的后缀为 txt.

field_delimiter [string]

数据行中列之间的分隔符. 仅需要 text 文件格式.

row_delimiter [string]

文件中行之间的分隔符. 只需要 text 文件格式.

have_partition [boolean]

是否需要处理分区.

partition_by [array]

仅在 have_partitiontrue 时使用.

基于选定字段对数据进行分区.

partition_dir_expression [string]

仅在 have_partitiontrue 时使用.

如果指定了 partition_by ,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。 默认的 partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/. k0 是第一个分区字段 , v0 是第一个划分字段的值.

is_partition_field_write_in_file [boolean]

仅在 have_partitiontrue 时使用.

如果 is_partition_field_write_in_filetrue, 分区字段及其值将写入数据文件.

例如,如果你想写一个Hive数据文件,它的值应该是 false.

sink_columns [array]

哪些列需要写入文件,默认值是从 TransformSource 获取的所有列. 字段的顺序决定了文件实际写入的顺序.

is_enable_transaction [boolean]

如果 is_enable_transactiontrue, 我们将确保数据在写入目标目录时不会丢失或重复.

请注意,如果 is_enable_transactiontrue, 我们将自动添加 ${transactionId}_ 在文件的开头.

现在只支持 true .

batch_size [int]

文件中的最大行数。对于SeaTunnel引擎,文件中的行数由 batch_sizecheckpoint.interval 共同决定. 如果 checkpoint.interval 的值足够大, 接收器写入程序将在文件中写入行,直到文件中的行大于 batch_size. 如果 checkpoint.interval 较小, 则接收器写入程序将在新的检查点触发时创建一个新文件.

compress_codec [string]

文件的压缩编解码器和支持的详细信息如下所示:

  • txt: lzo none
  • json: lzo none
  • csv: lzo none
  • orc: lzo snappy lz4 zlib none
  • parquet: lzo snappy lz4 gzip brotli zstd none

Tips: excel 类型不支持任何压缩格式

common options

接收器写入插件常用参数,请参考 Sink Common Options 了解详细信息.

max_rows_in_memory [int]

当文件格式为Excel时,内存中可以缓存的最大数据项数.

sheet_name [string]

编写工作簿的工作表

csv_string_quote_mode [string]

当文件格式为CSV时,CSV的字符串引用模式.

  • ALL: 所有字符串字段都将被引用.
  • MINIMAL: 引号字段包含特殊字符,如字段分隔符、引号字符或行分隔符字符串中的任何字符.
  • NONE: 从不引用字段。当分隔符出现在数据中时,打印机会用转义符作为前缀。如果未设置转义符,格式验证将抛出异常.

xml_root_tag [string]

指定XML文件中根元素的标记名.

xml_row_tag [string]

指定XML文件中数据行的标记名称.

xml_use_attr_format [boolean]

指定是否使用标记属性格式处理数据.

parquet_avro_write_timestamp_as_int96 [boolean]

支持从时间戳写入Parquet INT96,仅适用于拼花地板文件.

parquet_avro_write_fixed_as_int96 [array]

支持从12字节字段写入Parquet INT96,仅适用于拼花地板文件.

encoding [string]

仅当file_format_type为json、text、csv、xml时使用. 要写入的文件的编码。此参数将由Charset.forName(encoding) 解析.

示例

对于具有 have_partitioncustom_filenamesink_columns 的文本文件格式


CosFile {
path="/sink"
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
sink_columns = ["name","age"]
is_enable_transaction = true
}

适用于带有have_partitionsink_columns的parquet 文件格式`


CosFile {
path="/sink"
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
have_partition = true
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_format_type = "parquet"
sink_columns = ["name","age"]
}

对于orc文件格式的简单配置


CosFile {
path="/sink"
bucket = "cosn://seatunnel-test-1259587829"
secret_id = "xxxxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxxxx"
region = "ap-chengdu"
file_format_type = "orc"
}

变更日志

下一个版本

  • 添加文件cos接收器连接器 (4979)