Milvus
Milvus sink connector
Description
The Milvus sink connector writes data to Milvus or Zilliz Cloud. It has the following features:
- supports reading and writing data by partition
- supports writing dynamic schema data from metadata columns
- converts JSON data to a JSON string and writes it to Milvus as a JSON field
- retries automatically to work around rate limits and gRPC limits
Key Features
Data Type Mapping
| Milvus Data Type | SeaTunnel Data Type |
|---|---|
| INT8 | TINYINT |
| INT16 | SMALLINT |
| INT32 | INT |
| INT64 | BIGINT |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| BOOL | BOOLEAN |
| JSON | STRING |
| ARRAY | ARRAY |
| VARCHAR | STRING |
| FLOAT_VECTOR | FLOAT_VECTOR |
| BINARY_VECTOR | BINARY_VECTOR |
| FLOAT16_VECTOR | FLOAT16_VECTOR |
| BFLOAT16_VECTOR | BFLOAT16_VECTOR |
| SPARSE_FLOAT_VECTOR | SPARSE_FLOAT_VECTOR |
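To illustrate the mapping above, the following source fragment declares columns with SeaTunnel types. When the rows are written through this sink, each column would be created with the Milvus type listed next to it in the table. This is a sketch under assumptions: the `FakeSource` schema layout, its `vector.dimension` option, the `table` value and the column names are illustrative and should be checked against the FakeSource documentation for your SeaTunnel version.

```hocon
source {
  FakeSource {
    row.num = 100
    # Assumed FakeSource option controlling the length of generated float vectors
    vector.dimension = 4
    schema = {
      # Hypothetical table; the table name is assumed to become the Milvus collection name
      table = "default.user_vectors"
      fields {
        id = bigint               # -> INT64
        score = float             # -> FLOAT
        is_active = boolean       # -> BOOL
        name = string             # -> VARCHAR (assumed; STRING also maps from JSON)
        embedding = float_vector  # -> FLOAT_VECTOR
      }
      # Milvus collections need a primary key field
      primaryKey {
        name = "pk_id"
        columnNames = [id]
      }
    }
  }
}
```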
Sink Options
| Name | Type | Required | Default | Description |
|---|---|---|---|---|
| url | String | Yes | - | The URL used to connect to Milvus or Zilliz Cloud. |
| token | String | Yes | - | Authentication token, in the form `username:password`. |
| database | String | No | - | Target database to write to. Defaults to the source database. |
| schema_save_mode | Enum | No | CREATE_SCHEMA_WHEN_NOT_EXIST | Schema save mode. By default, the collection is created automatically if it does not exist. |
| enable_auto_id | Boolean | No | false | Enable auto-generated IDs (autoID) for the primary key column. |
| enable_upsert | Boolean | No | false | Upsert data instead of inserting it. |
| enable_dynamic_field | Boolean | No | true | Create the collection with the dynamic field enabled. |
| batch_size | Int | No | 1000 | Number of rows per write batch. |
| partition_key | String | No | - | Name of the field to use as the Milvus partition key. |
| create_index | Boolean | No | false | Automatically create vector indexes for the collection to improve query performance. |
| load_collection | Boolean | No | false | Load the collection into Milvus memory so that it is immediately available for queries. |
| collection_description | Map<String, String> | No | {} | Map of collection descriptions, where the key is the collection name and the value is its description. |
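As a sketch of how several of these options combine, the configuration below writes into a named database, upserts by primary key instead of inserting, and routes rows by a partition key field. The database name `analytics` and the field name `category` are hypothetical, and `partition_key` is assumed to reference a field that exists in the incoming rows.

```hocon
sink {
  Milvus {
    url = "http://127.0.0.1:19530"
    token = "username:password"
    # Hypothetical target database; omit it to write to the source database
    database = "analytics"
    # The default mode, written out explicitly: create the collection if it is missing
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    # Insert or update rows by primary key instead of plain inserts
    enable_upsert = true
    # Keep the primary key values supplied by the source
    enable_auto_id = false
    enable_dynamic_field = true
    # Hypothetical field used as the Milvus partition key
    partition_key = "category"
    batch_size = 500
  }
}
```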
Task Example
Basic Configuration
```hocon
sink {
  Milvus {
    url = "http://127.0.0.1:19530"
    token = "username:password"
    batch_size = 1000
  }
}
```
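Because the connector can also write to Zilliz Cloud, a basic configuration for it might look like the following. The endpoint and key are placeholders, and passing a Zilliz Cloud API key as `token` (instead of `username:password`) is an assumption to verify against your Zilliz Cloud console.

```hocon
sink {
  Milvus {
    # Placeholder: replace with your Zilliz Cloud cluster endpoint
    url = "https://<your-cluster-endpoint>"
    # Assumed: Zilliz Cloud accepts an API key as the token
    token = "<your-api-key>"
    batch_size = 1000
  }
}
```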
Advanced Configuration with Index and Loading
```hocon
sink {
  Milvus {
    url = "http://127.0.0.1:19530"
    token = "username:password"
    batch_size = 1000
    create_index = true
    load_collection = true
    collection_description = {
      "user_vectors" = "User embedding vectors for recommendation"
      "product_vectors" = "Product feature vectors for search"
    }
  }
}
```
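The sink fragments above only cover the `sink` block. A complete batch job that copies a collection from one Milvus instance to another might look like the sketch below. The source host, database names and collection name are hypothetical, and the Milvus source options shown (in particular `collection`) are assumptions to check against the Milvus source connector documentation.

```hocon
env {
  parallelism = 1
  job.mode = "BATCH"
}

source {
  Milvus {
    # Hypothetical source instance
    url = "http://source-milvus:19530"
    token = "username:password"
    database = "default"
    # Assumed source option selecting the collection to read
    collection = "user_vectors"
  }
}

sink {
  Milvus {
    url = "http://127.0.0.1:19530"
    token = "username:password"
    # Hypothetical target database
    database = "backup"
    batch_size = 1000
    # Build vector indexes and load the collection so it can be queried right away
    create_index = true
    load_collection = true
  }
}
```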
Changelog
| Change | Commit | Version |
|---|---|---|
| [Feature][Transform-V2] Support vector series sql function (#9765) | https://github.com/apache/seatunnel/commit/a40114cf7a | 2.3.12 |
| [Improve][Connector-milvus]update milvus-sdk-java to 2.5.11 (#9710) | https://github.com/apache/seatunnel/commit/08ebbaa8bd | 2.3.12 |
| [Chore] fix typos filed -> field (#9757) | https://github.com/apache/seatunnel/commit/e3e1c67d29 | 2.3.12 |
| [Improve][Connector-V2] Optimize Milvus doc and e2e test case (#9766) | https://github.com/apache/seatunnel/commit/e67466f73e | 2.3.12 |
| [Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671) | https://github.com/apache/seatunnel/commit/9212a77140 | 2.3.12 |
| [Improve][API] Add metadata schema into catalog table (#9586) | https://github.com/apache/seatunnel/commit/385814e7f1 | 2.3.12 |
| [Feature][Transform] Support define sink column type (#9114) | https://github.com/apache/seatunnel/commit/ab7119e507 | 2.3.11 |
| [Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118) | https://github.com/apache/seatunnel/commit/4f5adeb1c7 | 2.3.11 |
| [improve] milvus options (#9165) | https://github.com/apache/seatunnel/commit/5247e17640 | 2.3.11 |
| [Fix][Connector-V2] Fix load state check in MilvusSourceReader to consider partition-level status (#8937) | https://github.com/apache/seatunnel/commit/bde235090b | 2.3.10 |
| [Improve][dist]add shade check rule (#8136) | https://github.com/apache/seatunnel/commit/51ef800016 | 2.3.9 |
| [Improve][Core] Refactor common options of column/row (#7911) | https://github.com/apache/seatunnel/commit/d1582afee6 | 2.3.9 |
| [Feature][connector-milvus] update milvus connector to support dynamic schema, failed retry, etc. (#7885) | https://github.com/apache/seatunnel/commit/6a31f91729 | 2.3.9 |
| [Feature][Restapi] Allow metrics information to be associated to logical plan nodes (#7786) | https://github.com/apache/seatunnel/commit/6b7c53d03c | 2.3.9 |
| [Fix][Connector-V2] Fix known directory create and delete ignore issues (#7700) | https://github.com/apache/seatunnel/commit/e2fb679577 | 2.3.8 |
| [Improve][Connector-V2] Optimize milvus code (#7691) | https://github.com/apache/seatunnel/commit/1eddb8e1b1 | 2.3.8 |
| [Improve][Connector-V2] Optimize milvus-connector config code (#7658) | https://github.com/apache/seatunnel/commit/f831f7a5ec | 2.3.8 |
| [Improve][Connector-V2] update vectorType (#7446) | https://github.com/apache/seatunnel/commit/1bba72385b | 2.3.8 |
| [Improve][API] Move catalog open to SaveModeHandler (#7439) | https://github.com/apache/seatunnel/commit/8c2c5c79a1 | 2.3.8 |
| [Feature][Connector-V2] Fake Source support produce vector data (#7401) | https://github.com/apache/seatunnel/commit/6937d10ac3 | 2.3.8 |
| [Feature][Connector-V2][Milvus] Support Milvus source & sink (#7158) | https://github.com/apache/seatunnel/commit/0c69b9166e | 2.3.6 |