Skip to content

Commit

Permalink
[bugfix][reader][mongodb] read timeout with replica cluster (wgzhao#…
Browse files Browse the repository at this point in the history
…1037)

when deploy mongodb cluster as replica set, it reports timeout, refs wgzhao#1036
  • Loading branch information
wgzhao authored May 24, 2024
1 parent 4897020 commit a13cfa5
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ public class KeyConstant
public static final String UPPER_BOUND = "upperBound";
public static final String IS_OBJECT_ID = "isObjectId";

/*
* 批量获取的记录数
*/
// public static final String BATCH_SIZE = "batchSize"

/**
* MongoDB的_id
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

package com.wgzhao.addax.plugin.reader.mongodbreader;

import com.mongodb.MongoClient;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
Expand Down Expand Up @@ -65,9 +66,9 @@ public static class Job

private MongoClient mongoClient;

private boolean isNullOrEmpty(String obj)
private boolean notNullAndEmpty(String obj)
{
return obj == null || obj.isEmpty();
return obj != null && !obj.isEmpty();
}

@Override
Expand Down Expand Up @@ -97,7 +98,7 @@ public void init()
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
"The configuration column must be required and DOES NOT support \"*\" yet");
}
if (!isNullOrEmpty((userName)) && !isNullOrEmpty((password))) {
if (notNullAndEmpty((userName)) && notNullAndEmpty((password))) {
this.mongoClient = MongoUtil.initCredentialMongoClient(addressList, userName, password, authDb);
}
else {
Expand Down Expand Up @@ -129,9 +130,9 @@ public static class Task
private boolean isObjectId = true;
private int fetchSize;

private boolean isNullOrEmpty(String obj)
private boolean notNullAndEmpty(String obj)
{
return obj == null || obj.isEmpty();
return obj != null && !obj.isEmpty();
}

@Override
Expand Down Expand Up @@ -161,7 +162,7 @@ else if (upperBound.equals("max")) {
filter.append(KeyConstant.MONGO_PRIMARY_ID, new Document("$gte", isObjectId ? new ObjectId(lowerBound.toString()) : lowerBound)
.append("$lt", isObjectId ? new ObjectId(upperBound.toString()) : upperBound));
}
if (!isNullOrEmpty((query))) {
if (notNullAndEmpty((query))) {
Document queryFilter = Document.parse(query);
filter = new Document("$and", Arrays.asList(filter, queryFilter));
}
Expand Down Expand Up @@ -250,7 +251,7 @@ public void init()
this.collection = connConf.getString(KeyConstant.MONGO_COLLECTION_NAME);
String authDb = connConf.getString(KeyConstant.MONGO_AUTH_DB, this.database);
List<Object> addressList = connConf.getList(KeyConstant.MONGO_ADDRESS, Object.class);
if (!isNullOrEmpty((userName)) && !isNullOrEmpty((password))) {
if (notNullAndEmpty((userName)) && notNullAndEmpty((password))) {
this.mongoClient = MongoUtil.initCredentialMongoClient(addressList, userName, password, authDb);
}
else {
Expand All @@ -264,10 +265,4 @@ public void destroy()
//
}
}

public static void main(String[] args)
{
String a = "12";
System.out.println(Double.parseDouble(a));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@

package com.wgzhao.addax.plugin.reader.mongodbreader.util;

import com.mongodb.MongoClient;

import com.mongodb.MongoCommandException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.wgzhao.addax.common.exception.AddaxException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@

package com.wgzhao.addax.plugin.reader.mongodbreader.util;

import com.mongodb.MongoClientOptions;
import com.mongodb.MongoClientSettings;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.wgzhao.addax.common.exception.AddaxException;
import com.wgzhao.addax.common.util.Configuration;
import com.wgzhao.addax.plugin.reader.mongodbreader.KeyConstant;
import com.wgzhao.addax.plugin.reader.mongodbreader.MongoDBReaderErrorCode;
import com.mongodb.MongoClient;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;

import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/**
Expand All @@ -45,22 +42,7 @@ private MongoUtil() {}

public static MongoClient initMongoClient(List<Object> addressList)
{

if (addressList == null || addressList.isEmpty()) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
}
try {
return new MongoClient(parseServerAddress(addressList));
}
catch (UnknownHostException e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址");
}
catch (NumberFormatException e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
}
catch (Exception e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.UNKNOWN_EXCEPTION, "未知异常");
}
return initCredentialMongoClient(addressList, "", "", null);
}

public static MongoClient initCredentialMongoClient(List<Object> addressList, String userName, String password, String database)
Expand All @@ -70,11 +52,24 @@ public static MongoClient initCredentialMongoClient(List<Object> addressList, St
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
}
try {
MongoCredential credential = MongoCredential.createCredential(userName, database, password.toCharArray());
return new MongoClient(parseServerAddress(addressList), Collections.singletonList(credential));
}
catch (UnknownHostException e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址");
MongoCredential credential = null;
if (! userName.isEmpty() && ! password.isEmpty()) {
credential = MongoCredential.createCredential(userName, database, password.toCharArray());
}
MongoClientSettings.Builder mongoBuilder = MongoClientSettings.builder()
.applyToClusterSettings(builder -> {
try {
builder.hosts(parseServerAddress(addressList));
}
catch (UnknownHostException e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_ADDRESS, "不合法的地址");
}
});
if (credential != null) {
mongoBuilder.credential(credential);
}
return MongoClients.create(mongoBuilder.build());

}
catch (NumberFormatException e) {
throw AddaxException.asAddaxException(MongoDBReaderErrorCode.ILLEGAL_VALUE, "不合法参数");
Expand Down

0 comments on commit a13cfa5

Please sign in to comment.