Skip to content

Commit 31db35b

Browse files
[Hotfix][Connector-V2][Mongodb] Compatible with historical parameters (apache#4997)
1 parent f8fefa1 commit 31db35b

File tree

7 files changed

+252
-3
lines changed

7 files changed

+252
-3
lines changed

docs/en/connector-v2/sink/MongoDB.md

+3-2
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ The following table lists the field data type mapping from MongoDB BSON type to
7373
| retry.interval | Duration | No | 1000 | Specifies the retry time interval if writing records to database failed, the unit is millisecond. |
7474
| upsert-enable | Boolean | No | false | Whether to write documents via upsert mode. |
7575
| primary-key | List | No | - | The primary keys for upsert/update. Keys are in `["id","name",...]` format for properties. |
76+
| common-options | | No | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details |
7677

77-
**Tips**
78+
### Tips
7879

79-
> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.
80+
> 1.The data flushing logic of the MongoDB Sink Connector is jointly controlled by three parameters: `buffer-flush.max-rows`, `buffer-flush.interval`, and `checkpoint.interval`.<br/>
8081
> Data flushing will be triggered if any of these conditions are met.<br/>
8182
> 2.Compatible with the historical parameter `upsert-key`. If `upsert-key` is set, please do not set `primary-key`.<br/>
8283

docs/en/connector-v2/source/MongoDB.md

+5
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ For specific types in MongoDB, we use Extended JSON format to map them to SeaTun
7979
| fetch.size | Int | No | 2048 | Set the number of documents obtained from the server for each batch. Setting the appropriate batch size can improve query performance and avoid the memory pressure caused by obtaining a large amount of data at one time. |
8080
| max.time-min | Long | No | 600 | This parameter is a MongoDB query option that limits the maximum execution time for query operations. The value of maxTimeMin is in Minute. If the execution time of the query exceeds the specified time limit, MongoDB will terminate the operation and return an error. |
8181
| flat.sync-string | Boolean | No | true | By utilizing flatSyncString, only one field attribute value can be set, and the field type must be a String. This operation will perform a string mapping on a single MongoDB data entry. |
82+
| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details |
83+
84+
### Tips
85+
86+
> 1.The parameter `match.query` is compatible with the historical old version parameter `matchQuery`, and they are equivalent replacements.<br/>
8287
8388
## How to Create a MongoDB Data Synchronization Job
8489

seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/config/MongodbConfig.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ public class MongodbConfig {
5656
Options.key("match.query")
5757
.stringType()
5858
.noDefaultValue()
59-
.withDescription("Mongodb's query syntax.");
59+
.withDescription("Mongodb's query syntax.")
60+
.withFallbackKeys("matchQuery");
6061

6162
public static final Option<String> PROJECTION =
6263
Options.key("match.projection")

seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/source/MongodbSource.java

+12
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import com.google.auto.service.AutoService;
4747

4848
import java.util.ArrayList;
49+
import java.util.List;
4950

5051
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.CONNECTOR_IDENTITY;
5152

@@ -111,6 +112,17 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
111112
splitStrategyBuilder.setMatchQuery(
112113
BsonDocument.parse(pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
113114
}
115+
116+
List<String> fallbackKeys = MongodbConfig.MATCH_QUERY.getFallbackKeys();
117+
fallbackKeys.forEach(
118+
key -> {
119+
if (pluginConfig.hasPath(key)) {
120+
splitStrategyBuilder.setMatchQuery(
121+
BsonDocument.parse(
122+
pluginConfig.getString(MongodbConfig.MATCH_QUERY.key())));
123+
}
124+
});
125+
114126
if (pluginConfig.hasPath(MongodbConfig.SPLIT_KEY.key())) {
115127
splitStrategyBuilder.setSplitKey(pluginConfig.getString(MongodbConfig.SPLIT_KEY.key()));
116128
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java

+34
Original file line numberDiff line numberDiff line change
@@ -140,4 +140,38 @@ public void testMongodbSourceSplit(TestContainer container)
140140
.collect(Collectors.toList()));
141141
clearDate(MONGODB_SPLIT_RESULT_TABLE);
142142
}
143+
144+
@TestTemplate
145+
public void testCompatibleParameters(TestContainer container)
146+
throws IOException, InterruptedException {
147+
// `upsert-key` compatible test
148+
Container.ExecResult insertResult =
149+
container.executeJob("/updateIT/fake_source_to_updateMode_insert_mongodb.conf");
150+
Assertions.assertEquals(0, insertResult.getExitCode(), insertResult.getStderr());
151+
152+
Container.ExecResult updateResult =
153+
container.executeJob("/compatibleParametersIT/fake_source_to_update_mongodb.conf");
154+
Assertions.assertEquals(0, updateResult.getExitCode(), updateResult.getStderr());
155+
156+
Container.ExecResult assertResult =
157+
container.executeJob("/updateIT/update_mongodb_to_assert.conf");
158+
Assertions.assertEquals(0, assertResult.getExitCode(), assertResult.getStderr());
159+
160+
clearDate(MONGODB_UPDATE_TABLE);
161+
162+
// `matchQuery` compatible test
163+
Container.ExecResult queryResult =
164+
container.executeJob("/matchIT/mongodb_matchQuery_source_to_assert.conf");
165+
Assertions.assertEquals(0, queryResult.getExitCode(), queryResult.getStderr());
166+
167+
Assertions.assertIterableEquals(
168+
TEST_MATCH_DATASET.stream()
169+
.filter(x -> x.get("c_int").equals(2))
170+
.peek(e -> e.remove("_id"))
171+
.collect(Collectors.toList()),
172+
readMongodbData(MONGODB_MATCH_RESULT_TABLE).stream()
173+
.peek(e -> e.remove("_id"))
174+
.collect(Collectors.toList()));
175+
clearDate(MONGODB_MATCH_RESULT_TABLE);
176+
}
143177
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  #spark config
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    row.num = 5
    int.template = [2]
    result_table_name = "mongodb_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(33, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(33, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

sink {
  MongoDB {
    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
    database = "test_db"
    collection = "test_update_table"
    upsert-enable = true
    // compatible parameters
    upsert-key = ["c_int"]
    source_table_name = "mongodb_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(33, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(33, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
  execution.parallelism = 1
  job.mode = "BATCH"
  #spark config
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  MongoDB {
    uri = "mongodb://e2e_mongodb:27017/test_db"
    database = "test_db"
    collection = "test_match_op_db"
    result_table_name = "mongodb_table"
    // compatible parameters
    matchQuery = "{c_int: 2}"
    cursor.no-timeout = true
    fetch.size = 1000
    max.time-min = 100
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
        }
      }
    }
  }
}

sink {
  Console {
    source_table_name = "mongodb_table"
  }
  MongoDB {
    uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
    database = "test_db"
    collection = "test_match_op_result_db"
    source_table_name = "mongodb_table"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_int = int
        c_bigint = bigint
        c_double = double
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_int = int
          c_bigint = bigint
          c_double = double
        }
      }
    }
  }
}

0 commit comments

Comments
 (0)