Skip to content

Commit c5ecda5

Browse files
authored
[bugfix][connector-mongodb] fix mongodb null value write (apache#6967)
1 parent 80cf913 commit c5ecda5

File tree

4 files changed

+138
-7
lines changed

4 files changed

+138
-7
lines changed

seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/RowDataToBsonConverters.java

+1-5
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,7 @@ private static SerializableFunction<Object, BsonValue> wrapIntoNullSafeInternalC
9292
@Override
9393
public BsonValue apply(Object value) {
9494
if (value == null || NULL.equals(type.getSqlType())) {
95-
throw new MongodbConnectorException(
96-
UNSUPPORTED_DATA_TYPE,
97-
"The column type is <"
98-
+ type
99-
+ ">, but a null value is being written into it");
95+
return new BsonNull();
10096
} else {
10197
return internalConverter.apply(value);
10298
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/AbstractMongodbIT.java

+28-2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
5858

5959
protected static final List<Document> TEST_SPLIT_DATASET = generateTestDataSet(10);
6060

61+
protected static final List<Document> TEST_NULL_DATASET = generateTestDataSetWithNull(10);
62+
6163
protected static final String MONGODB_IMAGE = "mongo:latest";
6264

6365
protected static final String MONGODB_CONTAINER_HOST = "e2e_mongodb";
@@ -70,6 +72,10 @@ public abstract class AbstractMongodbIT extends TestSuiteBase implements TestRes
7072

7173
protected static final String MONGODB_SPLIT_TABLE = "test_split_op_db";
7274

75+
protected static final String MONGODB_NULL_TABLE = "test_null_op_db";
76+
77+
protected static final String MONGODB_NULL_TABLE_RESULT = "test_null_op_db_result";
78+
7379
protected static final String MONGODB_MATCH_RESULT_TABLE = "test_match_op_result_db";
7480

7581
protected static final String MONGODB_SPLIT_RESULT_TABLE = "test_split_op_result_db";
@@ -101,15 +107,18 @@ public void initConnection() {
101107
protected void initSourceData() {
102108
MongoCollection<Document> sourceMatchTable =
103109
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_MATCH_TABLE);
104-
105110
sourceMatchTable.deleteMany(new Document());
106111
sourceMatchTable.insertMany(TEST_MATCH_DATASET);
107112

108113
MongoCollection<Document> sourceSplitTable =
109114
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_SPLIT_TABLE);
110-
111115
sourceSplitTable.deleteMany(new Document());
112116
sourceSplitTable.insertMany(TEST_SPLIT_DATASET);
117+
118+
MongoCollection<Document> sourceNullTable =
119+
client.getDatabase(MONGODB_DATABASE).getCollection(MONGODB_NULL_TABLE);
120+
sourceNullTable.deleteMany(new Document());
121+
sourceNullTable.insertMany(TEST_NULL_DATASET);
113122
}
114123

115124
protected void clearDate(String table) {
@@ -169,6 +178,23 @@ public static List<Document> generateTestDataSet(int count) {
169178
return dataSet;
170179
}
171180

181+
public static List<Document> generateTestDataSetWithNull(int count) {
182+
List<Document> dataSet = new ArrayList<>();
183+
184+
for (int i = 0; i < count; i++) {
185+
dataSet.add(
186+
new Document("c_map", null)
187+
.append("c_array", null)
188+
.append("c_string", null)
189+
.append("c_boolean", null)
190+
.append("c_int", null)
191+
.append("c_bigint", null)
192+
.append("c_double", null)
193+
.append("c_row", null));
194+
}
195+
return dataSet;
196+
}
197+
172198
protected static String randomString() {
173199
int length = RANDOM.nextInt(10) + 1;
174200
StringBuilder sb = new StringBuilder(length);

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-mongodb-e2e/src/test/java/org/apache/seatunnel/e2e/connector/v2/mongodb/MongodbIT.java

+20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
package org.apache.seatunnel.e2e.connector.v2.mongodb;
1919

20+
import org.apache.seatunnel.e2e.common.container.EngineType;
2021
import org.apache.seatunnel.e2e.common.container.TestContainer;
22+
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
2123

2224
import org.bson.Document;
2325
import org.junit.jupiter.api.Assertions;
@@ -43,6 +45,24 @@ public void testMongodbSourceAndSink(TestContainer container)
4345
clearDate(MONGODB_SINK_TABLE);
4446
}
4547

48+
@TestTemplate
49+
@DisabledOnContainer(
50+
value = {},
51+
type = {EngineType.FLINK, EngineType.SPARK},
52+
disabledReason = "Currently SPARK and FLINK do not support mongodb null value write")
53+
public void testMongodbNullValue(TestContainer container)
54+
throws IOException, InterruptedException {
55+
Container.ExecResult nullResult = container.executeJob("/mongodb_null_value.conf");
56+
Assertions.assertEquals(0, nullResult.getExitCode(), nullResult.getStderr());
57+
Assertions.assertIterableEquals(
58+
TEST_NULL_DATASET.stream().peek(e -> e.remove("_id")).collect(Collectors.toList()),
59+
readMongodbData(MONGODB_NULL_TABLE_RESULT).stream()
60+
.peek(e -> e.remove("_id"))
61+
.collect(Collectors.toList()));
62+
clearDate(MONGODB_NULL_TABLE);
63+
clearDate(MONGODB_NULL_TABLE_RESULT);
64+
}
65+
4666
@TestTemplate
4767
public void testMongodbSourceMatch(TestContainer container)
4868
throws IOException, InterruptedException {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
env {
19+
parallelism = 1
20+
job.mode = "BATCH"
21+
#spark config
22+
spark.app.name = "SeaTunnel"
23+
spark.executor.instances = 1
24+
spark.executor.cores = 1
25+
spark.executor.memory = "1g"
26+
spark.master = local
27+
}
28+
29+
source {
30+
MongoDB {
31+
uri = "mongodb://e2e_mongodb:27017/test_db"
32+
database = "test_db"
33+
collection = "test_null_op_db"
34+
match.projection = "{ c_bigint:0 }"
35+
result_table_name = "mongodb_null_table"
36+
cursor.no-timeout = true
37+
fetch.size = 1000
38+
max.time-min = 100
39+
schema = {
40+
fields {
41+
c_map = "map<string, string>"
42+
c_array = "array<int>"
43+
c_string = string
44+
c_boolean = boolean
45+
c_int = int
46+
c_bigint = bigint
47+
c_double = double
48+
c_row = {
49+
c_map = "map<string, string>"
50+
c_array = "array<int>"
51+
c_string = string
52+
c_boolean = boolean
53+
c_int = int
54+
c_bigint = bigint
55+
c_double = double
56+
}
57+
}
58+
}
59+
}
60+
}
61+
62+
sink {
63+
MongoDB {
64+
uri = "mongodb://e2e_mongodb:27017/test_db?retryWrites=true"
65+
database = "test_db"
66+
collection = "test_null_op_db_result"
67+
schema = {
68+
fields {
69+
c_map = "map<string, string>"
70+
c_array = "array<int>"
71+
c_string = string
72+
c_boolean = boolean
73+
c_int = int
74+
c_bigint = bigint
75+
c_double = double
76+
c_row = {
77+
c_map = "map<string, string>"
78+
c_array = "array<int>"
79+
c_string = string
80+
c_boolean = boolean
81+
c_int = int
82+
c_bigint = bigint
83+
c_double = double
84+
}
85+
}
86+
}
87+
}
88+
89+
}

0 commit comments

Comments
 (0)