跳到主要内容
版本:2.3.2

Hive

Hive sink connector

Description

Write data to Hive.

提示

In order to use this connector, You must ensure your spark/flink cluster already integrated hive. The tested hive version is 2.3.9.

If you use SeaTunnel Engine, You need put seatunnel-hadoop3-3.1.4-uber.jar and hive-exec-2.3.9.jar in $SEATUNNEL_HOME/lib/ dir.

Key features

By default, we use 2PC commit to ensure exactly-once

  • file format
    • text
    • csv
    • parquet
    • orc
    • json
  • compress codec
    • lzo

Options

nametyperequireddefault value
table_namestringyes-
metastore_uristringyes-
compress_codecstringnonone
hdfs_site_pathstringno-
kerberos_principalstringno-
kerberos_keytab_pathstringno-
common-optionsno-

table_name [string]

Target Hive table name eg: db1.table1

metastore_uri [string]

Hive metastore uri

hdfs_site_path [string]

The path of hdfs-site.xml, used to load ha configuration of namenodes

kerberos_principal [string]

The principal of kerberos

kerberos_keytab_path [string]

The keytab path of kerberos

common options

Sink plugin common parameters, please refer to Sink Common Options for details

Example


Hive {
table_name = "default.seatunnel_orc"
metastore_uri = "thrift://namenode001:9083"
}

example 1

We have a source table like this:

create table test_hive_source(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE,
test_array ARRAY<INT>,
test_map MAP<STRING, FLOAT>,
test_struct STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

We need read data from the source table and write to another table:

create table test_hive_sink_text_simple(
test_tinyint TINYINT,
test_smallint SMALLINT,
test_int INT,
test_bigint BIGINT,
test_boolean BOOLEAN,
test_float FLOAT,
test_double DOUBLE,
test_string STRING,
test_binary BINARY,
test_timestamp TIMESTAMP,
test_decimal DECIMAL(8,2),
test_char CHAR(64),
test_varchar VARCHAR(64),
test_date DATE
)
PARTITIONED BY (test_par1 STRING, test_par2 STRING);

The job config file can like this:

env {
# You can set flink configuration here
parallelism = 3
job.name="test_hive_source_to_hive"
}

source {
Hive {
table_name = "test_hive.test_hive_source"
metastore_uri = "thrift://ctyun7:9083"
}
}

sink {
# choose stdout output plugin to output data to console

Hive {
table_name = "test_hive.test_hive_sink_text_simple"
metastore_uri = "thrift://ctyun7:9083"
}
}

Changelog

2.2.0-beta 2022-09-26

  • Add Hive Sink Connector

2.3.0-beta 2022-10-20

  • [Improve] Hive Sink supports automatic partition repair (3133)

2.3.0 2022-12-30

  • [BugFix] Fixed the following bugs that failed to write data to files (3258)
    • When field from upstream is null it will throw NullPointerException
    • Sink columns mapping failed
    • When restore writer from states getting transaction directly failed

Next version

  • [Improve] Support kerberos authentication (3840)
  • [Improve] Added partition_dir_expression validation logic (3886)