跳到主要内容
版本:Next

S3Redshift

S3Redshift的作用是将数据写入S3,然后使用Redshift的COPY命令将数据从S3导入Redshift。

描述

将数据输出到AWS Redshift。

提示:

我们基于[S3File](S3File.md)来实现这个连接器。因此,您可以使用与S3File相同的配置。 为了支持更多的文件类型,我们进行了一些权衡,因此我们使用HDFS协议对S3进行内部访问,而这个连接器需要一些hadoop依赖。 它只支持hadoop版本2.6.5+

主要特性

默认情况下,我们使用2PC commit来确保“精确一次”`

  • 文件格式类型
    • text
    • csv
    • parquet
    • orc
    • json

参数

名称类型是否必填默认值
jdbc_urlstring-
jdbc_userstring-
jdbc_passwordstring-
execute_sqlstring-
pathstring-
bucketstring-
access_keystring-
access_secretstring-
hadoop_s3_propertiesmap-
file_name_expressionstring"${transactionId}"
file_format_typestring"text"
filename_time_formatstring"yyyy.MM.dd"
field_delimiterstring'\001'
row_delimiterstring"\n"
partition_byarray-
partition_dir_expressionstring"${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"
is_partition_field_write_in_filebooleanfalse
sink_columnsarray当此参数为空时,所有字段都是sink列
is_enable_transactionbooleantrue
batch_sizeint1000000
common-options-

jdbc_url

连接到Redshift数据库的JDBC URL。

jdbc_user

连接到Redshift数据库的用户名。

jdbc_password

连接到Redshift数据库的密码。

execute_sql

数据写入S3后要执行的SQL。

示例:


COPY target_table FROM 's3://yourbucket${path}' IAM_ROLE 'arn:XXX' REGION 'your region' format as json 'auto';

target_table是Redshift中的表名。

${path}是写入S3的文件的路径。请确认您的sql包含此变量。并且不需要替换它。我们将在执行sql时替换它。 IAM_ROLE是有权访问S3的角色。 format是写入S3的文件的格式。请确认此格式与您在配置中设置的文件格式相同。

请参阅Redshift COPY了解更多详情。

请确认该角色有权访问S3。

path [string]

目标目录路径是必填项。

bucket [string]

s3文件系统的bucket地址,例如:s3n://seatunnel-test,如果使用s3a协议,则此参数应为s3a://seatunnel-test

access_key [string]

s3文件系统的access_key。如果未设置此参数,请确认凭据提供程序链可以正确进行身份验证,您可以检查这个hadoop-aws

access_secret [string]

s3文件系统的access_secret。如果未设置此参数,请确认凭据提供程序链可以正确进行身份验证,您可以检查这个hadoop-aws

hadoop_s3_properties [map]

如果您需要添加其他选项,可以在此处添加并参考Hadoop-AWS

hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}

file_name_expression [string]

file_name_expression描述了将在path中创建的文件表达式。我们可以在file_name_expression中添加变量${now}${uuid},类似于test_${uuid}_${now}${now}表示当前时间,其格式可以通过指定选项filename_time_format来定义。 请注意,如果is_enable_transactiontrue,我们将自动添加${transactionId}_在文件的开头。

file_format_type [string]

我们支持以下文件类型:

text csv parquet orc json

请注意,最终文件名将以file_format_type的后缀结尾,文本文件的后缀为“txt”。

filename_time_format [string]

file_name_expression参数中的格式为xxxx-${now}时,filename_time_format可以指定路径的时间格式,默认值为yyyy.MM.dd。常用的时间格式如下:

SymbolDescription
yYear
MMonth
dDay of month
HHour in day (0-23)
mMinute in hour
sSecond in minute

请参阅Java SimpleDateFormat了解详细的时间格式语法。

field_delimiter [string]

数据行中列之间的分隔符。仅被“text”和“csv”文件格式需要。

row_delimiter [string]

文件中行之间的分隔符。仅被“text”和“csv”文件格式需要。

partition_by [array]

基于选定字段对数据进行分区

partition_dir_expression [string]

如果指定了partition_by,我们将根据分区信息生成相应的分区目录,并将最终文件放置在分区目录中。

默认的partition_dir_expression${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/k0是第一个分区字段,v0是第一个划分字段的值。

is_partition_field_write_in_file [boolean]

如果is_partition_field_write_in_filetrue,则分区字段及其值将写入数据文件。

例如,如果你想写一个Hive数据文件,它的值应该是“false”。

sink_columns [array]

哪些列需要写入文件,默认值是从“Transform”或“Source”获取的所有列。 字段的顺序决定了文件实际写入的顺序。

is_enable_transaction [boolean]

如果is_enable_transaction为true,我们将确保数据在写入目标目录时不会丢失或重复。 请注意,如果is_enable_transactiontrue,我们将自动添加${transactionId}_在文件的开头。 现在只支持“true”。

batch_size [int]

文件中的最大行数。对于SeaTunnel引擎,文件中的行数由“batch_size”和“checkpoint.interval”共同决定。如果“checkpoint.interval”的值足够大,sink writer将在文件中写入行,直到文件中的行大于“batch_size”。如果“checkpoint.interval”较小,则接收器写入程序将在新的检查点触发时创建一个新文件。

common options

Sink插件常用参数,请参考[Sink Common Options](../sink-common-options.md)了解详细信息。

示例

用于 text 文件格式


S3Redshift {
jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx"
jdbc_user = "xxx"
jdbc_password = "xxxx"
execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' removequotes emptyasnull blanksasnull maxerror 100 delimiter '|' ;"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/text"
row_delimiter="\n"
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type = "text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

用于 parquet 文件格式


S3Redshift {
jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx"
jdbc_user = "xxx"
jdbc_password = "xxxx"
execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' format as PARQUET;"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/parquet"
row_delimiter="\n"
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type = "parquet"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

用于 orc 文件格式


S3Redshift {
jdbc_url = "jdbc:redshift://xxx.amazonaws.com.cn:5439/xxx"
jdbc_user = "xxx"
jdbc_password = "xxxx"
execute_sql="COPY table_name FROM 's3://test${path}' IAM_ROLE 'arn:aws-cn:iam::xxx' REGION 'cn-north-1' format as ORC;"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
tmp_path = "/tmp/seatunnel"
path="/seatunnel/orc"
row_delimiter="\n"
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format_type = "orc"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
"fs.s3a.aws.credentials.provider" = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
}
}

变更日志

2.3.0-beta 2022-10-20