Skip to content

Commit

Permalink
IMPALA-5259: Add REFRESH FUNCTIONS <db> statement
Browse files Browse the repository at this point in the history
Before this patch, Impala relied on INVALIDATE METADATA to load
externally added UDFs from HMS. The problem with this approach is that
INVALIDATE METADATA affects all databases and tables in the entire
cluster.

In this patch, we add a REFRESH FUNCTIONS <db> statement that reloads
the functions of a database from HMS. We return a list of updated and
removed db functions to the issuing Impalad in order to update its
local catalog cache.

Testing:
- Ran a private build which passed.

Change-Id: I3625c88bb51cca833f3293c224d3f0feb00e6e0b
Reviewed-on: http://gerrit.cloudera.org:8080/6878
Reviewed-by: Taras Bobrovytsky <[email protected]>
Tested-by: Impala Public Jenkins
  • Loading branch information
tbobrovytsky authored and jenkins committed May 25, 2017
1 parent 6ce0068 commit df708e1
Show file tree
Hide file tree
Showing 12 changed files with 323 additions and 63 deletions.
3 changes: 3 additions & 0 deletions common/thrift/CatalogService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ struct TResetMetadataRequest {
// If set, refreshes the specified partition, otherwise
// refreshes the whole table
5: optional list<CatalogObjects.TPartitionKeyValue> partition_spec

// If set, refreshes functions in the specified database.
6: optional string db_name
}

// Response from TResetMetadataRequest
Expand Down
10 changes: 6 additions & 4 deletions fe/src/main/cup/sql-parser.cup
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,15 @@ overwrite_val ::=

// Parses the statements represented by ResetMetadataStmt:
//   INVALIDATE METADATA [<table>]
//   REFRESH <table> [PARTITION (<partition spec>)]
//   REFRESH FUNCTIONS <db>
// Each alternative has exactly one action, and every action goes through a
// ResetMetadataStmt static factory because the ResetMetadataStmt constructor is private.
reset_metadata_stmt ::=
  KW_INVALIDATE KW_METADATA
  {: RESULT = ResetMetadataStmt.createInvalidateStmt(null); :}
  | KW_INVALIDATE KW_METADATA table_name:table
  {: RESULT = ResetMetadataStmt.createInvalidateStmt(table); :}
  | KW_REFRESH table_name:table
  {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, null); :}
  | KW_REFRESH table_name:table partition_spec:partition
  {: RESULT = ResetMetadataStmt.createRefreshTableStmt(table, partition); :}
  | KW_REFRESH KW_FUNCTIONS ident_or_default:db
  {: RESULT = ResetMetadataStmt.createRefreshFunctionsStmt(db); :}
  ;

explain_stmt ::=
Expand Down
61 changes: 45 additions & 16 deletions fe/src/main/java/org/apache/impala/analysis/ResetMetadataStmt.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,16 @@
import com.google.common.base.Preconditions;

/**
* Representation of a REFRESH/INVALIDATE METADATA statement.
* Representation of the following statements:
* INVALIDATE METADATA
* INVALIDATE METADATA <table>
* REFRESH <table>
* REFRESH <table> PARTITION <partition>
* REFRESH FUNCTIONS <database>
*/
public class ResetMetadataStmt extends StatementBase {
// Updated during analysis. Null if invalidating the entire catalog.
// Updated during analysis. Null if invalidating the entire catalog or refreshing
// database functions.
private TableName tableName_;

// true if it is a REFRESH statement.
Expand All @@ -39,16 +45,36 @@ public class ResetMetadataStmt extends StatementBase {
// not null when refreshing a single partition
private final PartitionSpec partitionSpec_;

public ResetMetadataStmt(TableName name, boolean isRefresh,
PartitionSpec partitionSpec) {
Preconditions.checkArgument(!isRefresh || name != null);
Preconditions.checkArgument(isRefresh || partitionSpec == null);
this.tableName_ = name;
// not null when refreshing functions in a database.
private final String database_;

private ResetMetadataStmt(TableName tableName, boolean isRefresh,
PartitionSpec partitionSpec, String db) {
Preconditions.checkArgument(!isRefresh || (tableName != null || db != null));
Preconditions.checkArgument(isRefresh || (partitionSpec == null && db == null));
Preconditions.checkArgument(db == null || (
tableName == null && isRefresh && partitionSpec == null));

this.database_ = db;
this.tableName_ = tableName;
this.isRefresh_ = isRefresh;
this.partitionSpec_ = partitionSpec;
if (partitionSpec_ != null) partitionSpec_.setTableName(tableName_);
}

/**
 * Creates an INVALIDATE METADATA statement. 'tableName' may be null, in which case
 * the statement names no table.
 */
public static ResetMetadataStmt createInvalidateStmt(TableName tableName) {
  final boolean isRefresh = false;
  final PartitionSpec noPartition = null;
  final String noDb = null;
  return new ResetMetadataStmt(tableName, isRefresh, noPartition, noDb);
}

/**
 * Creates a REFRESH <table> statement. 'partitionSpec' may be null; when set, the
 * statement refreshes that single partition.
 */
public static ResetMetadataStmt createRefreshTableStmt(TableName tableName,
    PartitionSpec partitionSpec) {
  final boolean isRefresh = true;
  final String noDb = null;
  return new ResetMetadataStmt(tableName, isRefresh, partitionSpec, noDb);
}

/**
 * Creates a REFRESH FUNCTIONS <database> statement for 'database'.
 */
public static ResetMetadataStmt createRefreshFunctionsStmt(String database) {
  final boolean isRefresh = true;
  final TableName noTable = null;
  final PartitionSpec noPartition = null;
  return new ResetMetadataStmt(noTable, isRefresh, noPartition, database);
}

/** Returns the table name of this statement; null when the statement names no table. */
public TableName getTableName() {
  return tableName_;
}

@Override
Expand Down Expand Up @@ -85,25 +111,28 @@ public void analyze(Analyzer analyzer) throws AnalysisException {
public String toSql() {
StringBuilder result = new StringBuilder();
if (isRefresh_) {
result.append("INVALIDATE METADATA");
} else {
result.append("REFRESH");
if (database_ == null) {
result.append(" ").append(tableName_);
if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql());
} else {
result.append(" FUNCTIONS ").append(database_);
}
} else {
result.append("INVALIDATE METADATA");
if (tableName_ != null) result.append(" ").append(tableName_);
}

if (tableName_ != null) result.append(" ").append(tableName_);
if (partitionSpec_ != null) result.append(" " + partitionSpec_.toSql());
return result.toString();
}

/**
 * Converts this statement into a TResetMetadataRequest. is_refresh is always set;
 * table_name, partition_spec and db_name are set only when the corresponding field
 * of this statement is non-null.
 */
public TResetMetadataRequest toThrift() {
  TResetMetadataRequest params = new TResetMetadataRequest();
  params.setIs_refresh(isRefresh_);
  if (tableName_ != null) {
    params.setTable_name(new TTableName(tableName_.getDb(), tableName_.getTbl()));
  }
  if (partitionSpec_ != null) params.setPartition_spec(partitionSpec_.toThrift());
  if (database_ != null) params.setDb_name(database_);
  return params;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.hive.metastore.api.ResourceUri;
import org.apache.hadoop.hive.metastore.api.UnknownDBException;
import org.apache.hadoop.hive.ql.exec.FunctionUtils;
import org.apache.impala.analysis.Analyzer;
import org.apache.impala.authorization.SentryConfig;
import org.apache.impala.catalog.MetaStoreClientPool.MetaStoreClient;
import org.apache.impala.common.FileSystemUtil;
Expand Down Expand Up @@ -603,6 +604,68 @@ private void loadJavaFunctions(Db db,
LOG.info("Loaded Java functions for database: " + db.getName());
}

/**
* Reloads function metadata for 'dbName' database. Populates the 'addedFuncs' list
* with functions that were added as a result of this operation. Populates the
* 'removedFuncs' list with functions that were removed.
*
* Note that every function present in the database after the reload is appended to
* 'addedFuncs', not only the ones that were previously absent, because all functions
* are dropped from the cached db and re-added.
*
* @param msClient metastore client used to list and fetch the functions and the
*     database object for 'dbName'
* @param dbName name of the database whose functions are reloaded
* @param addedFuncs output list; receives one TCatalogObject per function that is in
*     the database after the reload
* @param removedFuncs output list; receives one TCatalogObject per previously cached
*     function that has no indistinguishable counterpart after the reload
* @throws CatalogException wrapping any exception raised while refreshing, including
*     the DatabaseNotFoundException thrown when 'dbName' is not in dbCache_
*/
public void refreshFunctions(MetaStoreClient msClient, String dbName,
List<TCatalogObject> addedFuncs, List<TCatalogObject> removedFuncs)
throws CatalogException {
// Create a temporary database that will contain all the functions from the HMS.
Db tmpDb;
try {
List<org.apache.hadoop.hive.metastore.api.Function> javaFns =
Lists.newArrayList();
// List every function name in the database, then fetch each function object.
for (String javaFn : msClient.getHiveClient().getFunctions(dbName, "*")) {
javaFns.add(msClient.getHiveClient().getFunction(dbName, javaFn));
}
// Contains native functions in its params map.
org.apache.hadoop.hive.metastore.api.Database msDb =
msClient.getHiveClient().getDatabase(dbName);
tmpDb = new Db(dbName, this, null);
// Load native UDFs into the temporary db.
loadFunctionsFromDbParams(tmpDb, msDb);
// Load Java UDFs from HMS into the temporary db.
loadJavaFunctions(tmpDb, javaFns);

// NOTE(review): this cache lookup runs after the metastore calls above, so a
// database missing from dbCache_ is only reported once those calls have completed.
Db db = dbCache_.get().get(dbName);
if (db == null) {
throw new DatabaseNotFoundException("Database does not exist: " + dbName);
}
// Load transient functions into the temporary db.
for (Function fn: db.getTransientFunctions()) tmpDb.addFunction(fn);

// Compute the removed functions and remove them from the db.
for (Map.Entry<String, List<Function>> e: db.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
if (tmpDb.getFunction(
fn, Function.CompareMode.IS_INDISTINGUISHABLE) == null) {
// The removed function gets a fresh catalog version before it is reported.
fn.setCatalogVersion(incrementAndGetCatalogVersion());
removedFuncs.add(fn.toTCatalogObject());
}
}
}

// We will re-add all the functions to the db because it's possible that a
// function was dropped and a different function (for example, the binary is
// different) with the same name and signature was re-added in Hive.
db.removeAllFunctions();
for (Map.Entry<String, List<Function>> e: tmpDb.getAllFunctions().entrySet()) {
for (Function fn: e.getValue()) {
// We do not need to increment and acquire a new catalog version for this
// function here because this already happens when the functions are loaded
// into tmpDb.
db.addFunction(fn);
addedFuncs.add(fn.toTCatalogObject());
}
}

} catch (Exception e) {
// Everything, including the DatabaseNotFoundException above, is rethrown as a
// CatalogException with the original exception as its cause.
throw new CatalogException("Error refreshing functions in " + dbName + ": ", e);
}
}

/**
* Invalidates the database 'db'. This method can have potential race
* conditions with external changes to the Hive metastore and hence any
Expand Down
6 changes: 6 additions & 0 deletions fe/src/main/java/org/apache/impala/catalog/Db.java
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,12 @@ public Function removeFunction(Function desc) {
}
}

/**
* Removes every entry from functions_. The clear happens while synchronized on
* functions_.
*/
public void removeAllFunctions() {
synchronized (functions_) {
functions_.clear();
}
}

/**
* Removes a Function with the matching signature string. Returns the removed Function
* if a Function was removed as a result of this call, null otherwise.
Expand Down
9 changes: 9 additions & 0 deletions fe/src/main/java/org/apache/impala/catalog/Function.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.impala.common.InternalException;
import org.apache.impala.service.FeSupport;
import org.apache.impala.thrift.TAggregateFunction;
import org.apache.impala.thrift.TCatalogObject;
import org.apache.impala.thrift.TCatalogObjectType;
import org.apache.impala.thrift.TColumnType;
import org.apache.impala.thrift.TFunction;
Expand Down Expand Up @@ -310,6 +311,14 @@ private boolean isIndistinguishable(Function o) {
// Child classes must override this function.
public String toSql(boolean ifNotExists) { return ""; }

/**
 * Wraps this function in a TCatalogObject of type FUNCTION that carries the
 * function's thrift representation and its catalog version.
 */
public TCatalogObject toTCatalogObject() {
  TCatalogObject catalogObject = new TCatalogObject();
  catalogObject.setCatalog_version(catalogVersion_);
  catalogObject.setFn(toThrift());
  catalogObject.setType(TCatalogObjectType.FUNCTION);
  return catalogObject;
}

public TFunction toThrift() {
TFunction fn = new TFunction();
fn.setSignature(signatureString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,9 @@ private void addCatalogObject(TCatalogObject catalogObject)
addTable(catalogObject.getTable(), catalogObject.getCatalog_version());
break;
case FUNCTION:
// Remove the function first, in case there is an existing function with the same
// name and signature.
removeFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
addFunction(catalogObject.getFn(), catalogObject.getCatalog_version());
break;
case DATA_SOURCE:
Expand Down
30 changes: 17 additions & 13 deletions fe/src/main/java/org/apache/impala/service/CatalogOpExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -987,14 +987,6 @@ private void createDatabase(TCreateDbParams params, TDdlExecResponse resp)
resp.result.getUpdated_catalog_object_DEPRECATED().getCatalog_version());
}

/**
 * Wraps 'fn' in a TCatalogObject of type FUNCTION that carries the function's
 * thrift representation and its catalog version.
 */
private TCatalogObject buildTCatalogFnObject(Function fn) {
  TCatalogObject catalogObject = new TCatalogObject();
  catalogObject.setCatalog_version(fn.getCatalogVersion());
  catalogObject.setFn(fn.toThrift());
  catalogObject.setType(TCatalogObjectType.FUNCTION);
  return catalogObject;
}

private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp)
throws ImpalaException {
Function fn = Function.fromThrift(params.getFn());
Expand Down Expand Up @@ -1042,14 +1034,14 @@ private void createFunction(TCreateFunctionParams params, TDdlExecResponse resp)
addedFn.signatureString()));
}
Preconditions.checkState(catalog_.addFunction(addedFn));
addedFunctions.add(buildTCatalogFnObject(addedFn));
addedFunctions.add(addedFn.toTCatalogObject());
}
}
} else {
if (catalog_.addFunction(fn)) {
// Flush DB changes to metastore
applyAlterDatabase(catalog_.getDb(fn.dbName()));
addedFunctions.add(buildTCatalogFnObject(fn));
addedFunctions.add(fn.toTCatalogObject());
}
}

Expand Down Expand Up @@ -1514,7 +1506,7 @@ private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
continue;
}
Preconditions.checkNotNull(catalog_.removeFunction(fn));
removedFunctions.add(buildTCatalogFnObject(fn));
removedFunctions.add(fn.toTCatalogObject());
}
} else {
ArrayList<Type> argTypes = Lists.newArrayList();
Expand All @@ -1531,7 +1523,7 @@ private void dropFunction(TDropFunctionParams params, TDdlExecResponse resp)
} else {
// Flush DB changes to metastore
applyAlterDatabase(catalog_.getDb(fn.dbName()));
removedFunctions.add(buildTCatalogFnObject(fn));
removedFunctions.add(fn.toTCatalogObject());
}
}

Expand Down Expand Up @@ -3099,7 +3091,19 @@ public TResetMetadataResponse execResetMetadata(TResetMetadataRequest req)
resp.setResult(new TCatalogUpdateResult());
resp.getResult().setCatalog_service_id(JniCatalog.getServiceId());

if (req.isSetTable_name()) {
if (req.isSetDb_name()) {
// This is a "refresh functions" operation.
synchronized (metastoreDdlLock_) {
try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
List<TCatalogObject> addedFuncs = Lists.newArrayList();
List<TCatalogObject> removedFuncs = Lists.newArrayList();
catalog_.refreshFunctions(msClient, req.getDb_name(), addedFuncs, removedFuncs);
resp.result.setUpdated_catalog_objects(addedFuncs);
resp.result.setRemoved_catalog_objects(removedFuncs);
resp.result.setVersion(catalog_.getCatalogVersion());
}
}
} else if (req.isSetTable_name()) {
// Results of an invalidate operation, indicating whether the table was removed
// from the Metastore, and whether a new database was added to Impala as a result
// of the invalidate operation. Always false for refresh.
Expand Down
18 changes: 9 additions & 9 deletions fe/src/main/java/org/apache/impala/service/Frontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -222,20 +222,20 @@ public TUpdateCatalogCacheResponse updateCatalogCache(
TUpdateCatalogCacheRequest req) throws CatalogException {
ImpaladCatalog catalog = impaladCatalog_;

if (req.is_delta) return catalog.updateCatalog(req);

// If this is not a delta, this update should replace the current
// Catalog contents so create a new catalog and populate it.
if (!req.is_delta) catalog = new ImpaladCatalog(defaultKuduMasterHosts_);
catalog = new ImpaladCatalog(defaultKuduMasterHosts_);

TUpdateCatalogCacheResponse response = catalog.updateCatalog(req);

if (!req.is_delta) {
// This was not a delta update. Now that the catalog has been updated,
// replace the references to impaladCatalog_/authzChecker_ to ensure
// clients don't see the catalog disappear.
impaladCatalog_ = catalog;
authzChecker_.set(new AuthorizationChecker(authzConfig_,
impaladCatalog_.getAuthPolicy()));
}
// Now that the catalog has been updated, replace the references to
// impaladCatalog_/authzChecker_. This ensures that clients don't see
// the catalog disappear.
impaladCatalog_ = catalog;
authzChecker_.set(new AuthorizationChecker(authzConfig_,
impaladCatalog_.getAuthPolicy()));
return response;
}

Expand Down
2 changes: 2 additions & 0 deletions fe/src/test/java/org/apache/impala/analysis/ParserTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -3076,6 +3076,7 @@ public void TestResetMetadata() {
ParsesOk("refresh Foo partition (col=2)");
ParsesOk("refresh Foo.S partition (col=2)");
ParsesOk("refresh Foo.S partition (col1 = 2, col2 = 3)");
ParsesOk("refresh functions Foo");

ParserError("invalidate");
ParserError("invalidate metadata Foo.S.S");
Expand All @@ -3085,6 +3086,7 @@ public void TestResetMetadata() {
ParserError("refresh");
ParserError("refresh Foo.S partition (col1 = 2, col2)");
ParserError("refresh Foo.S partition ()");
ParserError("refresh functions Foo.S");
}

@Test
Expand Down
21 changes: 21 additions & 0 deletions fe/src/test/java/org/apache/impala/analysis/ToSqlTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1308,4 +1308,25 @@ public void testTableSample() {
"WITH t AS (SELECT * FROM functional.alltypes TABLESAMPLE SYSTEM(5)) " +
"SELECT * FROM t");
}

/**
 * Checks the toSql() output of INVALIDATE METADATA statements, with and without a
 * table name. Each statement is passed to testToSql() as both query and expected text.
 */
@Test
public void testInvalidate() {
  final String wholeCatalog = "INVALIDATE METADATA";
  final String singleTable = "INVALIDATE METADATA functional.alltypes";
  testToSql(wholeCatalog, wholeCatalog);
  testToSql(singleTable, singleTable);
}

/**
 * Checks the toSql() output of REFRESH statements: table, table with partition, and
 * database functions. Each statement is passed to testToSql() as both query and
 * expected text.
 */
@Test
public void testRefresh() {
  final String[] statements = {
    "REFRESH functional.alltypes",
    "REFRESH functional.alltypes PARTITION (year=2009, month=1)",
    "REFRESH FUNCTIONS functional",
  };
  for (String stmt : statements) {
    testToSql(stmt, stmt);
  }
}
}
Loading

0 comments on commit df708e1

Please sign in to comment.