Skip to content

Commit

Permalink
Merge pull request #1 from lianneli/master
Browse files Browse the repository at this point in the history
Add hive metadata collector
  • Loading branch information
Functor10 authored Mar 4, 2019
2 parents 63f069a + 1fedd9c commit dafa91b
Show file tree
Hide file tree
Showing 5 changed files with 118 additions and 35 deletions.
18 changes: 16 additions & 2 deletions core/src/main/java/com/qihoo/qsql/metadata/MetadataClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Provide methods to fetch data from metastore.
*/
//TODO replace with spring+mybatis
public class MetadataClient implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(MetadataClient.class);

//TODO think about more than one metastore
private static Properties properties;

Expand All @@ -48,7 +52,8 @@ public MetadataClient() throws SQLException {
*/
public DatabaseValue getBasicDatabaseInfoById(Long dbId) {
DatabaseValue databaseValue = null;
String sql = String.format("select DB_ID,NAME,DB_TYPE from DBS where DB_ID ='%d'", dbId);
String sql = String.format("select DB_ID, NAME, DB_TYPE from DBS where DB_ID ='%d'", dbId);
LOGGER.debug("getBasicDatabaseInfoById sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet != null && resultSet.next()) {
Expand All @@ -72,7 +77,8 @@ public DatabaseValue getBasicDatabaseInfoById(Long dbId) {
*/
public DatabaseValue getBasicDatabaseInfo(String databaseName) {
DatabaseValue databaseValue = null;
String sql = String.format("select DB_ID, `DESC`, NAME,DB_TYPE from DBS where name ='%s'", databaseName);
String sql = String.format("select DB_ID, `DESC`, NAME, DB_TYPE from DBS where name ='%s'", databaseName);
LOGGER.debug("getBasicDatabaseInfo sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet != null && resultSet.next()) {
Expand All @@ -97,6 +103,7 @@ public DatabaseValue getBasicDatabaseInfo(String databaseName) {
public Long insertBasicDatabaseInfo(DatabaseValue value) {
String sql = String.format("INSERT INTO DBS(NAME, DB_TYPE, `DESC`) VALUES('%s', '%s', '%s')",
value.getName(), value.getDbType(), value.getDesc());
LOGGER.debug("insertBasicDatabaseInfo sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (SQLException ex) {
Expand All @@ -118,6 +125,7 @@ public void insertDatabaseSchema(List<DatabaseParamValue> values) {
.reduce((left, right) -> left + ", " + right).orElse("()");
String sql = String.format("INSERT INTO DATABASE_PARAMS(DB_ID, PARAM_KEY, PARAM_VALUE) VALUES %s",
waitedForInsert);
LOGGER.debug("insertDatabaseSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (SQLException ex) {
Expand All @@ -134,6 +142,7 @@ public void insertDatabaseSchema(List<DatabaseParamValue> values) {
public List<DatabaseParamValue> getDatabaseSchema(Long databaseId) {
List<DatabaseParamValue> databaseParams = new ArrayList<>();
String sql = String.format("select PARAM_KEY,PARAM_VALUE from DATABASE_PARAMS where DB_ID='%d'", databaseId);
LOGGER.debug("getDatabaseSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet != null) {
Expand All @@ -160,6 +169,7 @@ public List<DatabaseParamValue> getDatabaseSchema(Long databaseId) {
public Long insertTableSchema(TableValue value) {
String sql = String.format("INSERT INTO TBLS(DB_ID, TBL_NAME) VALUES(%s, '%s')",
value.getDbId(), value.getTblName());
LOGGER.debug("insertTableSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (SQLException ex) {
Expand All @@ -177,6 +187,7 @@ public Long insertTableSchema(TableValue value) {
public List<TableValue> getTableSchema(String tableName) {
List<TableValue> tbls = new ArrayList<>();
String sql = String.format("select DB_ID,TBL_ID,TBL_NAME from TBLS where TBL_NAME='%s'", tableName);
LOGGER.debug("getTableSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet != null) {
Expand Down Expand Up @@ -209,6 +220,7 @@ public void insertFieldsSchema(List<ColumnValue> columns) {
.reduce((left, right) -> left + ", " + right).orElse("()");
String sql = String.format(
"INSERT INTO COLUMNS(CD_ID, COLUMN_NAME,TYPE_NAME,INTEGER_IDX) VALUES %s", waitedForInsert);
LOGGER.debug("insertFieldsSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (SQLException ex) {
Expand All @@ -221,6 +233,7 @@ public void insertFieldsSchema(List<ColumnValue> columns) {
*/
public void deleteFieldsSchema(Long tbId) {
String sql = String.format("DELETE FROM COLUMNS WHERE CD_ID = %s", tbId.toString());
LOGGER.debug("deleteFieldsSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
preparedStatement.execute();
} catch (SQLException ex) {
Expand All @@ -239,6 +252,7 @@ public List<ColumnValue> getFieldsSchema(Long tableId) {
String sql = String.format(""
+ "SELECT COLUMN_NAME,TYPE_NAME,INTEGER_IDX FROM COLUMNS WHERE CD_ID='%s' ORDER BY "
+ "INTEGER_IDX", tableId);
LOGGER.debug("getFieldsSchema sql is {}", sql);
try (PreparedStatement preparedStatement = connection.prepareStatement(sql)) {
try (ResultSet resultSet = preparedStatement.executeQuery()) {
if (resultSet != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.qihoo.qsql.metadata.collect;

import com.qihoo.qsql.metadata.collect.dto.JdbcProp;
import com.qihoo.qsql.metadata.collect.dto.HiveProp;
import com.qihoo.qsql.metadata.entity.ColumnValue;
import com.qihoo.qsql.metadata.entity.DatabaseParamValue;
import com.qihoo.qsql.metadata.entity.DatabaseValue;
Expand All @@ -11,20 +11,18 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;

//TODO Extract jdbc metadata collector
public class HiveCollector extends MetadataCollector {

//read from mysql database.
private JdbcProp prop;
private HiveProp prop;
private Connection connection;

HiveCollector(JdbcProp prop, String filterRegexp) throws SQLException, ClassNotFoundException {
HiveCollector(HiveProp prop, String filterRegexp) throws SQLException, ClassNotFoundException {
super(filterRegexp);
this.prop = prop;
Class.forName(prop.getJdbcDriver());
Expand All @@ -43,9 +41,11 @@ protected DatabaseValue convertDatabaseValue() {

@Override
protected List<DatabaseParamValue> convertDatabaseParamValue(Long dbId) {
DatabaseParamValue value = new DatabaseParamValue();
DatabaseParamValue value = new DatabaseParamValue(dbId);
value.setParamKey("cluster").setParamValue("default");
return Collections.singletonList(value);
List<DatabaseParamValue> values = new ArrayList<>();
values.add(value);
return values;
}

@Override
Expand All @@ -59,8 +59,32 @@ protected TableValue convertTableValue(Long dbId, String tableName) {

@Override
protected List<ColumnValue> convertColumnValue(Long tbId, String tableName, String dbName) {
//need a big join
return Collections.emptyList();

List<ColumnValue> columns = new ArrayList<>();
// read Columns
String sql = String.format(""
+ "SELECT COLUMNS_V2.* "
+ "FROM COLUMNS_V2, SDS, TBLS, DBS "
+ "WHERE COLUMNS_V2.CD_ID = SDS.CD_ID "
+ "AND SDS.SD_ID = TBLS.SD_ID "
+ "AND TBLS.DB_ID = DBS.DB_ID "
+ "AND TBLS.TBL_NAME='%s' "
+ "AND DBS.NAME = '%s' ",
tableName, dbName);

columns.addAll(readColumnAndPartitions(tbId, sql));
// read Partitions
String sql2 = String.format(""
+ "SELECT PARTITION_KEYS.* "
+ "FROM PARTITION_KEYS, TBLS, DBS "
+ "WHERE PARTITION_KEYS.TBL_ID = TBLS.TBL_ID "
+ "AND TBLS.DB_ID = DBS.DB_ID "
+ "AND TBLS.TBL_NAME='%s' "
+ "AND DBS.NAME = '%s' ",
tableName, dbName);
columns.addAll(readColumnAndPartitions(tbId, sql2));

return columns;
}

@Override
Expand All @@ -70,7 +94,7 @@ protected List<String> getTableNameList() {
}

try (PreparedStatement preparedStatement = connection.prepareStatement(
String.format("SHOW TABLES LIKE '%s'", filterRegexp))) {
String.format("SELECT TBL_NAME FROM TBLS WHERE TBL_NAME LIKE '%s'", filterRegexp))) {
ResultSet resultSet = preparedStatement.executeQuery();
List<String> tableNames = new ArrayList<>();
while (resultSet.next()) {
Expand All @@ -83,11 +107,16 @@ protected List<String> getTableNameList() {
}

private String getDatabasePosition() {
if (prop.getDbName() == null) {
throw new RuntimeException("Error when extracting dbName from property, "
+ "please check properties");
}
try (PreparedStatement preparedStatement =
connection.prepareStatement("SELECT CURRENT_DATABASE()")) {
connection.prepareStatement("SELECT NAME FROM DBS WHERE NAME = \"" + prop.getDbName() + "\"")) {
ResultSet resultSet = preparedStatement.executeQuery();
if (! resultSet.next()) {
throw new RuntimeException("Execute `SELECT CURRENT_DATABASE()` failed!!");
throw new RuntimeException("Execute `SELECT NAME FROM DBS WHERE NAME = "
+ prop.getDbName() + " ` failed!!");
}
String database = resultSet.getString(1);
if (Objects.isNull(database)) {
Expand All @@ -98,4 +127,23 @@ private String getDatabasePosition() {
throw new RuntimeException(ex);
}
}

/**
 * Runs the given query and maps every returned row to a {@link ColumnValue}.
 *
 * <p>Rows are read by position, not by label: result column 3 becomes the column name,
 * column 4 the type name and column 5 the integer index, so {@code sql} must select
 * its columns in that layout.
 *
 * @param tbId id stored as {@code cdId} on every produced value
 * @param sql  complete query text to execute on this collector's connection
 * @return the mapped rows in result-set order; empty when the query returns no rows
 * @throws RuntimeException wrapping any {@link SQLException} raised while querying
 */
private List<ColumnValue> readColumnAndPartitions(Long tbId, String sql) {
    List<ColumnValue> columns = new ArrayList<>();
    // Declare the ResultSet as a resource too, so it is closed explicitly and in the
    // right order instead of relying on the statement's close to release it.
    try (PreparedStatement preparedStatement = connection.prepareStatement(sql);
        ResultSet resultSet = preparedStatement.executeQuery()) {
        while (resultSet.next()) {
            ColumnValue value = new ColumnValue();
            value.setColumnName(resultSet.getString(3));
            value.setTypeName(resultSet.getString(4));
            value.setCdId(tbId);
            value.setIntegerIdx(resultSet.getInt(5));
            // NOTE(review): fixed placeholder text, not a value read from the row —
            // confirm whether the comment column of the result should be used instead.
            value.setComment("Who am I");
            columns.add(value);
        }
    } catch (SQLException ex) {
        throw new RuntimeException(ex);
    }
    return columns;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.qihoo.qsql.metadata.MetadataClient;
import com.qihoo.qsql.metadata.collect.dto.ElasticsearchProp;
import com.qihoo.qsql.metadata.collect.dto.HiveProp;
import com.qihoo.qsql.metadata.collect.dto.JdbcProp;
import com.qihoo.qsql.metadata.entity.ColumnValue;
import com.qihoo.qsql.metadata.entity.DatabaseParamValue;
Expand All @@ -16,15 +17,17 @@
import org.slf4j.LoggerFactory;

public abstract class MetadataCollector {

private static final Logger LOGGER = LoggerFactory.getLogger(MetadataCollector.class);
private static ObjectMapper mapper = new ObjectMapper();
private MetadataClient client = new MetadataClient();
String filterRegexp;

static {
PropertiesReader.configLogger();
}

String filterRegexp;
private MetadataClient client = new MetadataClient();

MetadataCollector(String filterRegexp) throws SQLException {
this.filterRegexp = filterRegexp;
}
Expand All @@ -41,7 +44,7 @@ public static MetadataCollector create(String json, String dataSource, String re
mapper.readValue(json, JdbcProp.class), regexp);
case "hive":
return new HiveCollector(
mapper.readValue(json, JdbcProp.class), regexp);
mapper.readValue(json, HiveProp.class), regexp);
case "mysql":
return new MysqlCollector(
mapper.readValue(json, JdbcProp.class), regexp);
Expand All @@ -57,6 +60,20 @@ public static MetadataCollector create(String json, String dataSource, String re
}
}

/**
 * Command-line entry point: builds a collector from the arguments, runs it and exits.
 *
 * @param args {@code args[0]} connection properties (handed to {@code create} as its json
 *             argument), {@code args[1]} data source type, {@code args[2]} table filter
 * @throws SQLException propagated from the collector
 * @throws RuntimeException if fewer than three arguments are supplied
 */
public static void main(String[] args) throws SQLException {
    // All three positions are dereferenced below, so checking only for an empty array
    // would let 1- or 2-argument calls fail with ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        throw new RuntimeException("Required conn info at least; expected 3 arguments "
            + "(properties, type, filter regex) but got " + args.length);
    }

    // NOTE(review): args[0] is the raw properties JSON; if it carries credentials they
    // end up in the log — consider redacting before logging.
    LOGGER.info("Input params: properties({}), type({}), filter regex({})",
        args[0], args[1], args[2]);
    MetadataCollector.create(args[0], args[1], args[2]).execute();
    System.exit(0);
}

/**
* .
*/
Expand Down Expand Up @@ -117,18 +134,4 @@ public void execute() throws SQLException {
protected abstract List<ColumnValue> convertColumnValue(Long tbId, String tableName, String dbName);

protected abstract List<String> getTableNameList();

/**
 * Command-line entry point: builds a collector from the arguments, runs it and exits.
 *
 * @param args {@code args[0]} connection properties (handed to {@code create} as its json
 *             argument), {@code args[1]} data source type, {@code args[2]} table filter
 * @throws SQLException propagated from the collector
 * @throws RuntimeException if fewer than three arguments are supplied
 */
public static void main(String[] args) throws SQLException {
    // All three positions are dereferenced below, so checking only for an empty array
    // would let 1- or 2-argument calls fail with ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
        throw new RuntimeException("Required conn info at least; expected 3 arguments "
            + "(properties, type, filter regex) but got " + args.length);
    }

    // NOTE(review): args[0] is the raw properties JSON; if it carries credentials they
    // end up in the log — consider redacting before logging.
    LOGGER.info("Input params: properties({}), type({}), filter regex({})",
        args[0], args[1], args[2]);
    MetadataCollector.create(args[0], args[1], args[2]).execute();
    System.exit(0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.qihoo.qsql.metadata.collect.dto;

import javax.validation.constraints.NotNull;

/**
 * Connection properties for the Hive collector: the JDBC settings inherited from
 * {@link JdbcProp} plus the name of the database to look up.
 */
public class HiveProp extends JdbcProp {

    /** Database name; annotated as required. */
    @NotNull
    private String dbName;

    /**
     * Stores the database name.
     *
     * @param dbName the name to keep
     */
    public void setDbName(String dbName) {
        this.dbName = dbName;
    }

    /**
     * Returns the database name.
     *
     * @return the stored name, or {@code null} when it was never set
     */
    public String getDbName() {
        return this.dbName;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@

public class JdbcProp {
@NotNull
private String jdbcDriver;
protected String jdbcDriver;
@NotNull
private String jdbcUrl;
protected String jdbcUrl;
@NotNull
private String jdbcUser;
protected String jdbcUser;
@NotNull
private String jdbcPassword;
protected String jdbcPassword;

public String getJdbcDriver() {
return jdbcDriver;
Expand Down

0 comments on commit dafa91b

Please sign in to comment.