MongoDB & MongoDB CDC
Background Information
MongoDB is a document-oriented database for unstructured data that simplifies application development and scaling. The following table describes the capabilities supported by the MongoDB connector.
Features
A MongoDB Change Data Capture (CDC) source table is a streaming source table for MongoDB databases. The MongoDB connector that is used for a MongoDB CDC source table is referred to as the MongoDB CDC connector. The connector first reads the full historical data from a MongoDB database and then reads the operations log data, which keeps the data accurate. If a failure occurs, exactly-once semantics ensure that the data remains accurate. The connector uses the Change Stream API to efficiently capture document changes in MongoDB databases and collections: it monitors document insertion, modification, replacement, and deletion events and converts these events into changelog streams that Flink can process. The MongoDB CDC connector provides the following features:
- Efficiently monitors document changes by using the Change Stream API, which is available in MongoDB 3.6 and later.
- Ensures exactly-once semantics even if a deployment fails at any stage.
- Supports full and incremental data monitoring. After the snapshot reading phase is complete, fully managed Flink automatically switches to the incremental reading phase.
- Supports parallel reading in the initial snapshot phase. Only MongoDB 4.0 or later supports this feature.
- Supports the following startup modes:
  - initial: When the MongoDB CDC connector starts for the first time, it performs an initial snapshot of the monitored database table and then continues to read the latest operations log data.
  - latest-offset: When the MongoDB CDC connector starts for the first time, it does not perform a snapshot of the monitored database table. The connector reads only from the end of the operations log data, so it captures only the changes that are made after it starts.
  - timestamp: The MongoDB CDC connector skips the snapshot reading phase and reads operations log events starting from a specific timestamp. Only MongoDB 4.0 or later supports this mode.
- Supports full changelog event streams. Only MongoDB 6.0 or later supports this feature.
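The startup modes listed above are selected in the WITH clause of the source table. The following is a minimal sketch rather than a definitive configuration. It assumes that the scan.startup.mode parameter accepts the mode names listed above and that the start time is passed in a parameter named scan.startup.timestamp-millis, as in the open source Flink CDC connector. The table name, host, database, collection, and timestamp value are placeholders.

```sql
-- Sketch: skip the snapshot phase and read change events from a fixed point in time.
-- Requires MongoDB 4.0 or later, as stated for the timestamp mode.
CREATE TEMPORARY TABLE mongo_source_from_ts (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- One of 'initial', 'latest-offset', or 'timestamp'.
  'scan.startup.mode' = 'timestamp',
  -- Assumed parameter name; the value is a placeholder in epoch milliseconds.
  'scan.startup.timestamp-millis' = '1700000000000'
);
```

With 'initial' or 'latest-offset', the timestamp parameter would presumably be omitted.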
Prerequisites
- MongoDB CDC source table
  - The MongoDB CDC connector can read data from self-managed MongoDB databases.
  - The replica set feature is enabled for the MongoDB database that you want to monitor. This ensures that you can use the basic features of the MongoDB CDC connector. For more information, see Replication.
  - The preimage and postimage features are enabled for the MongoDB database if you want to use the full changelog event stream feature. For more information, see Document Preimages.
  - If the authentication feature of MongoDB is enabled, you must use a MongoDB user that has the following database permissions:
    - splitVector
    - listDatabases
    - listCollections
    - collStats
    - find
    - changeStream
    - Permissions to access the config.collections and config.chunks collections
- MongoDB dimension table and result table
  - A MongoDB database and table are created.
  - An IP address whitelist is configured to access MongoDB.
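For the full changelog prerequisite of the CDC source table, the source table itself presumably also has to opt in to full changelog generation. The following is a hedged sketch that assumes the switch is named scan.full-changelog, as in the open source Flink CDC connector, and that the preimage and postimage features are already enabled on the monitored collection. Verify the exact parameter name against the source table parameters. All table, host, database, and collection names are placeholders.

```sql
-- Sketch: request full changelog events (update-before and update-after rows).
-- Requires MongoDB 6.0 or later with preimages and postimages enabled.
CREATE TEMPORARY TABLE mongo_full_changelog_source (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll',
  -- Assumed parameter name, taken from the open source Flink CDC connector.
  'scan.full-changelog' = 'true'
);
```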
Limits
- MongoDB CDC source table
  - Only the VERA engine compatible with Flink 1.17 or later supports the MongoDB CDC connector.
  - For a MongoDB database whose version is earlier than 4.0, you cannot set the scan.startup.mode parameter to timestamp.
  - For a MongoDB database whose version is earlier than 6.0, full changelog event streams cannot be generated.
- MongoDB dimension table
  - Only the VERA engine compatible with Flink 1.17 or later supports MongoDB dimension tables.
- MongoDB result table
  - If no primary key is defined in the DDL statement that is used to create a result table, data can only be inserted into the result table but cannot be updated in or deleted from the result table.
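The primary key limit for result tables can be illustrated with two declarations. This is only a sketch with placeholder table, host, database, and collection names: the first result table declares _id as the primary key, so updates and deletions can be applied to it, whereas the second omits the primary key and therefore accepts only inserts.

```sql
-- Result table with a primary key: rows can be inserted, updated, and deleted.
CREATE TEMPORARY TABLE mongo_upsert_sink (
  `_id` STRING,
  name STRING,
  PRIMARY KEY (_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'upsertcoll'
);

-- Result table without a primary key: rows can only be inserted.
CREATE TEMPORARY TABLE mongo_append_sink (
  name STRING,
  weight DECIMAL
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'appendcoll'
);
```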
Syntax
CREATE TABLE tableName(
  _id STRING,
  [columnName dataType,]*
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'localhost:27017',
  'username' = 'mongouser',
  'password' = '${secret_values.password}',
  'database' = 'testdb',
  'collection' = 'testcoll'
)
When you create a MongoDB CDC source table, you must declare the _id STRING column as the unique primary key.
Parameters in the WITH Clause
Common Parameters
Parameters Only for Source Tables
Parameters Only for Dimension Tables
Parameters Only for Result Tables
Data Type Mappings
CDC Source Table
Dimension Table and Result Table
Sample Code
Sample Code for a CDC Source Table
CREATE TEMPORARY TABLE mongo_source (
  `_id` STRING, -- must be declared
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);

CREATE TEMPORARY TABLE productssink (
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price_amount DECIMAL,
  suppliers_name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

-- Flatten nested fields; array indexes in Flink SQL start at 1.
INSERT INTO productssink
SELECT
  name,
  weight,
  tags,
  price.amount,
  suppliers[1].name
FROM
  mongo_source;
Sample Code for a Dimension Table
CREATE TEMPORARY TABLE datagen_source (
  id STRING,
  a INT,
  b BIGINT,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_dim (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection',
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.expire-after-access' = '10min',
  'lookup.partial-cache.expire-after-write' = '10min',
  'lookup.partial-cache.max-rows' = '100'
);

-- The sink columns match the SELECT list of the INSERT statement below.
CREATE TEMPORARY TABLE print_sink (
  id STRING,
  a INT,
  b BIGINT,
  name STRING
) WITH (
  'connector' = 'print',
  'logger' = 'true'
);

-- Look up each generated row in the MongoDB dimension table by _id.
INSERT INTO print_sink
SELECT
  T.id,
  T.a,
  T.b,
  H.name
FROM
  datagen_source AS T
  JOIN mongo_dim FOR SYSTEM_TIME AS OF T.`proctime` AS H
  ON T.id = H._id;
Sample Code for a Result Table
CREATE TEMPORARY TABLE datagen_source (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>
) WITH (
  'connector' = 'datagen'
);

CREATE TEMPORARY TABLE mongo_sink (
  `_id` STRING,
  name STRING,
  weight DECIMAL,
  tags ARRAY<STRING>,
  price ROW<amount DECIMAL, currency STRING>,
  suppliers ARRAY<ROW<name STRING, address STRING>>,
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb',
  'hosts' = 'dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717,dds-bp169b982fc25****.mongodb.rds.aliyuncs.com:3717',
  'username' = 'root',
  'password' = '${secret_values.password}',
  'database' = 'flinktest',
  'collection' = 'flinkcollection'
);

INSERT INTO mongo_sink
SELECT * FROM datagen_source;
MongoDB CDC DataStream API
If you want to call a DataStream API to read or write data, you must use a DataStream connector of the related type to access fully managed Flink. For more information about how to configure a DataStream connector, see Usage of DataStream connectors.
Create a DataStream API program and use MongoDBSource. Sample code:
// The <String> type witness lets the builder accept the String-typed JSON deserializer.
MongoDBSource.<String>builder()
    .hosts("mongo.example.com:27017")
    .username("mongouser")
    .password("mongopasswd")
    .databaseList("testdb")
    .collectionList("testcoll")
    .startupOptions(StartupOptions.initial())
    .deserializer(new JsonDebeziumDeserializationSchema())
    .build();
To enable the incremental snapshot feature when you use the DataStream API, construct the MongoDBSource data source with the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb.source package. If you do not need the incremental snapshot feature, use the MongoDBSource#builder() method in the com.ververica.cdc.connectors.mongodb package.
The following table describes the parameters that you must configure during the construction of the MongoDBSource data source.