Skip to content

Commit

Permalink
Merge pull request #1264 from RichardShan/dev-0.3
Browse files Browse the repository at this point in the history
Dev-0.3 bug fix and rebuild source
  • Loading branch information
RichardShan authored Oct 29, 2019
2 parents 7fead19 + 0c30220 commit fdd6915
Show file tree
Hide file tree
Showing 19 changed files with 530 additions and 186 deletions.
21 changes: 14 additions & 7 deletions server/src/main/java/edp/core/common/jdbc/ESDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.alibaba.druid.pool.ElasticSearchDruidDataSourceFactory;
import com.alibaba.druid.util.StringUtils;
import edp.core.exception.SourceException;
import edp.core.model.JdbcSourceInfo;
import edp.core.utils.CollectionUtils;
import edp.core.utils.SourceUtils;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -42,16 +44,16 @@ private ESDataSource() {

private static volatile Map<String, DataSource> esDataSourceMap = new HashMap<>();

public static synchronized DataSource getDataSource(String jdbcUrl, String userename, String password, JdbcDataSource jdbcDataSource) throws SourceException {
String key = SourceUtils.getKey(jdbcUrl, userename, password, null, false);
public static synchronized DataSource getDataSource(JdbcSourceInfo jdbcSourceInfo, JdbcDataSource jdbcDataSource) throws SourceException {
String key = SourceUtils.getKey(jdbcSourceInfo.getJdbcUrl(), jdbcSourceInfo.getUsername(), jdbcSourceInfo.getPassword(), null, false);
if (!esDataSourceMap.containsKey(key) || null == esDataSourceMap.get(key)) {
Properties properties = new Properties();
properties.setProperty(PROP_URL, jdbcUrl.trim());
if (!StringUtils.isEmpty(userename)) {
properties.setProperty(PROP_USERNAME, userename);
properties.setProperty(PROP_URL, jdbcSourceInfo.getJdbcUrl().trim());
if (!StringUtils.isEmpty(jdbcSourceInfo.getUsername())) {
properties.setProperty(PROP_USERNAME, jdbcSourceInfo.getUsername());
}
if (!StringUtils.isEmpty(password)) {
properties.setProperty(PROP_PASSWORD, password);
if (!StringUtils.isEmpty(jdbcSourceInfo.getPassword())) {
properties.setProperty(PROP_PASSWORD, jdbcSourceInfo.getPassword());
}
properties.setProperty(PROP_MAXACTIVE, String.valueOf(jdbcDataSource.getMaxActive()));
properties.setProperty(PROP_INITIALSIZE, String.valueOf(jdbcDataSource.getInitialSize()));
Expand All @@ -64,6 +66,11 @@ public static synchronized DataSource getDataSource(String jdbcUrl, String usere
properties.setProperty(PROP_TESTONBORROW, String.valueOf(jdbcDataSource.isTestOnBorrow()));
properties.setProperty(PROP_TESTONRETURN, String.valueOf(jdbcDataSource.isTestOnReturn()));
properties.put(PROP_CONNECTIONPROPERTIES, "client.transport.ignore_cluster_name=true");

if (!CollectionUtils.isEmpty(jdbcSourceInfo.getProperties())) {
jdbcSourceInfo.getProperties().forEach(dict -> properties.setProperty(dict.getKey(), dict.getValue()));
}

try {
dataSource = ElasticSearchDruidDataSourceFactory.createDataSource(properties);
esDataSourceMap.put(key, dataSource);
Expand Down
47 changes: 34 additions & 13 deletions server/src/main/java/edp/core/common/jdbc/JdbcDataSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import edp.core.consts.Consts;
import edp.core.enums.DataTypeEnum;
import edp.core.exception.SourceException;
import edp.core.model.JdbcSourceInfo;
import edp.core.utils.CollectionUtils;
import edp.core.utils.ServerUtils;
import edp.core.utils.SourceUtils;
import edp.davinci.core.config.SpringContextHolder;
Expand All @@ -34,6 +36,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import static edp.core.consts.Consts.JDBC_DATASOURCE_DEFAULT_VERSION;

Expand Down Expand Up @@ -94,8 +97,12 @@ public class JdbcDataSource {

private static volatile Map<String, DruidDataSource> dataSourceMap = new HashMap<>();

public synchronized void removeDatasource(String jdbcUrl, String username, String password, String version, boolean isExt) {
String key = SourceUtils.getKey(jdbcUrl, username, password, version, isExt);
public synchronized void removeDatasource(JdbcSourceInfo jdbcSourceInfo) {
String key = SourceUtils.getKey(jdbcSourceInfo.getJdbcUrl(),
jdbcSourceInfo.getUsername(),
jdbcSourceInfo.getPassword(),
jdbcSourceInfo.getDbVersion(),
jdbcSourceInfo.isExt());

if (dataSourceMap.containsKey(key)) {
DruidDataSource druidDataSource = dataSourceMap.get(key);
Expand All @@ -104,8 +111,12 @@ public synchronized void removeDatasource(String jdbcUrl, String username, Strin
}
}

public synchronized DruidDataSource getDataSource(String jdbcUrl, String username, String password, String database, String version, boolean isExt) throws SourceException {
String key = SourceUtils.getKey(jdbcUrl, username, password, version, isExt);
public synchronized DruidDataSource getDataSource(JdbcSourceInfo jdbcSourceInfo) throws SourceException {
String key = SourceUtils.getKey(jdbcSourceInfo.getJdbcUrl(),
jdbcSourceInfo.getUsername(),
jdbcSourceInfo.getPassword(),
jdbcSourceInfo.getDbVersion(),
jdbcSourceInfo.isExt());

if (dataSourceMap.containsKey(key) && dataSourceMap.get(key) != null) {
DruidDataSource druidDataSource = dataSourceMap.get(key);
Expand All @@ -118,26 +129,31 @@ public synchronized DruidDataSource getDataSource(String jdbcUrl, String usernam

DruidDataSource instance = new DruidDataSource();

if (StringUtils.isEmpty(version) || !isExt || JDBC_DATASOURCE_DEFAULT_VERSION.equals(version)) {
String className = SourceUtils.getDriverClassName(jdbcUrl, null);
if (StringUtils.isEmpty(jdbcSourceInfo.getDbVersion()) ||
!jdbcSourceInfo.isExt() || JDBC_DATASOURCE_DEFAULT_VERSION.equals(jdbcSourceInfo.getDbVersion())) {

String className = SourceUtils.getDriverClassName(jdbcSourceInfo.getJdbcUrl(), null);
try {
Class.forName(className);
} catch (ClassNotFoundException e) {
throw new SourceException("Unable to get driver instance for jdbcUrl: " + jdbcUrl);
throw new SourceException("Unable to get driver instance for jdbcUrl: " + jdbcSourceInfo.getJdbcUrl());
}

instance.setDriverClassName(className);

} else {
String path = ((ServerUtils) SpringContextHolder.getBean(ServerUtils.class)).getBasePath()
+ String.format(Consts.PATH_EXT_FORMATER, database, version);
+ String.format(Consts.PATH_EXT_FORMATER, jdbcSourceInfo.getDatabase(), jdbcSourceInfo.getDbVersion());
instance.setDriverClassLoader(ExtendedJdbcClassLoader.getExtJdbcClassLoader(path));
}

instance.setUrl(jdbcUrl.trim());
instance.setUsername(jdbcUrl.toLowerCase().contains(DataTypeEnum.ELASTICSEARCH.getFeature()) ? null : username);
instance.setPassword((jdbcUrl.toLowerCase().contains(DataTypeEnum.PRESTO.getFeature()) || jdbcUrl.toLowerCase().contains(DataTypeEnum.ELASTICSEARCH.getFeature())) ?
null : password);
instance.setUrl(jdbcSourceInfo.getJdbcUrl());
instance.setUsername(jdbcSourceInfo.getUsername());

if (!jdbcSourceInfo.getJdbcUrl().toLowerCase().contains(DataTypeEnum.PRESTO.getFeature())) {
instance.setPassword(jdbcSourceInfo.getPassword());
}

instance.setInitialSize(initialSize);
instance.setMinIdle(minIdle);
instance.setMaxActive(maxActive);
Expand All @@ -149,7 +165,12 @@ public synchronized DruidDataSource getDataSource(String jdbcUrl, String usernam
instance.setTestOnReturn(testOnReturn);
instance.setConnectionErrorRetryAttempts(connectionErrorRetryAttempts);
instance.setBreakAfterAcquireFailure(breakAfterAcquireFailure);
// instance.setQueryTimeout(queryTimeout / 1000);

if (!CollectionUtils.isEmpty(jdbcSourceInfo.getProperties())) {
Properties properties = new Properties();
jdbcSourceInfo.getProperties().forEach(dict -> properties.setProperty(dict.getKey(), dict.getValue()));
instance.setConnectProperties(properties);
}

try {
instance.init();
Expand Down
7 changes: 4 additions & 3 deletions server/src/main/java/edp/core/model/BaseSource.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import edp.davinci.model.Source;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


@Slf4j
public abstract class BaseSource extends RecordInfo<Source> {


public abstract String getJdbcUrl();


public abstract String getUsername();

public abstract String getPassword();
Expand All @@ -38,5 +37,7 @@ public abstract class BaseSource extends RecordInfo<Source> {

public abstract String getDbVersion();

public abstract List<Dict> getProperties();

public abstract boolean isExt();
}
28 changes: 28 additions & 0 deletions server/src/main/java/edp/core/model/Dict.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* <<
* Davinci
* ==
* Copyright (C) 2016 - 2019 EDP
* ==
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* >>
*
*/

package edp.core.model;

import lombok.Data;

@Data
public class Dict {
private String key;
private String value;
}
108 changes: 108 additions & 0 deletions server/src/main/java/edp/core/model/JdbcSourceInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* <<
* Davinci
* ==
* Copyright (C) 2016 - 2019 EDP
* ==
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* >>
*
*/

package edp.core.model;

import lombok.Getter;

import java.util.List;

@Getter
public class JdbcSourceInfo {
private String jdbcUrl;

private String username;

private String password;

private String database;

private String dbVersion;

private List<Dict> properties;

private boolean ext;

private JdbcSourceInfo(String jdbcUrl, String username, String password, String database, String dbVersion, List<Dict> properties, boolean ext) {
this.jdbcUrl = jdbcUrl;
this.username = username;
this.password = password;
this.database = database;
this.dbVersion = dbVersion;
this.properties = properties;
this.ext = ext;
}


public static final class JdbcSourceInfoBuilder {
private String jdbcUrl;
private String username;
private String password;
private String database;
private String dbVersion;
private List<Dict> properties;
private boolean ext;

private JdbcSourceInfoBuilder() {
}

public static JdbcSourceInfoBuilder aJdbcSourceInfo() {
return new JdbcSourceInfoBuilder();
}

public JdbcSourceInfoBuilder withJdbcUrl(String jdbcUrl) {
this.jdbcUrl = jdbcUrl;
return this;
}

public JdbcSourceInfoBuilder withUsername(String username) {
this.username = username;
return this;
}

public JdbcSourceInfoBuilder withPassword(String password) {
this.password = password;
return this;
}

public JdbcSourceInfoBuilder withDatabase(String database) {
this.database = database;
return this;
}

public JdbcSourceInfoBuilder withDbVersion(String dbVersion) {
this.dbVersion = dbVersion;
return this;
}

public JdbcSourceInfoBuilder withProperties(List<Dict> dicts) {
this.properties = dicts;
return this;
}

public JdbcSourceInfoBuilder withExt(boolean ext) {
this.ext = ext;
return this;
}

public JdbcSourceInfo build() {
return new JdbcSourceInfo(jdbcUrl, username, password, database, dbVersion, properties, ext);
}
}
}
39 changes: 23 additions & 16 deletions server/src/main/java/edp/core/utils/CustomDataSourceUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.util.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static edp.core.consts.Consts.JDBC_DATASOURCE_DEFAULT_VERSION;

Expand All @@ -55,55 +58,59 @@ public static CustomDataSource getInstance(String jdbcUrl, String version) {


public static void loadAllFromYaml(String yamlPath) throws Exception {

if (StringUtils.isEmpty(yamlPath)) {
return;
}

File yamlFile = new File(yamlPath);
if (!yamlFile.exists() || !yamlFile.isFile() || !yamlFile.canRead()) {
return;
}

Yaml yaml = new Yaml();
HashMap<String, Object> loads = yaml.loadAs(new BufferedReader(new FileReader(yamlFile)), HashMap.class);
if(CollectionUtils.isEmpty(loads)) {

if (CollectionUtils.isEmpty(loads)) {
return;
}

ObjectMapper mapper = new ObjectMapper();

for (Map.Entry<String, Object> entry : loads.entrySet()) {

CustomDataSource customDataSource = mapper.convertValue(entry.getValue(), CustomDataSource.class);

if (StringUtils.isEmpty(customDataSource.getName()) || StringUtils.isEmpty(customDataSource.getDriver())) {
throw new Exception("Load custom datasource error: name or driver cannot be EMPTY");
}

if ("null".equals(customDataSource.getName().trim().toLowerCase())) {
throw new Exception("Load custom datasource error: invalid name");
}

if ("null".equals(customDataSource.getDriver().trim().toLowerCase())) {
throw new Exception("Load custom datasource error: invalid driver");
}

if (StringUtils.isEmpty(customDataSource.getDesc())) {
customDataSource.setDesc(customDataSource.getName());
}

if ("null".equals(customDataSource.getDesc().trim().toLowerCase())) {
customDataSource.setDesc(customDataSource.getName());
}

if (StringUtils.isEmpty(customDataSource.getKeyword_prefix()) || StringUtils.isEmpty(customDataSource.getKeyword_suffix())) {
throw new Exception("Load custom datasource error: keyword prefixes and suffixes must be configured in pairs.");
if (!StringUtils.isEmpty(customDataSource.getKeyword_prefix()) || !StringUtils.isEmpty(customDataSource.getKeyword_suffix())) {
if (StringUtils.isEmpty(customDataSource.getKeyword_prefix()) || StringUtils.isEmpty(customDataSource.getKeyword_suffix())) {
throw new Exception("Load custom datasource error: keyword prefixes and suffixes must be configured in pairs.");
}
}

if (StringUtils.isEmpty(customDataSource.getAlias_prefix()) || StringUtils.isEmpty(customDataSource.getAlias_suffix())) {
throw new Exception("Load custom datasource error: alias prefixes and suffixes must be configured in pairs.");
if (!StringUtils.isEmpty(customDataSource.getAlias_prefix()) || !StringUtils.isEmpty(customDataSource.getAlias_suffix())) {
if (StringUtils.isEmpty(customDataSource.getAlias_prefix()) || StringUtils.isEmpty(customDataSource.getAlias_suffix())) {
throw new Exception("Load custom datasource error: alias prefixes and suffixes must be configured in pairs.");
}
}

List<String> versoins = null;
Expand Down
Loading

0 comments on commit fdd6915

Please sign in to comment.