Skip to content

Commit

Permalink
HBASE-14370 Use separate thread for calling ZKPermissionWatcher#refre…
Browse files Browse the repository at this point in the history
…shNodes()
  • Loading branch information
tedyu committed Sep 11, 2015
1 parent c94d109 commit dff5243
Show file tree
Hide file tree
Showing 8 changed files with 470 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -971,7 +971,7 @@ public void start(CoprocessorEnvironment env) throws IOException {
// throw RuntimeException so that the coprocessor is unloaded.
if (zk != null) {
try {
this.authManager = TableAuthManager.get(zk, env.getConfiguration());
this.authManager = TableAuthManager.getOrCreate(zk, env.getConfiguration());
} catch (IOException ioe) {
throw new RuntimeException("Error obtaining TableAuthManager", ioe);
}
Expand All @@ -984,7 +984,9 @@ public void start(CoprocessorEnvironment env) throws IOException {

@Override
public void stop(CoprocessorEnvironment env) {

if (this.authManager != null) {
TableAuthManager.release(authManager);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hbase.security.access;

import java.io.Closeable;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
Expand All @@ -40,6 +41,7 @@
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
Expand All @@ -48,7 +50,7 @@
* Performs authorization checks for a given user's assigned permissions
*/
@InterfaceAudience.Private
public class TableAuthManager {
public class TableAuthManager implements Closeable {
private static class PermissionCache<T extends Permission> {
/** Cache of user permissions */
private ListMultimap<String,T> userCache = ArrayListMultimap.create();
Expand Down Expand Up @@ -95,8 +97,6 @@ public ListMultimap<String,T> getAllPermissions() {

private static final Log LOG = LogFactory.getLog(TableAuthManager.class);

private static TableAuthManager instance;

/** Cache of global permissions */
private volatile PermissionCache<Permission> globalCache;

Expand Down Expand Up @@ -125,6 +125,11 @@ private TableAuthManager(ZooKeeperWatcher watcher, Configuration conf)
}
}

@Override
public void close() {
this.zkperms.close();
}

/**
* Returns a new {@code PermissionCache} initialized with permission assignments
* from the {@code hbase.superuser} configuration key.
Expand Down Expand Up @@ -739,16 +744,54 @@ public long getMTime() {
return mtime.get();
}

static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
private static Map<ZooKeeperWatcher,TableAuthManager> managerMap =
new HashMap<ZooKeeperWatcher,TableAuthManager>();

public synchronized static TableAuthManager get(
private static Map<TableAuthManager, Integer> refCount = new HashMap<>();

/** Returns a TableAuthManager from the cache. If not cached, constructs a new one. Returned
* instance should be released back by calling {@link #release(TableAuthManager)}. */
public synchronized static TableAuthManager getOrCreate(
ZooKeeperWatcher watcher, Configuration conf) throws IOException {
instance = managerMap.get(watcher);
TableAuthManager instance = managerMap.get(watcher);
if (instance == null) {
instance = new TableAuthManager(watcher, conf);
managerMap.put(watcher, instance);
}
int ref = refCount.get(instance) == null ? 0 : refCount.get(instance).intValue();
refCount.put(instance, ref + 1);
return instance;
}

@VisibleForTesting
static int getTotalRefCount() {
int total = 0;
for (int count : refCount.values()) {
total += count;
}
return total;
}

/**
* Releases the resources for the given TableAuthManager if the reference count is down to 0.
* @param instance TableAuthManager to be released
*/
public synchronized static void release(TableAuthManager instance) {
if (refCount.get(instance) == null || refCount.get(instance) < 1) {
String msg = "Something wrong with the TableAuthManager reference counting: " + instance
+ " whose count is " + refCount.get(instance);
LOG.fatal(msg);
instance.close();
managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
instance.getZKPermissionWatcher().getWatcher().abort(msg, null);
} else {
int ref = refCount.get(instance);
refCount.put(instance, ref-1);
if (ref-1 == 0) {
instance.close();
managerMap.remove(instance.getZKPermissionWatcher().getWatcher());
refCount.remove(instance);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,23 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

/**
* Handles synchronization of access control list entries and updates
Expand All @@ -43,42 +50,68 @@
* trigger updates in the {@link TableAuthManager} permission cache.
*/
@InterfaceAudience.Private
public class ZKPermissionWatcher extends ZooKeeperListener {
public class ZKPermissionWatcher extends ZooKeeperListener implements Closeable {
private static final Log LOG = LogFactory.getLog(ZKPermissionWatcher.class);
// parent node for permissions lists
static final String ACL_NODE = "acl";
TableAuthManager authManager;
String aclZNode;
CountDownLatch initialized = new CountDownLatch(1);
AtomicReference<List<ZKUtil.NodeAndData>> nodes =
new AtomicReference<List<ZKUtil.NodeAndData>>(null);
ExecutorService executor;

public ZKPermissionWatcher(ZooKeeperWatcher watcher,
TableAuthManager authManager, Configuration conf) {
super(watcher);
this.authManager = authManager;
String aclZnodeParent = conf.get("zookeeper.znode.acl.parent", ACL_NODE);
this.aclZNode = ZKUtil.joinZNode(watcher.baseZNode, aclZnodeParent);
executor = Executors.newSingleThreadExecutor(
new DaemonThreadFactory("zk-permission-watcher"));
}

public void start() throws KeeperException {
try {
watcher.registerListener(this);
if (ZKUtil.watchAndCheckExists(watcher, aclZNode)) {
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
refreshNodes(existing);
try {
executor.submit(new Callable<Void>() {
@Override
public Void call() throws KeeperException {
List<ZKUtil.NodeAndData> existing =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
if (existing != null) {
refreshNodes(existing, null);
}
return null;
}
}).get();
} catch (ExecutionException ex) {
if (ex.getCause() instanceof KeeperException) {
throw (KeeperException)ex.getCause();
} else {
throw new RuntimeException(ex.getCause());
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
} finally {
initialized.countDown();
}
}

@Override
public void close() {
executor.shutdown();
}

private void waitUntilStarted() {
try {
initialized.await();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting", e);
LOG.warn("Interrupted while waiting for start", e);
Thread.currentThread().interrupt();
}
}
Expand All @@ -87,68 +120,103 @@ private void waitUntilStarted() {
public void nodeCreated(String path) {
waitUntilStarted();
if (path.equals(aclZNode)) {
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
watcher.abort("Zookeeper error obtaining acl node children", ke);
}
executor.submit(new Runnable() {
@Override
public void run() {
try {
List<ZKUtil.NodeAndData> nodes =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes, null);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper", ke);
// only option is to abort
watcher.abort("Zookeeper error obtaining acl node children", ke);
}
}
});
}
}

@Override
public void nodeDeleted(String path) {
public void nodeDeleted(final String path) {
waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) {
String table = ZKUtil.getNodeName(path);
if(AccessControlLists.isNamespaceEntry(table)) {
authManager.removeNamespace(Bytes.toBytes(table));
} else {
authManager.removeTable(TableName.valueOf(table));
}
executor.submit(new Runnable() {
@Override
public void run() {
String table = ZKUtil.getNodeName(path);
if(AccessControlLists.isNamespaceEntry(table)) {
authManager.removeNamespace(Bytes.toBytes(table));
} else {
authManager.removeTable(TableName.valueOf(table));
}
}
});
}
}

@Override
public void nodeDataChanged(String path) {
public void nodeDataChanged(final String path) {
waitUntilStarted();
if (aclZNode.equals(ZKUtil.getParent(path))) {
// update cache on an existing table node
String entry = ZKUtil.getNodeName(path);
try {
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
refreshAuthManager(entry, data);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for node " + entry, ke);
// only option is to abort
watcher.abort("Zookeeper error getting data for node " + entry, ke);
} catch (IOException ioe) {
LOG.error("Error reading permissions writables", ioe);
}
executor.submit(new Runnable() {
@Override
public void run() {
// update cache on an existing table node
String entry = ZKUtil.getNodeName(path);
try {
byte[] data = ZKUtil.getDataAndWatch(watcher, path);
refreshAuthManager(entry, data);
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for node " + entry, ke);
// only option is to abort
watcher.abort("Zookeeper error getting data for node " + entry, ke);
} catch (IOException ioe) {
LOG.error("Error reading permissions writables", ioe);
}
}
});
}
}

@Override
public void nodeChildrenChanged(String path) {
public void nodeChildrenChanged(final String path) {
waitUntilStarted();
if (path.equals(aclZNode)) {
// table permissions changed
try {
List<ZKUtil.NodeAndData> nodes =
List<ZKUtil.NodeAndData> nodeList =
ZKUtil.getChildDataAndWatchForNewChildren(watcher, aclZNode);
refreshNodes(nodes);
while (!nodes.compareAndSet(null, nodeList)) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
LOG.warn("Interrupted while setting node list", e);
Thread.currentThread().interrupt();
}
}
} catch (KeeperException ke) {
LOG.error("Error reading data from zookeeper for path "+path, ke);
watcher.abort("Zookeeper error get node children for path "+path, ke);
}
executor.submit(new Runnable() {
// allows subsequent nodeChildrenChanged event to preempt current processing of
// nodeChildrenChanged event
@Override
public void run() {
List<ZKUtil.NodeAndData> nodeList = nodes.get();
nodes.set(null);
refreshNodes(nodeList, nodes);
}
});
}
}

private void refreshNodes(List<ZKUtil.NodeAndData> nodes) {
private void refreshNodes(List<ZKUtil.NodeAndData> nodes, AtomicReference ref) {
for (ZKUtil.NodeAndData n : nodes) {
if (ref != null && ref.get() != null) {
// there is a newer list
break;
}
if (n.isEmpty()) continue;
String path = n.getNode();
String entry = (ZKUtil.getNodeName(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,18 @@ public static void enableSecurity(Configuration conf) throws IOException {
}

public static void verifyConfiguration(Configuration conf) {
String coprocs = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY);
boolean accessControllerLoaded = false;
for (String coproc : coprocs.split(",")) {
try {
accessControllerLoaded = AccessController.class.isAssignableFrom(Class.forName(coproc));
if (accessControllerLoaded) break;
} catch (ClassNotFoundException cnfe) {
}
}
if (!(conf.get(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY).contains(
AccessController.class.getName())
&& conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY).contains(
AccessController.class.getName()) && conf.get(
&& accessControllerLoaded && conf.get(
CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY).contains(
AccessController.class.getName()))) {
throw new RuntimeException("AccessController is missing from a system coprocessor list");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,8 @@ public static void setupBeforeClass() throws Exception {
public static void tearDownAfterClass() throws Exception {
cleanUp();
TEST_UTIL.shutdownMiniCluster();
int total = TableAuthManager.getTotalRefCount();
assertTrue("Unexpected reference count: " + total, total == 0);
}

private static void setUpTableAndUserPermissions() throws Exception {
Expand Down
Loading

0 comments on commit dff5243

Please sign in to comment.