Skip to content

Commit

Permalink
Hive: Lock hardening (apache#6451)
Browse files Browse the repository at this point in the history
  • Loading branch information
pvary authored Jan 11, 2023
1 parent ba63f25 commit fede493
Show file tree
Hide file tree
Showing 15 changed files with 512 additions and 99 deletions.
27 changes: 18 additions & 9 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,24 @@ Here are the catalog properties related to locking. They are used by some catalo
## Hadoop configuration

The following properties from the Hadoop configuration are used by the Hive Metastore connector.

| Property | Default | Description |
| ------------------------------------- | ---------------- | ---------------------------------------------------------------------------------- |
| iceberg.hive.client-pool-size | 5 | The size of the Hive client pool when tracking tables in HMS |
| iceberg.hive.lock-timeout-ms | 180000 (3 min) | Maximum time in milliseconds to acquire a lock |
| iceberg.hive.lock-check-min-wait-ms | 50 | Minimum time in milliseconds to check back on the status of lock acquisition |
| iceberg.hive.lock-check-max-wait-ms | 5000 | Maximum time in milliseconds to check back on the status of lock acquisition |

Note: `iceberg.hive.lock-check-max-wait-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
The HMS table locking is a 2-step process:
1. Lock Creation: Create lock in HMS and queue for acquisition
2. Lock Check: Check if lock successfully acquired

| Property | Default | Description |
|-------------------------------------------|-----------------|------------------------------------------------------------------------------|
| iceberg.hive.client-pool-size | 5 | The size of the Hive client pool when tracking tables in HMS |
| iceberg.hive.lock-creation-timeout-ms | 180000 (3 min) | Maximum time in milliseconds to create a lock in the HMS |
| iceberg.hive.lock-creation-min-wait-ms | 50 | Minimum time in milliseconds between retries of creating the lock in the HMS |
| iceberg.hive.lock-creation-max-wait-ms | 5000 | Maximum time in milliseconds between retries of creating the lock in the HMS |
| iceberg.hive.lock-timeout-ms | 180000 (3 min) | Maximum time in milliseconds to acquire a lock |
| iceberg.hive.lock-check-min-wait-ms | 50 | Minimum time in milliseconds between checking the acquisition of the lock |
| iceberg.hive.lock-check-max-wait-ms | 5000 | Maximum time in milliseconds between checking the acquisition of the lock |
| iceberg.hive.lock-heartbeat-interval-ms | 240000 (4 min) | The heartbeat interval for the HMS locks. |
| iceberg.hive.metadata-refresh-max-retries | 2 | Maximum number of retries when the metadata file is missing |
| iceberg.hive.table-level-lock-evict-ms | 600000 (10 min) | The timeout for the JVM table lock is |

Note: `iceberg.hive.lock-check-max-wait-ms` and `iceberg.hive.lock-heartbeat-interval-ms` should be less than the [transaction timeout](https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties#ConfigurationProperties-hive.txn.timeout)
of the Hive Metastore (`hive.txn.timeout` or `metastore.txn.timeout` in the newer versions). Otherwise, the heartbeats on the lock (which happens during the lock checks) would end up expiring in the
Hive Metastore before the lock is retried from Iceberg.

Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private static String convertToTypeString(Type type) {
return "string";
case TIMESTAMP:
Types.TimestampType timestampType = (Types.TimestampType) type;
if (MetastoreUtil.hive3PresentOnClasspath() && timestampType.shouldAdjustToUTC()) {
if (HiveVersion.min(HiveVersion.HIVE_3) && timestampType.shouldAdjustToUTC()) {
return "timestamp with local time zone";
}
return "timestamp";
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.hive;

import org.apache.hive.common.util.HiveVersionInfo;

public enum HiveVersion {
HIVE_4(4),
HIVE_3(3),
HIVE_2(2),
HIVE_1_2(1),
NOT_SUPPORTED(0);

private final int order;
private static final HiveVersion current = calculate();

HiveVersion(int order) {
this.order = order;
}

public static HiveVersion current() {
return current;
}

public static boolean min(HiveVersion other) {
return current.order >= other.order;
}

private static HiveVersion calculate() {
String version = HiveVersionInfo.getShortVersion();
String[] versions = version.split("\\.");
switch (versions[0]) {
case "4":
return HIVE_4;
case "3":
return HIVE_3;
case "2":
return HIVE_2;
case "1":
if (versions[1].equals("2")) {
return HIVE_1_2;
} else {
return NOT_SUPPORTED;
}
default:
return NOT_SUPPORTED;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class MetastoreUtil {

// this class is unique to Hive3 and cannot be found in Hive2, therefore a good proxy to see if
// we are working against Hive3 dependencies
private static final String HIVE3_UNIQUE_CLASS =
"org.apache.hadoop.hive.serde2.io.DateWritableV2";

private static final DynMethods.UnboundMethod ALTER_TABLE =
DynMethods.builder("alter_table")
.impl(
Expand All @@ -51,15 +45,8 @@ public class MetastoreUtil {
.impl(IMetaStoreClient.class, "alter_table", String.class, String.class, Table.class)
.build();

private static final boolean HIVE3_PRESENT_ON_CLASSPATH = detectHive3();

private MetastoreUtil() {}

/** Returns true if Hive3 dependencies are found on classpath, false otherwise. */
public static boolean hive3PresentOnClasspath() {
return HIVE3_PRESENT_ON_CLASSPATH;
}

/**
* Calls alter_table method using the metastore client. If possible, an environmental context will
* be used that turns off stats updates to avoid recursive listing.
Expand All @@ -71,13 +58,4 @@ public static void alterTable(
ImmutableMap.of(StatsSetupConst.DO_NOT_UPDATE_STATS, StatsSetupConst.TRUE));
ALTER_TABLE.invoke(client, databaseName, tblName, table, envContext);
}

private static boolean detectHive3() {
try {
Class.forName(HIVE3_UNIQUE_CLASS);
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,15 @@
import org.apache.hadoop.hive.metastore.api.LockRequest;
import org.apache.hadoop.hive.metastore.api.LockResponse;
import org.apache.hadoop.hive.metastore.api.LockState;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;
import org.apache.iceberg.AssertHelpers;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.exceptions.CommitFailedException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
import org.apache.thrift.TException;
import org.junit.AfterClass;
Expand All @@ -59,6 +62,7 @@
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.AdditionalAnswers;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;

public class TestHiveCommitLocks extends HiveTableBaseTest {
Expand All @@ -76,6 +80,7 @@ public class TestHiveCommitLocks extends HiveTableBaseTest {
LockResponse waitLockResponse = new LockResponse(dummyLockId, LockState.WAITING);
LockResponse acquiredLockResponse = new LockResponse(dummyLockId, LockState.ACQUIRED);
LockResponse notAcquiredLockResponse = new LockResponse(dummyLockId, LockState.NOT_ACQUIRED);
ShowLocksResponse emptyLocks = new ShowLocksResponse(Lists.newArrayList());

@BeforeClass
public static void startMetastore() throws Exception {
Expand Down Expand Up @@ -137,6 +142,7 @@ public void before() throws Exception {
catalog.name(),
dbName,
tableName));
reset(spyClient);
}

@AfterClass
Expand Down Expand Up @@ -178,6 +184,182 @@ public void testLockAcquisitionAfterRetries() throws TException, InterruptedExce
Assert.assertEquals(1, spyOps.current().schema().columns().size()); // should be 1 again
}

@Test
public void testLockAcquisitionAfterFailedNotFoundLock() throws TException, InterruptedException {
doReturn(emptyLocks).when(spyClient).showLocks(any());
doThrow(new TException("Failed to connect to HMS"))
.doReturn(waitLockResponse)
.when(spyClient)
.lock(any());
doReturn(waitLockResponse)
.doReturn(acquiredLockResponse)
.when(spyClient)
.checkLock(eq(dummyLockId));
doNothing().when(spyOps).doUnlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

spyOps.doCommit(metadataV2, metadataV1);

Assert.assertEquals(1, spyOps.current().schema().columns().size()); // should be 1 again
}

@Test
public void testLockAcquisitionAfterFailedAndFoundLock() throws TException, InterruptedException {
ArgumentCaptor<LockRequest> lockRequestCaptor = ArgumentCaptor.forClass(LockRequest.class);
doReturn(emptyLocks).when(spyClient).showLocks(any());
doThrow(new TException("Failed to connect to HMS"))
.doReturn(waitLockResponse)
.when(spyClient)
.lock(lockRequestCaptor.capture());

// Capture the lockRequest, and generate a response simulating that we have a lock
ShowLocksResponse showLocksResponse = new ShowLocksResponse(Lists.newArrayList());
ShowLocksResponseElement showLocksElement =
new ShowLocksResponseElementWrapper(lockRequestCaptor);
showLocksResponse.getLocks().add(showLocksElement);

doReturn(showLocksResponse).when(spyClient).showLocks(any());
doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
doNothing().when(spyOps).doUnlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

spyOps.doCommit(metadataV2, metadataV1);

Assert.assertEquals(1, spyOps.current().schema().columns().size()); // should be 1 again
}

@Test
public void testUnLock() throws TException {
doReturn(waitLockResponse).when(spyClient).lock(any());
doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

spyOps.doCommit(metadataV2, metadataV1);

verify(spyClient, times(1)).unlock(eq(dummyLockId));
}

@Test
public void testUnLockInterruptedUnLock() throws TException {
doReturn(waitLockResponse).when(spyClient).lock(any());
doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
doAnswer(
invocation -> {
throw new InterruptedException("Interrupt test");
})
.doNothing()
.when(spyClient)
.unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

spyOps.doCommit(metadataV2, metadataV1);

verify(spyClient, times(2)).unlock(eq(dummyLockId));
}

@Test
public void testUnLockAfterInterruptedLock() throws TException {
ArgumentCaptor<LockRequest> lockRequestCaptor = ArgumentCaptor.forClass(LockRequest.class);
doAnswer(
invocation -> {
throw new InterruptedException("Interrupt test");
})
.when(spyClient)
.lock(lockRequestCaptor.capture());

// Capture the lockRequest, and generate a response simulating that we have a lock
ShowLocksResponse showLocksResponse = new ShowLocksResponse(Lists.newArrayList());
ShowLocksResponseElement showLocksElement =
new ShowLocksResponseElementWrapper(lockRequestCaptor);
showLocksResponse.getLocks().add(showLocksElement);

doReturn(showLocksResponse).when(spyClient).showLocks(any());
doReturn(acquiredLockResponse).when(spyClient).checkLock(eq(dummyLockId));
doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

AssertHelpers.assertThrows(
"Expected an exception",
RuntimeException.class,
"Interrupted while acquiring lock",
() -> spyOps.doCommit(metadataV2, metadataV1));

verify(spyClient, times(1)).unlock(eq(dummyLockId));
// Make sure that we exit the lock loop on InterruptedException
verify(spyClient, times(1)).lock(any());
}

@Test
public void testUnLockAfterInterruptedLockCheck() throws TException {
doReturn(waitLockResponse).when(spyClient).lock(any());
doAnswer(
invocation -> {
throw new InterruptedException("Interrupt test");
})
.when(spyClient)
.checkLock(eq(dummyLockId));

doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

AssertHelpers.assertThrows(
"Expected an exception",
RuntimeException.class,
"Could not acquire the lock on",
() -> spyOps.doCommit(metadataV2, metadataV1));

verify(spyClient, times(1)).unlock(eq(dummyLockId));
// Make sure that we exit the checkLock loop on InterruptedException
verify(spyClient, times(1)).checkLock(eq(dummyLockId));
}

@Test
public void testUnLockAfterInterruptedGetTable() throws TException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
doAnswer(
invocation -> {
throw new InterruptedException("Interrupt test");
})
.when(spyClient)
.getTable(any(), any());

doNothing().when(spyClient).unlock(eq(dummyLockId));
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

AssertHelpers.assertThrows(
"Expected an exception",
RuntimeException.class,
"Interrupted during commit",
() -> spyOps.doCommit(metadataV2, metadataV1));

verify(spyClient, times(1)).unlock(eq(dummyLockId));
}

/** Wraps an ArgumentCaptor to provide data based on the request */
private class ShowLocksResponseElementWrapper extends ShowLocksResponseElement {
private ArgumentCaptor<LockRequest> wrapped;

private ShowLocksResponseElementWrapper(ArgumentCaptor<LockRequest> wrapped) {
this.wrapped = wrapped;
}

@Override
public String getAgentInfo() {
return wrapped.getValue().getAgentInfo();
}

@Override
public LockState getState() {
return LockState.WAITING;
}

@Override
public long getLockid() {
return dummyLockId;
}
}

@Test
public void testLockFailureAtFirstTime() throws TException {
doReturn(notAcquiredLockResponse).when(spyClient).lock(any());
Expand Down Expand Up @@ -286,11 +468,11 @@ public void testTableLevelProcessLockBlocksConcurrentHMSRequestsForSameTable() t
}

@Test
public void testLockHeartbeat() throws TException {
public void testLockHeartbeat() throws TException, InterruptedException {
doReturn(acquiredLockResponse).when(spyClient).lock(any());
doAnswer(AdditionalAnswers.answersWithDelay(2000, InvocationOnMock::callRealMethod))
.when(spyClient)
.getTable(any(), any());
.when(spyOps)
.loadHmsTable();
doNothing().when(spyClient).heartbeat(eq(0L), eq(dummyLockId));

spyOps.doCommit(metadataV2, metadataV1);
Expand Down
Loading

0 comments on commit fede493

Please sign in to comment.