跳到主要内容
版本:Next

MongoDB

MongoDB Sink Connector

Support Those Engines

Spark
Flink
SeaTunnel Zeta

Key features

Tips

1.If you want to use CDC-written features, recommend enable the upsert-enable configuration.

Description

The MongoDB Connector provides the ability to read and write data from and to MongoDB. This document describes how to set up the MongoDB connector to run data writers against MongoDB.

Supported DataSource Info

In order to use the Mongodb connector, the following dependencies are required. They can be downloaded via install-plugin.sh or from the Maven central repository.

DatasourceSupported VersionsDependency
MongoDBuniversalDownload

Data Type Mapping

The following table lists the field data type mapping from MongoDB BSON type to Seatunnel data type.

Seatunnel Data TypeMongoDB BSON Type
STRINGObjectId
STRINGString
BOOLEANBoolean
BINARYBinary
INTEGERInt32
TINYINTInt32
SMALLINTInt32
BIGINTInt64
DOUBLEDouble
FLOATDouble
DECIMALDecimal128
DateDate
TimestampTimestamp[Date]
ROWObject
ARRAYArray

Tips

1.When using SeaTunnel to write Date and Timestamp types to MongoDB, both will produce a Date data type in MongoDB, but the precision will be different. The data generated by the SeaTunnel Date type has second-level precision, while the data generated by the SeaTunnel Timestamp type has millisecond-level precision.
2.When using the DECIMAL type in SeaTunnel, be aware that the maximum range cannot exceed 34 digits, which means you should use decimal(34, 18).

Sink Options

NameTypeRequiredDefaultDescription
uriStringYes-The MongoDB standard connection uri. eg. mongodb://user:password@hosts:27017/database?readPreference=secondary&slaveOk=true.
databaseStringYes-The name of MongoDB database to read or write.
collectionStringYes-The name of MongoDB collection to read or write.
buffer-flush.max-rowsStringNo1000Specifies the maximum number of buffered rows per batch request.
buffer-flush.intervalStringNo30000Specifies the maximum interval of buffered rows per batch request, the unit is millisecond.
retry.maxStringNo3Specifies the max number of retry if writing records to database failed.
retry.intervalDurationNo1000Specifies the retry time interval if writing records to database failed, the unit is millisecond.
upsert-enableBooleanNofalseWhether to write documents via upsert mode.
primary-keyListNo-The primary keys for upsert/update. Keys are in ["id","name",...] format for properties.
transactionBooleanNofalseWhether to use transactions in MongoSink (requires MongoDB 4.2+).
common-optionsNo-Source plugin common parameters, please refer to Source Common Options for details
data_save_modeStringNoAPPEND_DATAThe data saving mode of mongodb,Option introduction,DROP_DATA:The collection will be cleared before inserting data;APPEND_DATA:Append data ;ERROR_WHEN_DATA_EXISTS:An error will be reported if there is data in the collection.

Tips

1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: buffer-flush.max-rows, buffer-flush.interval, and checkpoint.interval.
Data flushing will be triggered if any of these conditions are met.
2.Compatible with the historical parameter upsert-key. If upsert-key is set, please do not set primary-key.

How to Create a MongoDB Data Synchronization Jobs

The following example demonstrates how to create a data synchronization job that writes randomly generated data to a MongoDB database:

# Set the basic configuration of the task to be performed
env {
parallelism = 1
job.mode = "BATCH"
checkpoint.interval = 1000
}

source {
FakeSource {
row.num = 2
bigint.min = 0
bigint.max = 10000000
split.num = 1
split.read-interval = 300
schema {
fields {
c_bigint = bigint
}
}
}
}

sink {
MongoDB{
uri = mongodb://user:password@127.0.0.1:27017
database = "test"
collection = "test"
}
}

Parameter Interpretation

MongoDB Database Connection URI Examples

Unauthenticated single node connection:

mongodb://127.0.0.0:27017/mydb

Replica set connection:

mongodb://127.0.0.0:27017/mydb?replicaSet=xxx

Authenticated replica set connection:

mongodb://admin:password@127.0.0.0:27017/mydb?replicaSet=xxx&authSource=admin

Multi-node replica set connection:

mongodb://127.0.0..1:27017,127.0.0..2:27017,127.0.0.3:27017/mydb?replicaSet=xxx

Sharded cluster connection:

mongodb://127.0.0.0:27017/mydb

Multiple mongos connections:

mongodb://192.168.0.1:27017,192.168.0.2:27017,192.168.0.3:27017/mydb

Note: The username and password in the URI must be URL-encoded before being concatenated into the connection string.

Buffer Flush

sink {
MongoDB {
uri = "mongodb://user:password@127.0.0.1:27017"
database = "test_db"
collection = "users"
buffer-flush.max-rows = 2000
buffer-flush.interval = 1000
}
}

Although MongoDB has fully supported multi-document transactions since version 4.2, it doesn't mean that everyone should use them recklessly. Transactions are equivalent to locks, node coordination, additional overhead, and performance impact. Instead, the principle for using transactions should be: avoid using them if possible. The necessity for using transactions can be greatly avoided by designing systems rationally.

Idempotent Writes

By specifying a clear primary key and using the upsert method, exactly-once write semantics can be achieved.

If primary-key and upsert-enable is defined in the configuration, the MongoDB sink will use upsert semantics instead of regular INSERT statements. We combine the primary keys declared in upsert-key as the MongoDB reserved primary key and use upsert mode for writing to ensure idempotent writes. In the event of a failure, Seatunnel jobs will recover from the last successful checkpoint and reprocess, which may result in duplicate message processing during recovery. It is highly recommended to use upsert mode, as it helps to avoid violating database primary key constraints and generating duplicate data if records need to be reprocessed.

sink {
MongoDB {
uri = "mongodb://user:password@127.0.0.1:27017"
database = "test_db"
collection = "users"
upsert-enable = true
primary-key = ["name","status"]
}
}

Changelog

Change Log
ChangeCommitVersion
[Improve][API] Optimize the enumerator API semantics and reduce lock calls at the connector level (#9671)https://github.com/apache/seatunnel/commit/9212a771402.3.12
[fix][connector-mango] fix split with avgSize zero error (#9255)https://github.com/apache/seatunnel/commit/564863b9332.3.11
[Feature][Checkpoint] Add check script for source/sink state class serialVersionUID missing (#9118)https://github.com/apache/seatunnel/commit/4f5adeb1c72.3.11
[Fix][MongoDB] The Long type cannot handle string values in scientific notation (#8783)https://github.com/apache/seatunnel/commit/00f550e3d02.3.11
[Improve] sink mongodb schema is not required (#8887)https://github.com/apache/seatunnel/commit/3cfe8c12b92.3.10
[Improve] restruct connector common options (#8634)https://github.com/apache/seatunnel/commit/f3499a6eeb2.3.10
[Fix][Connector-Mongodb] close MongodbClient when close MongodbReader (#8592)https://github.com/apache/seatunnel/commit/06b2fc0e062.3.10
[Improve][dist]add shade check rule (#8136)https://github.com/apache/seatunnel/commit/51ef8000162.3.9
[Bug][connectors-v2] fix mongodb bson convert exception (#8044)https://github.com/apache/seatunnel/commit/b222c13f2f2.3.9
[Hotfix][Connector-v2] Fix the ClassCastException for connector-mongodb (#7586)https://github.com/apache/seatunnel/commit/dc43370e8c2.3.8
[Improve][Test][Connector-V2][MongoDB] Add few test cases for BsonToRowDataConverters (#7579)https://github.com/apache/seatunnel/commit/a797041e5d2.3.8
[Improve][Connector-V2][MongoDB] A BsonInt32 will be convert to a long type (#7567)https://github.com/apache/seatunnel/commit/adf26c20c52.3.8
[Improve][Connector-V2][MongoDB] Support to convert to double from any numeric type (#6997)https://github.com/apache/seatunnel/commit/c5159a27602.3.6
[bugfix][connector-mongodb] fix mongodb null value write (#6967)https://github.com/apache/seatunnel/commit/c5ecda50f82.3.6
[Improve][MongoDB] Implement TableSourceFactory to create mongodb source (#5813)https://github.com/apache/seatunnel/commit/59cccb60972.3.4
[Improve][Common] Introduce new error define rule (#5793)https://github.com/apache/seatunnel/commit/9d1b2582b22.3.4
[Improve] Remove use SeaTunnelSink::getConsumedType method and mark it as deprecated (#5755)https://github.com/apache/seatunnel/commit/8de74081002.3.4
[bugfix][mongodb] Fixed unsupported exception caused by bsonNull (#5659)https://github.com/apache/seatunnel/commit/cab864aa4d2.3.4
Support config column/primaryKey/constraintKey in schema (#5564)https://github.com/apache/seatunnel/commit/eac76b4e502.3.4
[Hotfix] Fix com.google.common.base.Preconditions to seatunnel shade one (#5284)https://github.com/apache/seatunnel/commit/ed5eadcf732.3.3
[Improve][Connector-v2][Mongodb]sink support transaction update/writing (#5034)https://github.com/apache/seatunnel/commit/b1203c905e2.3.3
[Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (#4997)https://github.com/apache/seatunnel/commit/31db35bee72.3.3
[Improve][Connector-v2][Mongodb]Optimize reading logic (#5001)https://github.com/apache/seatunnel/commit/830196d8b72.3.3
[Hotfix][Connector-V2][Mongodb] Fix document error content and remove redundant code (#4982)https://github.com/apache/seatunnel/commit/526197af672.3.3
[Feature][connector-v2][mongodb] mongodb support cdc sink (#4833)https://github.com/apache/seatunnel/commit/cb651cd7f32.3.3
[Feature][Connector-v2][Mongodb]Refactor mongodb connector (#4620)https://github.com/apache/seatunnel/commit/5b1a843e402.3.2
Merge branch 'dev' into merge/cdchttps://github.com/apache/seatunnel/commit/4324ee19122.3.1
[Improve][Project] Code format with spotless plugin.https://github.com/apache/seatunnel/commit/423b5830382.3.1
[improve][api] Refactoring schema parse (#4157)https://github.com/apache/seatunnel/commit/b2f573a13e2.3.1
[Improve][build] Give the maven module a human readable name (#4114)https://github.com/apache/seatunnel/commit/d7cd6010512.3.1
[Improve][Project] Code format with spotless plugin. (#4101)https://github.com/apache/seatunnel/commit/a2ab1665612.3.1
[Feature][Connector] add get source method to all source connector (#3846)https://github.com/apache/seatunnel/commit/417178fb842.3.1
[Feature][API & Connector & Doc] add parallelism and column projection interface (#3829)https://github.com/apache/seatunnel/commit/b9164b8ba12.3.1
[Improve] mongodb connector v2 add source query capability (#3697)https://github.com/apache/seatunnel/commit/8a7fe6fcb62.3.1
[Hotfix][OptionRule] Fix option rule about all connectors (#3592)https://github.com/apache/seatunnel/commit/226dc6a1192.3.0
[Improve][Connector-V2][MongoDB] Unified exception for MongoDB source & sink connector (#3522)https://github.com/apache/seatunnel/commit/5af632e32b2.3.0
[Feature][Connector V2] expose configurable options in MongoDB (#3347)https://github.com/apache/seatunnel/commit/ffd5778efc2.3.0
[Improve][all] change Log to @Slf4j (#3001)https://github.com/apache/seatunnel/commit/6016100f122.3.0-beta
[Improve][Connector-V2] Improve mongodb connector (#2778)https://github.com/apache/seatunnel/commit/efbf793fa52.2.0-beta
[DEV][Api] Replace SeaTunnelContext with JobContext and remove singleton pattern (#2706)https://github.com/apache/seatunnel/commit/cbf82f755c2.2.0-beta
[Feature][Connector-V2] Add mongodb connecter sink (#2694)https://github.com/apache/seatunnel/commit/51c28a33872.2.0-beta
[Feature][Connector-V2] Add mongodb connecter source (#2596)https://github.com/apache/seatunnel/commit/3ee8a8a6192.2.0-beta