Skip to content

Commit

Permalink
HIVE-20682: Async query execution can potentially fail if shared sess…
Browse files Browse the repository at this point in the history
…ionHive is closed by master thread (Sankar Hariappan, reviewed by Mahesh Kumar Behera, Anishek Agarwal)
  • Loading branch information
sankarh committed Nov 13, 2018
1 parent af40170 commit 99d25f0
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ private Task getReplLoadRootTask(String replicadb, boolean isIncrementalDump, Tu
Task replLoadTask = TaskFactory.get(replLoadWork, confTemp);
replLoadTask.initialize(null, null, new DriverContext(driver.getContext()), null);
replLoadTask.executeTask(null);
Hive.getThreadLocal().closeCurrent();
Hive.closeCurrent();
return replLoadWork.getRootTask();
}

Expand Down
129 changes: 74 additions & 55 deletions ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,35 +167,36 @@ public class Hive {
private IMetaStoreClient metaStoreClient;
private SynchronizedMetaStoreClient syncMetaStoreClient;
private UserGroupInformation owner;
private boolean isAllowClose = true;

// metastore calls timing information
private final ConcurrentHashMap<String, Long> metaCallTimeMap = new ConcurrentHashMap<>();

// Static class to store thread local Hive object and allowClose flag.
// Static class to store thread local Hive object.
private static class ThreadLocalHive extends ThreadLocal<Hive> {
private ThreadLocal<Boolean> allowClose = ThreadLocal.withInitial(() -> true);

@Override
protected Hive initialValue() {
return null;
}

@Override
public synchronized void remove() {
if (allowClose() && (this.get() != null)) {
this.get().close();
public synchronized void set(Hive hiveObj) {
Hive currentHive = this.get();
if (currentHive != hiveObj) {
// Remove/close current thread-local Hive object before overwriting with new Hive object.
remove();
super.set(hiveObj);
}
super.remove();
this.allowClose.set(true);
}

public synchronized void set(Hive hiveObj, boolean allowClose) {
super.set(hiveObj);
this.allowClose.set(allowClose);
}

boolean allowClose() {
return this.allowClose.get();
@Override
public synchronized void remove() {
Hive currentHive = this.get();
if (currentHive != null) {
// Close the metastore connections before removing it from thread local hiveDB.
currentHive.close(false);
super.remove();
}
}
}

Expand Down Expand Up @@ -317,34 +318,29 @@ private static Hive getInternal(HiveConf c, boolean needsRefresh, boolean isFast
Hive db = hiveDB.get();
if (db == null || !db.isCurrentUserOwner() || needsRefresh
|| (c != null && !isCompatible(db, c, isFastCheck))) {
db = create(c, false, db, doRegisterAllFns);
if (db != null) {
LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
closeCurrent();
}
db = create(c, doRegisterAllFns);
}
if (c != null) {
db.conf = c;
}
return db;
}

private static Hive create(HiveConf c, boolean needsRefresh, Hive db, boolean doRegisterAllFns)
throws HiveException {
if (db != null) {
LOG.debug("Creating new db. db = " + db + ", needsRefresh = " + needsRefresh +
", db.isCurrentUserOwner = " + db.isCurrentUserOwner());
if (hiveDB.allowClose()) {
db.close();
}
}
closeCurrent();
private static Hive create(HiveConf c, boolean doRegisterAllFns) throws HiveException {
if (c == null) {
c = createHiveConf();
}
c.set("fs.scheme.class", "dfs");
Hive newdb = new Hive(c, doRegisterAllFns);
hiveDB.set(newdb, true);
hiveDB.set(newdb);
return newdb;
}


private static HiveConf createHiveConf() {
SessionState session = SessionState.get();
return (session == null) ? new HiveConf(Hive.class) : session.getConf();
Expand All @@ -360,6 +356,18 @@ private static boolean isCompatible(Hive db, HiveConf c, boolean isFastCheck) {
}
}

/**
 * Tells whether the current user owns this Hive object.
 *
 * @return true when no owner is recorded, or when the recorded owner equals the user returned
 *         by UserGroupInformation.getCurrentUser()
 * @throws HiveException if looking up the current user fails with an IOException
 */
private boolean isCurrentUserOwner() throws HiveException {
  if (owner == null) {
    // No recorded owner: the current user is not looked up at all.
    return true;
  }
  try {
    return owner.equals(UserGroupInformation.getCurrentUser());
  } catch (IOException e) {
    throw new HiveException("Error getting current user: " + e.getMessage(), e);
  }
}

/**
 * Returns whatever Hive object hiveDB currently holds, without creating or refreshing one.
 * NOTE(review): presumably null when nothing has been set for the calling thread - confirm
 * against the declaration of hiveDB.
 *
 * @return the value of hiveDB.get()
 */
public static Hive getThreadLocal() {
return hiveDB.get();
}

/**
 * Convenience overload that delegates to get(true).
 *
 * @return the Hive object returned by get(true)
 * @throws HiveException propagated from get(true)
 */
public static Hive get() throws HiveException {
return get(true);
}
Expand All @@ -383,21 +391,13 @@ public static Hive get(HiveConf c, boolean needsRefresh) throws HiveException {
}

public static void set(Hive hive) {
hiveDB.set(hive, true);
}

public static void set(Hive hive, boolean allowClose) {
hiveDB.set(hive, allowClose);
hiveDB.set(hive);
}

/**
 * Drops the Hive object held in hiveDB by calling hiveDB.remove().
 * NOTE(review): whether the dropped object's metastore connection is also closed depends on
 * hiveDB's remove() implementation - confirm there.
 */
public static void closeCurrent() {
hiveDB.remove();
}

/**
 * Returns whatever Hive object hiveDB currently holds, without creating or refreshing one.
 * NOTE(review): presumably null when nothing has been set for the calling thread - confirm
 * against the declaration of hiveDB.
 *
 * @return the value of hiveDB.get()
 */
public static Hive getThreadLocal() {
return hiveDB.get();
}

/**
* Hive
*
Expand All @@ -411,30 +411,49 @@ private Hive(HiveConf c, boolean doRegisterAllFns) throws HiveException {
}
}


private boolean isCurrentUserOwner() throws HiveException {
try {
return owner == null || owner.equals(UserGroupInformation.getCurrentUser());
} catch(IOException e) {
throw new HiveException("Error getting current user: " + e.getMessage(), e);
}
/**
 * Invoked when the garbage collector is about to destroy this object.
 * No one references this Hive anymore, so the HMS connection held by it can be closed.
 *
 * <p>The connection is closed with forceClose = true, i.e. regardless of the allowClose flag.
 * super.finalize() runs in a finally block so that it is still invoked when close(true) throws.
 *
 * @throws Throwable anything raised by close(true) or by super.finalize()
 */
@Override
protected void finalize() throws Throwable {
  try {
    close(true);
  } finally {
    super.finalize();
  }
}

/**
 * Sets the flag that marks whether this Hive object is allowed to close metastore connections.
 * close(boolean) consults this flag unless it is called with forceClose set to true.
 *
 * @param allowClose true to permit closing metastore connections, false to forbid it
 */
public void setAllowClose(boolean allowClose) {
isAllowClose = allowClose;
}

/**
 * Gets the allowClose flag, which determines whether this Hive object is allowed to close
 * metastore connections.
 *
 * @return the current value of the allowClose flag
 */
public boolean allowClose() {
return isAllowClose;
}

/**
* closes the connection to metastore for the calling thread
* Closes the connection to the metastore for the calling thread, if closing is allowed.
* @param forceClose - Override the isAllowClose flag to forcefully close the MS connections.
*/
private void close() {
LOG.debug("Closing current thread's connection to Hive Metastore.");
if (metaStoreClient != null) {
metaStoreClient.close();
metaStoreClient = null;
}
// syncMetaStoreClient is wrapped on metaStoreClient. So, it is enough to close it once.
syncMetaStoreClient = null;
if (owner != null) {
owner = null;
/**
 * Releases the metastore connection held by this object, provided closing is permitted.
 *
 * @param forceClose when true, the connection is closed even if the allowClose flag is false
 */
public void close(boolean forceClose) {
  // Nothing to do when closing is disallowed and the caller did not force it.
  if (!allowClose() && !forceClose) {
    return;
  }

  LOG.debug("Closing current thread's connection to Hive Metastore.");
  if (metaStoreClient != null) {
    metaStoreClient.close();
    metaStoreClient = null;
  }
  // syncMetaStoreClient only wraps metaStoreClient, so closing the latter once is sufficient.
  syncMetaStoreClient = null;
  owner = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,11 @@ private final class BackgroundWork implements Runnable {
private BackgroundWork(UserGroupInformation currentUGI,
Hive parentHive,
SessionState parentSessionState, boolean asyncPrepare) {
// Note: parentHive can be shared by multiple threads, so it must be protected from any thread
// closing its metastore connections while another thread is still accessing it. The caller is
// therefore expected to set the allowClose flag in parentHive to false, and it is the caller's
// responsibility to close it explicitly with the forceClose flag set to true.
// Refer to sessionHive in HiveSessionImpl.java for the usage.
this.currentUGI = currentUGI;
this.parentHive = parentHive;
this.parentSessionState = parentSessionState;
Expand All @@ -310,7 +315,8 @@ public void run() {
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
Hive.set(parentHive, false);
assert (!parentHive.allowClose());
Hive.set(parentHive);
// TODO: can this result in cross-thread reuse of session state?
SessionState.setCurrentSessionState(parentSessionState);
PerfLogger.setPerfLogger(SessionState.getPerfLogger());
Expand All @@ -328,13 +334,11 @@ public Object run() throws HiveSQLException {
LOG.error("Error running hive query: ", e);
} finally {
LogUtils.unregisterLoggingContext();
Hive hiveDb = Hive.getThreadLocal();
if (hiveDb != null && hiveDb != parentHive) {
// If new hive object is created by the child thread, then we need to close it as it might
// have created a hms connection. Call Hive.closeCurrent() that closes the HMS connection, causes
// HMS connection leaks otherwise.
Hive.closeCurrent();
}

// If a new Hive object was created by the child thread, we need to close it because it might
// have created an HMS connection. Hive.closeCurrent() closes that HMS connection; skipping the
// call would leak HMS connections.
Hive.closeCurrent();
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,10 @@ public void open(Map<String, String> sessionConfMap) throws HiveSQLException {
LOG.error(msg, e);
throw new HiveSQLException(msg, e);
}
try {
sessionHive = Hive.get(getHiveConf());
} catch (HiveException e) {
throw new HiveSQLException("Failed to get metastore connection", e);
}

// Set sessionHive object created based on sessionConf.
setSessionHive();

// Process global init file: .hiverc
processGlobalInitFile();
// Set fetch size in session conf map
Expand Down Expand Up @@ -237,6 +236,28 @@ protected int processCmd(String cmd) {
}
}

/**
 * Obtains a Hive object for the session configuration and stores it as sessionHive.
 *
 * @throws HiveSQLException if Hive.get() fails with a HiveException
 */
private void setSessionHive() throws HiveSQLException {
  try {
    final Hive freshSessionHive = Hive.get(getHiveConf());

    // Query execution threads that recreate their Hive object must not close the HMS
    // connections of sessionHive; that is allowed only when the session is closed/released.
    freshSessionHive.setAllowClose(false);

    // An async query execution thread may still reference the previous sessionHive, so it is
    // not closed explicitly here. The Hive object closes its HMS connection automatically when
    // it is garbage collected, so simply overwriting sessionHive is safe.
    sessionHive = freshSessionHive;
  } catch (HiveException e) {
    throw new HiveSQLException("Failed to get metastore connection", e);
  }
}

private void processGlobalInitFile() {
IHiveFileProcessor processor = new GlobalHivercFileProcessor();

Expand Down Expand Up @@ -402,7 +423,20 @@ private synchronized void acquireAfterOpLock(boolean userAccess) {
}
// set the thread name with the logging prefix.
sessionState.updateThreadName();
Hive.set(sessionHive);

// If the thread local Hive is different from sessionHive, it means the previous query execution in
// the master thread has re-created the Hive object due to changes in MS related configurations in
// sessionConf. So, it is necessary to reset the sessionHive object based on the new sessionConf.
// Here, we cannot directly set sessionHive to the thread local Hive because, if the previous
// command was REPL LOAD, the config changes live only within the command execution, not at the
// session level. So, the safer option is to invoke Hive.get(), which decides whether to reuse
// the thread local Hive or re-create it.
if (Hive.getThreadLocal() != sessionHive) {
try {
setSessionHive();
} catch (HiveSQLException e) {
throw new RuntimeException(e);
}
}
}

/**
Expand Down Expand Up @@ -777,12 +811,20 @@ public void close() throws HiveSQLException {
}
if (sessionHive != null) {
try {
Hive.closeCurrent();
sessionHive.close(true);
} catch (Throwable t) {
LOG.warn("Error closing sessionHive", t);
}
sessionHive = null;
}
try {
// The thread local Hive in master thread can be different from sessionHive if any query
// execution from master thread resets it to new Hive object due to changes in sessionConf.
// So, need to close it as well. If it is same as sessionHive, then it is just no-op.
Hive.closeCurrent();
} catch (Throwable t) {
LOG.warn("Error closing thread local Hive", t);
}
release(true, false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ public void testActiveSessionMetrics() throws Exception {
@Override
public void run() {
try {
Hive.set(session.getSessionHive());
OperationHandle handle = session.getTables("catalog", "schema", "table", null);
session.closeOperation(handle);
} catch (Exception e) {
Expand Down Expand Up @@ -334,6 +335,7 @@ public void testActiveSessionTimeMetrics() throws Exception {
@Override
public void run() {
try {
Hive.set(session.getSessionHive());
OperationHandle handle = session.getTables("catalog", "schema", "table", null);
session.closeOperation(handle);
} catch (Exception e) {
Expand Down

0 comments on commit 99d25f0

Please sign in to comment.