diff --git a/pom.xml b/pom.xml index d729b40..9f2e98a 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ org.scale7 scale7-cages jar - 0.6.1-SNAPSHOT + 0.7.0-SNAPSHOT Cages - A library for distributed locking and synchronization using ZooKeeper http://github.com/s7 @@ -45,6 +45,11 @@ + + commons-lang + commons-lang + 2.3 + @@ -55,7 +60,7 @@ maven.scale7.org Scale7 Maven Repo - http://github.com/s7/mvnrepo/raw/master + https://github.com/s7/mvnrepo/raw/master true diff --git a/src/main/java/org/scale7/zookeeper/cages/ISinglePathLock.java b/src/main/java/org/scale7/zookeeper/cages/ISinglePathLock.java index d7b9d19..6637629 100644 --- a/src/main/java/org/scale7/zookeeper/cages/ISinglePathLock.java +++ b/src/main/java/org/scale7/zookeeper/cages/ISinglePathLock.java @@ -1,9 +1,11 @@ package org.scale7.zookeeper.cages; -public interface ISinglePathLock extends ILock { +public interface ISinglePathLock extends ILock, Comparable { /** * Get the path that a lock is operating against * @return The path the lock is attempting to lock */ String getLockPath(); + + } diff --git a/src/main/java/org/scale7/zookeeper/cages/ZkLockBase.java b/src/main/java/org/scale7/zookeeper/cages/ZkLockBase.java index 5804703..ad2ee3b 100644 --- a/src/main/java/org/scale7/zookeeper/cages/ZkLockBase.java +++ b/src/main/java/org/scale7/zookeeper/cages/ZkLockBase.java @@ -17,9 +17,9 @@ /** * Base class for single path read and write locks. - * - * TODO implement timeouts as basic means to rollback from deadlock. - * + * + * TODO implement timeouts as basic means to rollback from deadlock. + * * @author dominicwilliams * */ @@ -34,7 +34,7 @@ public abstract class ZkLockBase extends ZkSyncPrimitive implements ISinglePathL private boolean tryAcquireOnly; private volatile LockState lockState; private Integer mutex; - + public ZkLockBase(String lockPath) { super(ZkSessionManager.instance()); PathUtils.validatePath(lockPath); @@ -50,7 +50,7 @@ public void acquire() throws ZkCagesException, InterruptedException { createRootPath(lockPath); waitSynchronized(); } - + /** {@inheritDoc} */ @Override public void acquire(ILockListener listener, Object context) throws ZkCagesException, InterruptedException { @@ -61,7 +61,7 @@ public void acquire(ILockListener listener, Object context) throws ZkCagesExcept addDieListener(reportDieToListener); createRootPath(lockPath); } - + /** {@inheritDoc} */ @Override public boolean tryAcquire() throws ZkCagesException, InterruptedException { @@ -71,7 +71,7 @@ public boolean tryAcquire() throws ZkCagesException, InterruptedException { waitSynchronized(); return lockState == LockState.Acquired; } - + /** {@inheritDoc} */ @Override public void tryAcquire(ITryLockListener listener, Object context) throws ZkCagesException, InterruptedException { @@ -83,13 +83,13 @@ public void tryAcquire(ITryLockListener listener, Object context) throws ZkCages addDieListener(reportDieToListener); createRootPath(lockPath); } - + /** {@inheritDoc} */ @Override public void release() { safeLockState(LockState.Released); } - + /** {@inheritDoc} */ @Override public LockState getState() @@ -105,14 +105,20 @@ public LockState getState() public String getLockPath() { return zkPath.getPath(); } - + + @Override + public int compareTo(ISinglePathLock other) { + int result = getLockPath().compareTo(other.getLockPath()); + return result == 0 ? (getType() == other.getType() ? 1 : 0): result; + } + private void createRootPath(String path) throws InterruptedException { zkPath = new ZkPath(path, CreateMode.PERSISTENT); // TODO for now only persistent ZK nodes can have children. fix this. zkPath.addUpdateListener(createLockNode, true); - zkPath.addDieListener(onLockPathError); + zkPath.addDieListener(onLockPathError); } - + private Runnable onLockPathError = new Runnable () { @Override @@ -122,15 +128,15 @@ public void run() { // Bubble the killer exception and die! die(zkPath.getKillerException()); } - + }; - + @Override protected void onDie(ZkCagesException killerException) { // We just set the lock state. The killer exception has already been set by base class safeLockState(LockState.Error); } - + private Runnable createLockNode = new Runnable () { @Override @@ -138,31 +144,31 @@ public void run() { zooKeeper().create(zkPath.getPath() + "/" + getType() + "-", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, createLockNodeHandler, this); } - + }; - + private StringCallback createLockNodeHandler = new StringCallback() { @Override public void processResult(int rc, String path, Object ctx, String name) { - + if (rc == Code.OK.intValue()) thisNodeId = ZkLockNode.getLockNodeIdFromName(name); if (passOrTryRepeat(rc, new Code[] { Code.OK }, (Runnable)ctx)) getQueuedLocks.run(); } - - }; - + + }; + private Runnable getQueuedLocks = new Runnable () { @Override public void run() { zooKeeper().getChildren(zkPath.getPath(), null, queuedLocksHandler, this); } - + }; - + private ChildrenCallback queuedLocksHandler = new ChildrenCallback() { @Override @@ -209,18 +215,18 @@ public void processResult(int rc, String path, Object ctx, List children } } } - - }; - + + }; + private Runnable watchBlockingNode = new Runnable() { @Override public void run() { zooKeeper().exists(zkPath.getPath() + "/" + blockingNodeId, ZkLockBase.this, blockingNodeHandler, this); } - + }; - + private StatCallback blockingNodeHandler = new StatCallback() { @Override @@ -230,32 +236,32 @@ public void processResult(int rc, String path, Object ctx, Stat stat) { else passOrTryRepeat(rc, new Code[] { Code.OK }, (Runnable)ctx); } - + }; - + @Override protected void onNodeDeleted(String path) { getQueuedLocks.run(); } - + private Runnable releaseLock = new Runnable () { @Override public void run() { zooKeeper().delete(zkPath.getPath() + "/" + thisNodeId, -1, releaseLockHandler, this); } - + }; private VoidCallback releaseLockHandler = new VoidCallback() { @Override public void processResult(int rc, String path, Object ctx) { - passOrTryRepeat(rc, new Code[] { Code.OK, Code.NONODE }, (Runnable)ctx); + passOrTryRepeat(rc, new Code[] { Code.OK, Code.NONODE }, (Runnable)ctx); } - + }; - + private Runnable reportStateUpdatedToListener = new Runnable() { @Override @@ -269,18 +275,18 @@ public void run() { else listener.onLockAcquired(ZkLockBase.this, context); } - + }; - + private Runnable reportDieToListener = new Runnable() { @Override public void run() { listener.onLockError(getKillerException(), ZkLockBase.this, context); } - + }; - + /** * Set the lock state when we know an exception can't be thrown * @param newState The new lock state @@ -293,23 +299,23 @@ private void safeLockState(LockState newState) { assert false : "Unknown condition"; } } - + /** * Set the lock state * @param newState The new lock state * @throws ZkCagesException */ private void setLockState(LockState newState) throws ZkCagesException { - + synchronized (mutex) { switch (newState) { - + case Idle: assert false : "Unknown condition"; - + case Waiting: /** - * We only set this state from the public interface methods. This means we can directly throw an + * We only set this state from the public interface methods. This means we can directly throw an * exception back at the caller! */ switch (lockState) { @@ -329,7 +335,7 @@ private void setLockState(LockState newState) throws ZkCagesException { assert false : "Unknown condition"; } break; - + case Abandoned: /** * We tried to acquire a lock, but it was already held and we are abandoning our attempt to acquire. @@ -355,7 +361,7 @@ private void setLockState(LockState newState) throws ZkCagesException { assert false : "Unknown condition"; } break; - + case Acquired: /** * We have successfully acquired the lock. @@ -376,7 +382,7 @@ private void setLockState(LockState newState) throws ZkCagesException { assert false : "Unknown condition"; } break; - + case Released: /** * We are releasing a lock. This can be done before a lock has been acquired if an operation is in progress. @@ -405,7 +411,7 @@ private void setLockState(LockState newState) throws ZkCagesException { assert false : "Unknown condition"; } break; - + case Error: switch (lockState) { case Released: @@ -417,7 +423,7 @@ private void setLockState(LockState newState) throws ZkCagesException { return; } } - + assert false : "Unknown condition"; } } diff --git a/src/main/java/org/scale7/zookeeper/cages/ZkMultiPathLock.java b/src/main/java/org/scale7/zookeeper/cages/ZkMultiPathLock.java index 6e83aeb..5d23193 100644 --- a/src/main/java/org/scale7/zookeeper/cages/ZkMultiPathLock.java +++ b/src/main/java/org/scale7/zookeeper/cages/ZkMultiPathLock.java @@ -1,11 +1,10 @@ package org.scale7.zookeeper.cages; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import org.scale7.concurrency.ManualResetEvent; +import org.apache.commons.lang.NotImplementedException; import org.scale7.networking.utility.NetworkAlgorithms; /** @@ -14,48 +13,36 @@ * environments. Whenever two or more single path locks are acquired sequentially, the possibility of * deadlock arises (for example if process A requests r(X) and then requests w(Y), but process B already * holds r(Y) and then simultaneously requests w(X)). Detecting closed wait graphs in a distributed - * environment is necessarily difficult and expensive. One solution is for developers to acquire all single + * environment is difficult and expensive. One solution is for developers to acquire all single * lock paths with a timeout, such that in a deadlock situation an exception will be thrown, and hopefully * all held locks released as the operation is rolled back. However, there are a number of problems with * this approach: (1) developers must be relied upon to set appropriate timeouts (2) developers may * have to roll back partially completed operations when an a timeout exception is thrown when they - * are trying to acquire a nested lock (3) even if developers have handled the aforementioned complexity + * are trying to acquire a nested lock (3) even if developers have handled this complexity * correctly, in such situations where deadlock occurs during the period leading up to the timeout * many operations and locks may have become queued up, thus leading to a stampede of lock acquisition * attempts and a likely repeat of the deadlock occurring. For this reason, it is better to forbid * developers from acquiring single lock paths in a nested manner, and to require them to use this - * multi-lock paradigm, where all locks required for an operation are acquired and released together, - * thus avoiding the risk of deadlock occurring. It is even arguable, that the usage of the single - * path read and write lock classes should be forbidden altogether in projects pursuing maximum robustness, - * since this class will be only be slightly less efficient unless the lock paths involved are heavily - * contested, and in practice - since deadlocks will occur - it will be much more efficient. + * q multi-lock system where all locks required for an operation are acquired and released together, + * thus avoiding the risk of deadlock occurring. * * @author dominicwilliams * */ -public class ZkMultiPathLock implements IMultiPathLock, ITryLockListener { +public class ZkMultiPathLock implements IMultiPathLock { private final int MIN_RETRY_DELAY = 125; private final int MAX_RETRY_DELAY = 8000; - private ILockListener listener; - private boolean tryAcquireOnly; - private volatile ArrayList locks; - private final ManualResetEvent isDone; - private ZkCagesException killerException; + private ArrayList locks; + private ISinglePathLock[] sortedLocks; private volatile LockState lockState; - private ScheduledExecutorService retry; - private Object context; - private int attemptId; private final Integer mutex; public ZkMultiPathLock() { lockState = LockState.Idle; locks = new ArrayList(32); - isDone = new ManualResetEvent(false); - retry = ZkSessionManager.instance().callbackExecutor; mutex = new Integer(-1); - attemptId = 0; } /** @@ -81,59 +68,93 @@ public void addWriteLock(String lockPath) { /** {@inheritDoc} */ @Override public void acquire() throws ZkCagesException, InterruptedException { - synchronized (mutex) { - setLockState(LockState.Waiting, null); - tryAcquireOnly = false; - tryAcquireAll(); + setLockState(LockState.Waiting); + prepareSortedLockArray(); + int attempts = 0; + while (true) { + attempts++; + if (doTryAcquire()) { + setLockState(LockState.Acquired); + return; + } + Thread.sleep(NetworkAlgorithms.getBinaryBackoffDelay(attempts, MIN_RETRY_DELAY, MAX_RETRY_DELAY)); } - isDone.waitOne(); - if (killerException != null) - throw killerException; } /** {@inheritDoc} */ @Override public void acquire(ILockListener listener, Object context) throws ZkCagesException, InterruptedException { - synchronized (mutex) { - setLockState(LockState.Waiting, null); - tryAcquireOnly = false; - this.listener = listener; - this.context = context; - tryAcquireAll(); - } + throw new NotImplementedException(); } /** {@inheritDoc} */ @Override public boolean tryAcquire() throws ZkCagesException, InterruptedException { - synchronized (mutex) { - setLockState(LockState.Waiting, null); - tryAcquireOnly = true; - tryAcquireAll(); + setLockState(LockState.Waiting); + prepareSortedLockArray(); + if (doTryAcquire()) { + setLockState(LockState.Acquired); + return true; + } + setLockState(LockState.Abandoned); + return false; + } + + /** + * When locks are being acquired sequentially, this will produce the earliest possible back off when + * there is contention (since multi-locks will try to acquire the same paths first) + */ + protected void prepareSortedLockArray() { + sortedLocks = locks.toArray(new ISinglePathLock[0]); + Arrays.sort(sortedLocks); + } + + protected boolean doTryAcquire() throws ZkCagesException, InterruptedException { + // loop until we acquire all locks + try { + boolean tryFailed = false; + // try and get them all + for (ILock lock : sortedLocks) + if (!lock.tryAcquire()) { + tryFailed = true; + break; + } + // done if successful + if (!tryFailed) { + return true; + } + // otherwise roll back + release(); + return false; + } catch (ZkCagesException ex) { + // record killer ZooKeeper exception + setLockState(LockState.Error); + // roll back any locks we have obtained so far + release(); + // re-throw exception + throw ex; + } catch (InterruptedException ex) { + // roll back any locks we have obtained so far + release(); + // re-throw interrupted + Thread.currentThread().interrupt(); + throw ex; } - isDone.waitOne(); - if (killerException != null) - throw killerException; - return lockState == LockState.Acquired; } /** {@inheritDoc} */ @Override public void tryAcquire(ITryLockListener listener, Object context) throws ZkCagesException, InterruptedException { - synchronized (mutex) { - setLockState(LockState.Waiting, null); - tryAcquireOnly = true; - this.listener = listener; - this.context = context; - tryAcquireAll(); - } + throw new NotImplementedException(); } /** {@inheritDoc} */ @Override public void release() { synchronized (mutex) { - safeLockState(LockState.Released, null); + for (ILock lock : sortedLocks) + if (lock.getState() == LockState.Acquired) + lock.release(); } } @@ -171,251 +192,110 @@ private String[] getLockPathsByType(ILock.LockType type) { return lockPaths.toArray(new String[] {}); } - /** {@inheritDoc} */ - @Override - public void onTryAcquireLockFailed(ILock lock, Object context) { - synchronized (mutex) { - if (oldAttemptFeedback(context)) - return; - if (tryAcquireOnly) { - safeLockState(LockState.Abandoned, null); - } else { - // Schedule new attempt to acquire locks over the paths - rescheduleTryAcquireAll(context); - } - } - } - - /** {@inheritDoc} */ - @Override - public void onLockAcquired(ILock lock, Object context) { - synchronized (mutex) { - if (oldAttemptFeedback(context)) - return; - for (ILock currLock : locks) { - if (currLock.getState() != LockState.Acquired) - return; - } - safeLockState(LockState.Acquired, null); - } - } - - /** {@inheritDoc} */ - @Override - public void onLockError(ZkCagesException error, ILock lock, Object context) { - synchronized (mutex) { - if (oldAttemptFeedback(context)) - return; - safeLockState(LockState.Error, error); - } - } - - private void tryAcquireAll() throws ZkCagesException, InterruptedException { - Object context = new Integer(attemptId); - if (locks.size() == 1) - // Only 1 lock is being acquired. Therefore we can simply wait to acquire a lock over its path - // without creating an opportunity for deadlock. This is more efficient where the path is contended - // than repeatedly trying to acquire paths according to a BEB schedule. - locks.get(0).acquire(this, context); - else - // Try to acquire all the locks together. Do not block waiting for any individual path. If we cannot - // acquire all paths together, we wil retry according to the BEB schedule. - for (ILock lock : locks) - lock.tryAcquire(this, context); - } - - private void releaseAll() { - for (ILock lock : locks) - lock.release(); - } - - private boolean oldAttemptFeedback(Object context) { - int attemptId = (Integer)context; - return this.attemptId != attemptId; - } - - private void rescheduleTryAcquireAll(Object context) { - // Increment attemptId to begin a new attempt - attemptId++; - // Release all existing locks / attempts to lock - releaseAll(); - // Create new array of lock objects - ArrayList newLockObjs = new ArrayList(locks.size()); - for (ISinglePathLock lock : locks) { - if (lock.getType() == LockType.Read) - newLockObjs.add(new ZkReadLock(lock.getLockPath())); - else if (lock.getType() == LockType.Write) - newLockObjs.add(new ZkWriteLock(lock.getLockPath())); - else - assert false; - } - locks = newLockObjs; - // Schedule new attempt - retry.schedule(retryAcquireLocks, NetworkAlgorithms.getBinaryBackoffDelay(attemptId, MIN_RETRY_DELAY, MAX_RETRY_DELAY), TimeUnit.MILLISECONDS); - } - - private Runnable retryAcquireLocks = new Runnable () { - - @Override - public void run() { - synchronized (mutex) { - try { - tryAcquireAll(); - } catch (ZkCagesException e) { - safeLockState(LockState.Error, e); - } catch (InterruptedException e) { - safeLockState(LockState.Error, new ZkCagesException(ZkCagesException.Error.INTERRUPTED_EXCEPTION)); - Thread.currentThread().interrupt(); - } - } - } - - }; - - /** - * Set the lock state when we know an exception can't be thrown - * @param newState The new lock state - */ - private void safeLockState(LockState newState, ZkCagesException killerException) { - try { - setLockState(newState, killerException); - } catch (ZkCagesException e) { - e.printStackTrace(); - assert false : "Unknown condition"; - } - } - /** * Set the lock state * @param newState The new lock state * @throws ZkCagesException */ - private void setLockState(LockState newState, ZkCagesException killerException) throws ZkCagesException { - - switch (newState) { - - case Idle: - assert false : "Unknown condition"; + private void setLockState(LockState newState) throws ZkCagesException { + synchronized (mutex) { + switch (newState) { - case Waiting: - /** - * We only set this state from the public interface methods. This means we can directly throw an - * exception back at the caller. - */ - switch (lockState) { case Idle: - // Caller is starting operation - lockState = newState; - return; - case Waiting: - throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_WAITING); - case Abandoned: - throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_ABANDONED); - case Acquired: - throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_ACQUIRED); - case Released: - throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_RELEASED); - default: assert false : "Unknown condition"; - } - break; - case Abandoned: - /** - * We tried to acquire a lock, but it was already held and we are abandoning our attempt to acquire. - */ - switch (lockState) { case Waiting: - // Attempt to acquire lock without blocking has failed - lockState = newState; - // Release our lock nodes immediately - releaseAll(); - // Notify waiting caller about result - isDone.set(); - // Notify listener about result - if (listener != null) { - ITryLockListener listener = (ITryLockListener)this.listener; - listener.onTryAcquireLockFailed(this, context); + /** + * We only set this state from the public interface methods. This means we can directly throw an + * exception back at the caller. + */ + switch (lockState) { + case Idle: + // Caller is starting operation + lockState = newState; + return; + case Waiting: + throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_WAITING); + case Abandoned: + throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_ABANDONED); + case Acquired: + throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_ACQUIRED); + case Released: + throw new ZkCagesException(ZkCagesException.Error.LOCK_ALREADY_RELEASED); + default: + assert false : "Unknown condition"; } - return; - case Released: - case Error: - // The lock nodes have already been released. No need to releaseAll(); - return; - default: - assert false : "Unknown condition"; - } - break; + break; - case Acquired: - /** - * We have successfully acquired the lock. - */ - switch (lockState) { - case Waiting: - // Attempt to acquire lock has succeeded - lockState = newState; - // Notify caller - isDone.set(); - // Notify listener - if (listener != null) { - listener.onLockAcquired(this, context); + case Abandoned: + /** + * We tried to acquire a lock, but it was already held and we are abandoning our attempt to acquire. + */ + switch (lockState) { + case Waiting: + // Attempt to acquire lock without blocking has failed + lockState = newState; + return; + case Released: + case Error: + // The lock nodes have already been released. No need to releaseAll(); + return; + default: + assert false : "Unknown condition"; } - return; - default: - assert false : "Unknown condition"; - } - break; + break; - case Released: - /** - * We are releasing a lock. This can be done before a lock has been acquired if an operation is in progress. - */ - switch (lockState) { - case Idle: - // Change to the released state to prevent this lock being used again - lockState = newState; - return; - case Waiting: - setLockState(LockState.Error, new ZkCagesException(ZkCagesException.Error.LOCK_RELEASED_WHILE_WAITING)); - return; case Acquired: - // We are simply releasing the lock while holding it. Everything fine. - lockState = newState; - // Initiate the release procedure immediately - releaseAll(); - case Released: - case Abandoned: - // We consider that release() has been called vacuously - return; - default: - assert false : "Unknown condition"; - } - break; + /** + * We have successfully acquired the lock. + */ + switch (lockState) { + case Waiting: + // Attempt to acquire lock has succeeded + lockState = newState; + return; + default: + assert false : "Unknown condition"; + } + break; - case Error: - switch (lockState) { case Released: + /** + * We are releasing a lock. This can be done before a lock has been acquired if an operation is in progress. + */ + switch (lockState) { + case Idle: + // Change to the released state to prevent this lock being used again + lockState = newState; + return; + case Waiting: + throw new ZkCagesException(ZkCagesException.Error.LOCK_RELEASED_WHILE_WAITING); + case Acquired: + // We are simply releasing the lock while holding it. Everything fine. + lockState = newState; + case Released: + case Abandoned: + // We consider that release() has been called vacuously + return; + default: + assert false : "Unknown condition"; + } + break; + case Error: - // Error is vacuous now. Locks have already been released. - return; - default: - // An error has occurred. - lockState = newState; - // Record the killer exception - this.killerException = killerException; - // Initiate the release procedure immediately - releaseAll(); - // Notify caller - isDone.set(); - // Notify listener - if (listener != null) { - listener.onLockError(killerException, this, context); + switch (lockState) { + case Released: + case Error: + // Error is vacuous now. Locks have already been released. + return; + default: + // An error has occurred. + lockState = newState; + return; } - return; } - } - assert false : "Unknown condition"; + assert false : "Unknown condition"; + } } } diff --git a/src/main/java/org/scale7/zookeeper/cages/ZkWriteLock.java b/src/main/java/org/scale7/zookeeper/cages/ZkWriteLock.java index 7b2adad..1558bf4 100644 --- a/src/main/java/org/scale7/zookeeper/cages/ZkWriteLock.java +++ b/src/main/java/org/scale7/zookeeper/cages/ZkWriteLock.java @@ -1,7 +1,7 @@ package org.scale7.zookeeper.cages; public class ZkWriteLock extends ZkLockBase { - + public ZkWriteLock(String lockPath) { super(lockPath); }