Skip to content

Commit

Permalink
YOU NEED THIS: unfinished code in ZkMultiPathLock accidentally made i…
Browse files Browse the repository at this point in the history
…t into production and caused all sorts of woes. This is the fix
  • Loading branch information
Dominic Williams committed Nov 17, 2010
1 parent a8c1124 commit 4396e13
Show file tree
Hide file tree
Showing 5 changed files with 223 additions and 330 deletions.
9 changes: 7 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<groupId>org.scale7</groupId>
<artifactId>scale7-cages</artifactId>
<packaging>jar</packaging>
<version>0.6.1-SNAPSHOT</version>
<version>0.7.0-SNAPSHOT</version>
<name>Cages - A library for distributed locking and synchronization using ZooKeeper</name>
<url>http://github.com/s7</url>
<properties>
Expand Down Expand Up @@ -45,6 +45,11 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.3</version>
</dependency>
</dependencies>
<repositories>
<repository>
Expand All @@ -55,7 +60,7 @@
<repository>
<id>maven.scale7.org</id>
<name>Scale7 Maven Repo</name>
<url>http://github.com/s7/mvnrepo/raw/master</url>
<url>https://github.com/s7/mvnrepo/raw/master</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.scale7.zookeeper.cages;

public interface ISinglePathLock extends ILock {
public interface ISinglePathLock extends ILock, Comparable<ISinglePathLock> {
/**
* Get the path that a lock is operating against
* @return The path the lock is attempting to lock
*/
String getLockPath();


}
104 changes: 55 additions & 49 deletions src/main/java/org/scale7/zookeeper/cages/ZkLockBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

/**
* Base class for single path read and write locks.
*
* TODO implement timeouts as basic means to rollback from deadlock.
*
*
* TODO implement timeouts as basic means to rollback from deadlock.
*
* @author dominicwilliams
*
*/
Expand All @@ -34,7 +34,7 @@ public abstract class ZkLockBase extends ZkSyncPrimitive implements ISinglePathL
private boolean tryAcquireOnly;
private volatile LockState lockState;
private Integer mutex;

public ZkLockBase(String lockPath) {
super(ZkSessionManager.instance());
PathUtils.validatePath(lockPath);
Expand All @@ -50,7 +50,7 @@ public void acquire() throws ZkCagesException, InterruptedException {
createRootPath(lockPath);
waitSynchronized();
}

/** {@inheritDoc} */
@Override
public void acquire(ILockListener listener, Object context) throws ZkCagesException, InterruptedException {
Expand All @@ -61,7 +61,7 @@ public void acquire(ILockListener listener, Object context) throws ZkCagesExcept
addDieListener(reportDieToListener);
createRootPath(lockPath);
}

/** {@inheritDoc} */
@Override
public boolean tryAcquire() throws ZkCagesException, InterruptedException {
Expand All @@ -71,7 +71,7 @@ public boolean tryAcquire() throws ZkCagesException, InterruptedException {
waitSynchronized();
return lockState == LockState.Acquired;
}

/** {@inheritDoc} */
@Override
public void tryAcquire(ITryLockListener listener, Object context) throws ZkCagesException, InterruptedException {
Expand All @@ -83,13 +83,13 @@ public void tryAcquire(ITryLockListener listener, Object context) throws ZkCages
addDieListener(reportDieToListener);
createRootPath(lockPath);
}

/** {@inheritDoc} */
@Override
public void release() {
safeLockState(LockState.Released);
}

/** {@inheritDoc} */
@Override
public LockState getState()
Expand All @@ -105,14 +105,20 @@ public LockState getState()
public String getLockPath() {
return zkPath.getPath();
}


@Override
public int compareTo(ISinglePathLock other) {
int result = getLockPath().compareTo(other.getLockPath());
return result == 0 ? (getType() == other.getType() ? 1 : 0): result;
}

private void createRootPath(String path) throws InterruptedException {
zkPath = new ZkPath(path, CreateMode.PERSISTENT);
// TODO for now only persistent ZK nodes can have children. fix this.
zkPath.addUpdateListener(createLockNode, true);
zkPath.addDieListener(onLockPathError);
zkPath.addDieListener(onLockPathError);
}

private Runnable onLockPathError = new Runnable () {

@Override
Expand All @@ -122,47 +128,47 @@ public void run() {
// Bubble the killer exception and die!
die(zkPath.getKillerException());
}

};

@Override
protected void onDie(ZkCagesException killerException) {
// We just set the lock state. The killer exception has already been set by base class
safeLockState(LockState.Error);
}

private Runnable createLockNode = new Runnable () {

@Override
public void run() {
zooKeeper().create(zkPath.getPath() + "/" + getType() + "-", new byte[0],
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, createLockNodeHandler, this);
}

};

private StringCallback createLockNodeHandler = new StringCallback() {

@Override
public void processResult(int rc, String path, Object ctx, String name) {

if (rc == Code.OK.intValue())
thisNodeId = ZkLockNode.getLockNodeIdFromName(name);
if (passOrTryRepeat(rc, new Code[] { Code.OK }, (Runnable)ctx))
getQueuedLocks.run();
}
};

};

private Runnable getQueuedLocks = new Runnable () {

@Override
public void run() {
zooKeeper().getChildren(zkPath.getPath(), null, queuedLocksHandler, this);
}

};

private ChildrenCallback queuedLocksHandler = new ChildrenCallback() {

@Override
Expand Down Expand Up @@ -209,18 +215,18 @@ public void processResult(int rc, String path, Object ctx, List<String> children
}
}
}
};

};

private Runnable watchBlockingNode = new Runnable() {

@Override
public void run() {
zooKeeper().exists(zkPath.getPath() + "/" + blockingNodeId, ZkLockBase.this, blockingNodeHandler, this);
}

};

private StatCallback blockingNodeHandler = new StatCallback() {

@Override
Expand All @@ -230,32 +236,32 @@ public void processResult(int rc, String path, Object ctx, Stat stat) {
else
passOrTryRepeat(rc, new Code[] { Code.OK }, (Runnable)ctx);
}

};

@Override
protected void onNodeDeleted(String path) {
getQueuedLocks.run();
}

private Runnable releaseLock = new Runnable () {

@Override
public void run() {
zooKeeper().delete(zkPath.getPath() + "/" + thisNodeId, -1, releaseLockHandler, this);
}

};

private VoidCallback releaseLockHandler = new VoidCallback() {

@Override
public void processResult(int rc, String path, Object ctx) {
passOrTryRepeat(rc, new Code[] { Code.OK, Code.NONODE }, (Runnable)ctx);
passOrTryRepeat(rc, new Code[] { Code.OK, Code.NONODE }, (Runnable)ctx);
}

};

private Runnable reportStateUpdatedToListener = new Runnable() {

@Override
Expand All @@ -269,18 +275,18 @@ public void run() {
else
listener.onLockAcquired(ZkLockBase.this, context);
}

};

private Runnable reportDieToListener = new Runnable() {

@Override
public void run() {
listener.onLockError(getKillerException(), ZkLockBase.this, context);
}

};

/**
* Set the lock state when we know an exception can't be thrown
* @param newState The new lock state
Expand All @@ -293,23 +299,23 @@ private void safeLockState(LockState newState) {
assert false : "Unknown condition";
}
}

/**
* Set the lock state
* @param newState The new lock state
* @throws ZkCagesException
*/
private void setLockState(LockState newState) throws ZkCagesException {

synchronized (mutex) {
switch (newState) {

case Idle:
assert false : "Unknown condition";

case Waiting:
/**
* We only set this state from the public interface methods. This means we can directly throw an
* We only set this state from the public interface methods. This means we can directly throw an
* exception back at the caller!
*/
switch (lockState) {
Expand All @@ -329,7 +335,7 @@ private void setLockState(LockState newState) throws ZkCagesException {
assert false : "Unknown condition";
}
break;

case Abandoned:
/**
* We tried to acquire a lock, but it was already held and we are abandoning our attempt to acquire.
Expand All @@ -355,7 +361,7 @@ private void setLockState(LockState newState) throws ZkCagesException {
assert false : "Unknown condition";
}
break;

case Acquired:
/**
* We have successfully acquired the lock.
Expand All @@ -376,7 +382,7 @@ private void setLockState(LockState newState) throws ZkCagesException {
assert false : "Unknown condition";
}
break;

case Released:
/**
* We are releasing a lock. This can be done before a lock has been acquired if an operation is in progress.
Expand Down Expand Up @@ -405,7 +411,7 @@ private void setLockState(LockState newState) throws ZkCagesException {
assert false : "Unknown condition";
}
break;

case Error:
switch (lockState) {
case Released:
Expand All @@ -417,7 +423,7 @@ private void setLockState(LockState newState) throws ZkCagesException {
return;
}
}

assert false : "Unknown condition";
}
}
Expand Down
Loading

0 comments on commit 4396e13

Please sign in to comment.