Skip to content

Commit

Permalink
Hive: Support customizable ClientPool (apache#6698)
Browse files Browse the repository at this point in the history
  • Loading branch information
lirui-apache authored Mar 24, 2023
1 parent 25360c0 commit ef5c731
Show file tree
Hide file tree
Showing 6 changed files with 325 additions and 11 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,8 @@ project(':iceberg-hive-metastore') {
implementation project(':iceberg-core')
api project(':iceberg-api')
implementation project(':iceberg-common')
annotationProcessor "org.immutables:value"
compileOnly "org.immutables:value"

implementation "com.github.ben-manes.caffeine:caffeine"

Expand Down
7 changes: 7 additions & 0 deletions core/src/main/java/org/apache/iceberg/CatalogProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ private CatalogProperties() {}
"client.pool.cache.eviction-interval-ms";
public static final long CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT =
TimeUnit.MINUTES.toMillis(5);
/**
* A comma-separated list of elements used, in addition to the {@link #URI}, to compose the key of
* the client pool cache.
*
* <p>Supported key elements in a Catalog are implementation-dependent.
*/
public static final String CLIENT_POOL_CACHE_KEYS = "client-pool-cache-keys";

public static final String LOCK_IMPL = "lock-impl";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,29 +20,62 @@

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.ClientPool;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.thrift.TException;
import org.immutables.value.Value;

/**
* A ClientPool that caches the underlying HiveClientPool instances.
*
* <p>The following key elements are supported and can be specified via {@link
* CatalogProperties#CLIENT_POOL_CACHE_KEYS}:
*
* <ul>
* <li>ugi - the Hadoop UserGroupInformation instance that represents the current user using the
* cache.
* <li>user_name - similar to UGI but only includes the user's name determined by
* UserGroupInformation#getUserName.
* <li>conf - name of an arbitrary configuration. The value of the configuration will be extracted
* from catalog properties and added to the cache key. A conf element should start with a
* "conf:" prefix which is followed by the configuration name. E.g. specifying
"conf:metastore.catalog.default" will add "metastore.catalog.default" to the key, so
that configurations with different default catalogs won't share the same client pool.
* Multiple conf elements can be specified.
* </ul>
*/
public class CachedClientPool implements ClientPool<IMetaStoreClient, TException> {

private static Cache<String, HiveClientPool> clientPoolCache;
private static final String CONF_ELEMENT_PREFIX = "conf:";

private static Cache<Key, HiveClientPool> clientPoolCache;

private final Configuration conf;
private final String metastoreUri;
private final int clientPoolSize;
private final long evictionInterval;
private final Key key;

CachedClientPool(Configuration conf, Map<String, String> properties) {
this.conf = conf;
this.metastoreUri = conf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
this.clientPoolSize =
PropertyUtil.propertyAsInt(
properties,
Expand All @@ -53,26 +86,27 @@ public class CachedClientPool implements ClientPool<IMetaStoreClient, TException
properties,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS,
CatalogProperties.CLIENT_POOL_CACHE_EVICTION_INTERVAL_MS_DEFAULT);
this.key = extractKey(properties.get(CatalogProperties.CLIENT_POOL_CACHE_KEYS), conf);
init();
}

@VisibleForTesting
HiveClientPool clientPool() {
return clientPoolCache.get(metastoreUri, k -> new HiveClientPool(clientPoolSize, conf));
return clientPoolCache.get(key, k -> new HiveClientPool(clientPoolSize, conf));
}

private synchronized void init() {
if (clientPoolCache == null) {
clientPoolCache =
Caffeine.newBuilder()
.expireAfterAccess(evictionInterval, TimeUnit.MILLISECONDS)
.removalListener((key, value, cause) -> ((HiveClientPool) value).close())
.removalListener((ignored, value, cause) -> ((HiveClientPool) value).close())
.build();
}
}

@VisibleForTesting
static Cache<String, HiveClientPool> clientPoolCache() {
static Cache<Key, HiveClientPool> clientPoolCache() {
return clientPoolCache;
}

Expand All @@ -87,4 +121,90 @@ public <R> R run(Action<R, IMetaStoreClient, TException> action, boolean retry)
throws TException, InterruptedException {
return clientPool().run(action, retry);
}

/**
 * Builds the client-pool cache {@link Key} from a comma-separated key-element spec.
 *
 * <p>The metastore URI is always the first key element; the spec may additionally name
 * "ugi", "user_name", and any number of "conf:&lt;name&gt;" elements. Elements are collected in
 * a deterministic order so that specs listing the same elements in a different order produce
 * equal keys.
 *
 * @param cacheKeys comma-separated key elements, may be null or empty
 * @param conf configuration supplying the metastore URI and any "conf:" element values
 * @return the composed cache key
 * @throws ValidationException if an element is unknown or specified more than once
 */
@VisibleForTesting
static Key extractKey(String cacheKeys, Configuration conf) {
  // generate key elements in a certain order, so that the Key instances are comparable
  List<Object> elements = Lists.newArrayList();
  elements.add(conf.get(HiveConf.ConfVars.METASTOREURIS.varname, ""));
  if (cacheKeys == null || cacheKeys.isEmpty()) {
    return Key.of(elements);
  }

  // TreeSet/TreeMap keep element ordering stable regardless of the order in the spec
  Set<KeyElementType> types = Sets.newTreeSet(Comparator.comparingInt(Enum::ordinal));
  Map<String, String> confElements = Maps.newTreeMap();
  for (String element : cacheKeys.split(",", -1)) {
    String trimmed = element.trim();
    if (trimmed.toLowerCase(Locale.ROOT).startsWith(CONF_ELEMENT_PREFIX)) {
      String key = trimmed.substring(CONF_ELEMENT_PREFIX.length());
      ValidationException.check(
          !confElements.containsKey(key), "Conf key element %s already specified", key);
      confElements.put(key, conf.get(key));
    } else {
      KeyElementType type;
      try {
        // Locale.ROOT keeps parsing independent of the JVM default locale
        // (e.g. "ugi".toUpperCase() is not "UGI" under a Turkish locale)
        type = KeyElementType.valueOf(trimmed.toUpperCase(Locale.ROOT));
      } catch (IllegalArgumentException e) {
        // surface unknown elements as a ValidationException, consistent with the default branch
        throw new ValidationException("Unknown key element %s", trimmed);
      }
      switch (type) {
        case UGI:
        case USER_NAME:
          ValidationException.check(
              !types.contains(type), "%s key element already specified", type.name());
          types.add(type);
          break;
        default:
          // CONF must be specified with the "conf:" prefix, never as a bare element
          throw new ValidationException("Unknown key element %s", trimmed);
      }
    }
  }
  for (KeyElementType type : types) {
    switch (type) {
      case UGI:
        try {
          elements.add(UserGroupInformation.getCurrentUser());
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        break;
      case USER_NAME:
        try {
          // only the user's name, so different UGI instances for the same user share a pool
          elements.add(UserGroupInformation.getCurrentUser().getUserName());
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
        break;
      default:
        throw new RuntimeException("Unexpected key element " + type.name());
    }
  }
  // confElements is a TreeMap, so conf elements are appended in sorted key order
  for (String key : confElements.keySet()) {
    elements.add(ConfElement.of(key, confElements.get(key)));
  }
  return Key.of(elements);
}

// Cache key for client pools. ImmutableKey is generated by the Immutables annotation
// processor (org.immutables:value); equality is based on the elements list.
@Value.Immutable
abstract static class Key {

// Key elements in the deterministic order produced by extractKey (metastore URI first).
abstract List<Object> elements();

// Private: keys should only be created via extractKey, which orders the elements.
private static Key of(Iterable<?> elements) {
return ImmutableKey.builder().elements(elements).build();
}
}

// A single "conf:<name>" key element: a configuration name paired with its value.
// ImmutableConfElement is generated by the Immutables annotation processor.
@Value.Immutable
abstract static class ConfElement {
abstract String key();

// Nullable: Configuration.get returns null when the named configuration is unset.
@Nullable
abstract String value();

static ConfElement of(String key, String value) {
return ImmutableConfElement.builder().key(key).value(value).build();
}
}

// The kinds of key elements that can be named in CLIENT_POOL_CACHE_KEYS.
private enum KeyElementType {
// the current Hadoop UserGroupInformation instance
UGI,
// only the current user's name (UserGroupInformation#getUserName)
USER_NAME,
// a named configuration value, specified with the "conf:" prefix
CONF
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -603,4 +603,9 @@ protected Map<String, String> properties() {
// Toggles whether table listing includes all metastore tables (package-private setter).
void setListAllTables(boolean listAllTables) {
this.listAllTables = listAllTables;
}

// Exposes the catalog's metastore client pool so tests can inspect it.
@VisibleForTesting
ClientPool<IMetaStoreClient, TException> clientPool() {
return clients;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,99 @@
*/
package org.apache.iceberg.hive;

import java.security.PrivilegedAction;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hive.CachedClientPool.Key;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;

public class TestCachedClientPool extends HiveMetastoreTest {

@Test
public void testClientPoolCleaner() throws InterruptedException {
String metastoreUri = hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname, "");
CachedClientPool clientPool = new CachedClientPool(hiveConf, Collections.emptyMap());
HiveClientPool clientPool1 = clientPool.clientPool();
Assert.assertTrue(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri) == clientPool1);
Assertions.assertThat(
CachedClientPool.clientPoolCache()
.getIfPresent(CachedClientPool.extractKey(null, hiveConf)))
.isSameAs(clientPool1);
TimeUnit.MILLISECONDS.sleep(EVICTION_INTERVAL - TimeUnit.SECONDS.toMillis(2));
HiveClientPool clientPool2 = clientPool.clientPool();
Assert.assertTrue(clientPool1 == clientPool2);
Assert.assertSame(clientPool1, clientPool2);
TimeUnit.MILLISECONDS.sleep(EVICTION_INTERVAL + TimeUnit.SECONDS.toMillis(5));
Assert.assertNull(CachedClientPool.clientPoolCache().getIfPresent(metastoreUri));
Assert.assertNull(
CachedClientPool.clientPoolCache()
.getIfPresent(CachedClientPool.extractKey(null, hiveConf)));
}

@Test
public void testCacheKey() throws Exception {
// proxy users let the test vary the UGI/user name seen by extractKey
UserGroupInformation current = UserGroupInformation.getCurrentUser();
UserGroupInformation foo1 = UserGroupInformation.createProxyUser("foo", current);
UserGroupInformation foo2 = UserGroupInformation.createProxyUser("foo", current);
UserGroupInformation bar = UserGroupInformation.createProxyUser("bar", current);

// same elements in a different order must produce equal keys
Key key1 =
foo1.doAs(
(PrivilegedAction<Key>)
() -> CachedClientPool.extractKey("user_name,conf:key1", hiveConf));
Key key2 =
foo2.doAs(
(PrivilegedAction<Key>)
() -> CachedClientPool.extractKey("conf:key1,user_name", hiveConf));
Assert.assertEquals("Key elements order shouldn't matter", key1, key2);

// "ugi" keys differ for different users
key1 = foo1.doAs((PrivilegedAction<Key>) () -> CachedClientPool.extractKey("ugi", hiveConf));
key2 = bar.doAs((PrivilegedAction<Key>) () -> CachedClientPool.extractKey("ugi", hiveConf));
Assert.assertNotEquals("Different users are not supposed to be equivalent", key1, key2);

// "ugi" keys differ even for distinct UGI instances of the same user
key2 = foo2.doAs((PrivilegedAction<Key>) () -> CachedClientPool.extractKey("ugi", hiveConf));
Assert.assertNotEquals("Different UGI instances are not supposed to be equivalent", key1, key2);

// adding an element changes the key
key1 = CachedClientPool.extractKey("ugi", hiveConf);
key2 = CachedClientPool.extractKey("ugi,conf:key1", hiveConf);
Assert.assertNotEquals(
"Keys with different number of elements are not supposed to be equivalent", key1, key2);

Configuration conf1 = new Configuration(hiveConf);
Configuration conf2 = new Configuration(hiveConf);

// same conf name, different values -> different keys
conf1.set("key1", "val");
key1 = CachedClientPool.extractKey("conf:key1", conf1);
key2 = CachedClientPool.extractKey("conf:key1", conf2);
Assert.assertNotEquals(
"Config with different values are not supposed to be equivalent", key1, key2);

// different conf names -> different keys
conf2.set("key1", "val");
conf2.set("key2", "val");
key2 = CachedClientPool.extractKey("conf:key2", conf2);
Assert.assertNotEquals(
"Config with different keys are not supposed to be equivalent", key1, key2);

// matching conf key/value pairs are equal regardless of element order
key1 = CachedClientPool.extractKey("conf:key1,ugi", conf1);
key2 = CachedClientPool.extractKey("ugi,conf:key1", conf2);
Assert.assertEquals("Config with same key/value should be equivalent", key1, key2);

// whitespace around elements is trimmed
conf1.set("key2", "val");
key1 = CachedClientPool.extractKey("conf:key2 ,conf:key1", conf1);
key2 = CachedClientPool.extractKey("conf:key2,conf:key1", conf2);
Assert.assertEquals("Config with same key/value should be equivalent", key1, key2);

// duplicate type elements are rejected
Assertions.assertThatThrownBy(
() -> CachedClientPool.extractKey("ugi,ugi", hiveConf),
"Duplicate key elements should result in an error")
.isInstanceOf(ValidationException.class)
.hasMessageContaining("UGI key element already specified");

// duplicate conf elements are rejected; the "conf:" prefix is case-insensitive
Assertions.assertThatThrownBy(
() -> CachedClientPool.extractKey("conf:k1,conf:k2,CONF:k1", hiveConf),
"Duplicate conf key elements should result in an error")
.isInstanceOf(ValidationException.class)
.hasMessageContaining("Conf key element k1 already specified");
}
}
Loading

0 comments on commit ef5c731

Please sign in to comment.