Skip to content

Commit

Permalink
Start of real key locking
Browse files Browse the repository at this point in the history
  • Loading branch information
cliffclick committed Jan 27, 2014
1 parent 8eb23fb commit 53be2c3
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 77 deletions.
2 changes: 1 addition & 1 deletion prj.el
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
'(jde-run-option-debug nil)
'(jde-run-option-vm-args (quote ("-XX:+PrintGC")))
'(jde-compile-option-directory "./target/classes")
'(jde-run-option-application-args (quote ("-beta" "-mainClass" "org.junit.runner.JUnitCore" "water.exec.Expr2Test")))
'(jde-run-option-application-args (quote ("-beta" "-mainClass" "org.junit.runner.JUnitCore" "hex.gbm.GBMTest")))
'(jde-debugger (quote ("JDEbug")))
'(jde-compile-option-source (quote ("1.6")))
'(jde-compile-option-classpath (quote ("./target/classes" "./lib/javassist.jar" "./lib/hadoop/cdh4/hadoop-common.jar" "./lib/hadoop/cdh4/hadoop-auth.jar" "./lib/hadoop/cdh4/slf4j-api-1.6.1.jar" "./lib/hadoop/cdh4/slf4j-nop-1.6.1.jar" "./lib/hadoop/cdh4/hadoop-hdfs.jar" "./lib/hadoop/cdh4/protobuf-java-2.4.0a.jar" "./lib/apache/commons-codec-1.4.jar" "./lib/apache/commons-configuration-1.6.jar" "./lib/apache/commons-lang-2.4.jar" "./lib/apache/commons-logging-1.1.1.jar" "./lib/apache/httpclient-4.1.1.jar" "./lib/apache/httpcore-4.1.jar" "./lib/junit/junit-4.11.jar" "./lib/apache/guava-12.0.1.jar" "./lib/gson/gson-2.2.2.jar" "./lib/poi/poi-3.8-20120326.jar" "./lib/poi/poi-ooxml-3.8-20120326.jar" "./lib/poi/poi-ooxml-schemas-3.8-20120326.jar" "./lib/poi/dom4j-1.6.1.jar" "./lib/Jama/Jama.jar" "./lib/s3/aws-java-sdk-1.3.27.jar" "./lib/log4j/log4j-1.2.15.jar")))
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/hex/GLMGrid.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public GLMModels atomic(GLMModels old) {
old._runTime = Math.max(runTime,old._runTime);
return old;
}
@Override public void onSuccess() {
@Override public void onSuccess(Value old) {
// we're done, notify
if(done) remove();
};
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/water/Atomic.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public Atomic(){}
* atomic update. Override this if you need to perform some action after
* the update succeeds (eg cleanup).
*/
public void onSuccess(){}
public void onSuccess( Value old ){}

/** Block until it completes, even if run remotely */
public final T invoke( Key key ) {
Expand Down Expand Up @@ -60,7 +60,7 @@ public final RPC<Atomic<T>> fork(Key key) {
// Attempt atomic update
Value res = DKV.DputIfMatch(_key,val2,val1,fs);
if( res == val1 ) { // Success?
onSuccess(); // Call user's post-XTN function
onSuccess(val1); // Call user's post-XTN function
fs.blockForPending(); // Block for any pending invalidates on the atomic update
break;
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/water/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public void cancel(final String msg) {
}
return old;
}
@Override public void onSuccess(){
@Override public void onSuccess(Value old){
if(_job != null){
final Job job = _job;
H2O.submitTask(new H2OCountedCompleter() {
Expand Down
163 changes: 92 additions & 71 deletions src/main/java/water/Lockable.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,73 +29,110 @@ public abstract class Lockable<T extends Lockable<T>> extends Iced {
@API(help="My Key")
public final Key _key;

///** Write-locker job is in _jobs[0 ].
// * Read -locker jobs are in _jobs[1+].
// * Unlocked has _jobs equal to null.
// * Only 1 situation will be true at a time; atomically updated.
// */
//@API(help="Jobs locking this key")
//public Key _lockers[];
//
/** Write-locker job is in _jobs[0 ]. Can be null locker.
* Read -locker jobs are in _jobs[1+].
* Unlocked has _jobs equal to null.
* Only 1 situation will be true at a time; atomically updated.
* Transient, because this data is only valid on the master node.
*/
@API(help="Jobs locking this key")
public transient Key _lockers[];

// Create unlocked
public Lockable( Key key ) { _key = key; }
//// Create locked by Job
//public Lockable( Key key, Job locker ) { _key = key; _lockers = new Key[]{locker._job_key}; }
//
//
//// Utility class to ask-state or set-state.
//// NO ERROR CHECKING, just a primitive abstraction layer.
//static private abstract class LockImpl extends TAtomic<Lockable> {
// final Key _job_key; // Job doing the locking
// Key _lockers[]; // Job state on completion
// LockImpl( Key job_key ) { _job_key = job_key; }
// boolean is_write_locked() { return _lockers!= null && _lockers[0]==_job_key; }
// boolean is_unlocked () { return _lockers== null; }
// Key[] set_write_lock( Key job_key ) { return _lockers=new Key[]{_job_key}; }
//}

// Atomic create+overwrite of prior key.
// If prior key exists, block until acquire a write-lock.
// The call delete_impl, removing all of a prior key.
// The replace this object as the new Lockable, still write-locked.
// "locker" can be null, meaning the special no-Job locker; mostly used by fast tests
// "locker" can be null, meaning the special no-Job locker; for use by expected-fast operations
//
// Example: write-lock & remove an old VA, and replace with a new locked Frame
// Local-Node Master-Node
// (1) FR,VA -->write_lock(job)--> VA
// (2) FR,VA.waiting... FR,VA+job-locked atomic xtn loop
// (3) VA.delete_impl onSuccess
// (4) FR <--update success <-- FR+job-locked

public T delete_and_lock( Job locker ) {
// Atomically acquire lock
assert _key != null; // Must have a Key to be lockable (not all Frames ever exist in the K/V)
// Key job_key = job.job_key;
// WriteLock wl = (WriteLock)(new WriteLock(job_key).invoke(_key));
// if( wl == null ) // Aborted?
// throw new IllegalArgumentException("Attempting to write-lock a deleted key");
// assert wl.is_write_locked();

// Default non-locking empty implementation!!!
Futures fs = new Futures();
Value val = DKV.get(_key);
if( val != null ) // Prior exists?
// Asserts that this _key only refers to Lockables its whole life
((T)val.get()).delete_impl(fs); // Delete prior
DKV.put(_key,this,fs); // Put initial copy, replacing old guy
fs.blockForPending();
System.out.println("Locking "+_key+" by "+locker);
Key job_key = locker == null ? null : locker.job_key;
new WriteUpdateLock(job_key).invoke(_key);
return (T)this;
}

// Atomic remove self. Trivial if no self-key.
public void delete( ) { delete(new Futures()).blockForPending(); }
public Futures delete( Futures fs ) {
// Default non-locking empty implementation!!!
delete_impl(fs); // Nuke self
if( _key != null ) DKV.remove(_key,fs); // Remove self key
return fs;
// Obtain the write-lock on _key, deleting the prior _key.
// Blocks locally waiting on the master node to resolve.
// Blocks on master, until the lock can be obtained (burning a F/J till lock acquire).
// Spins, if several requests are competing for a lock.
private class WriteUpdateLock extends TAtomic<Lockable> {
final Key _job_key; // Job doing the locking
WriteUpdateLock( Key job_key ) { _job_key = job_key; }
// This code runs on master, in a loop until the returned value is
// atomically updated over the Lockable's _key.
@Override public Lockable atomic(Lockable old) {
// "old" is a private defensive copy of the old Lockable - no other
// writers. We can read it without concern for races.
if( old != null ) { // Prior Lockable exists?
assert !old.is_locked(_job_key); // No double locking by same job
if( !old.is_unlocked() ) // Blocking for some other Job to finish???
throw H2O.unimpl();
}
// Update-in-place the defensive copy
assert is_unlocked();
set_write_lock(_job_key);
assert is_locked(_job_key);
return Lockable.this;
}
@Override public void onSuccess( Lockable old ) {
if( old != null ) {
Futures fs = new Futures();
old.delete_impl(fs);
fs.blockForPending();
}
}
}


// Atomic lock & remove self.
public void delete( ) { new WriteDeleteLock().invoke(_key); }
public Futures delete(Futures fs) { delete(); return fs; }
public static void delete( Key k ) {
if( k == null ) return;
Value val = DKV.get(k);
if( !val.isLockable() ) DKV.remove(k);
else ((Lockable)val.get()).delete();
}

// Atomically set a new version of self
// Obtain the write-lock on _key, then delete.
private class WriteDeleteLock extends TAtomic<Lockable> {
// This code runs on master, in a loop until the returned value is
// atomically updated over the Lockable's _key.
@Override public Lockable atomic(Lockable old) {
// "old" is a private defensive copy of the old Lockable - no other
// writers. We can read it without concern for races.
if( old != null && // Prior Lockable exists?
!old.is_unlocked() ) // Blocking for some other Job to finish???
throw H2O.unimpl();
// Update-in-place the defensive copy
assert old.is_unlocked();
old.set_write_lock(null);
assert old.is_locked(null);
return old;
}
@Override public void onSuccess( Lockable old ) {
if( old != null ) {
Futures fs = new Futures();
old.delete_impl(fs);
DKV.remove(_key,fs); // Delete self also
fs.blockForPending();
}
}
}

// Atomically set a new version of self
public void update() {
// Default non-unlocking empty implementation!!!
// Assert locked by job
Expand All @@ -107,35 +144,19 @@ public void update() {
public Futures unlock( Futures fs ) {
// Default non-unlocking empty implementation!!!
DKV.put(_key,this,fs); // Just freshen copy of self
System.out.println("Unlocking "+_key);
return fs;
}

//// Block until acquire a write-lock.
//void write_lock( Job job ) {
// // Atomically acquire lock
// Key job_key = job.job_key;
// WriteLock wl = (WriteLock)(new WriteLock(job_key).invoke(_key));
// if( wl == null ) // Aborted?
// throw new IllegalArgumentException("Attempting to write-lock a deleted key");
// assert wl.is_write_locked();
//}
//
//private class WriteLock extends LockImpl {
// WriteLock( Key job_key ) { super(job_key); }
// @Override public Lockable atomic(Lockable old) {
// // "old" is a private defensive copy of the old Lockable - no other
// // writers. We can read it without concern for races. The outer-class
// // new Lockable is what is getting locked. These might be the same.
// if( old == null ) return null; // Nothing to lock???
// _lockers = old._lockers;
// assert !is_write_locked(); // Double locking by same job
// if( !is_unlocked() ) // Blocking for some other Job to finish???
// throw H2O.unimpl();
// // Update-in-place the defensive copy
// old._lockers = set_write_lock(_job_key);
// return old;
// }
//}
// Accessers for locking state. No self-checking, just primitive results.
private boolean is_locked(Key job_key) {
if( _lockers==null ) return false;
for( Key k : _lockers ) if( job_key.equals(k) ) return true;
return false;
}
private boolean is_unlocked() { return _lockers== null; }
private void set_write_lock( Key job_key ) { _lockers=new Key[]{job_key}; }


// Remove any subparts before removing the whole thing
protected abstract void delete_impl( Futures fs );
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/water/TAtomic.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,7 @@ public TAtomic(){}
T nnn = atomic(old);
return nnn == null ? null : new Value(_key,nnn);
}
@Override public void onSuccess( Value old ) { onSuccess(old==null?null:(T)old.get()); }
// Upcast the old value to T
public void onSuccess( T old ) { }
}
2 changes: 1 addition & 1 deletion src/main/java/water/parser/DParseTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public AtomicUnion(byte[] buf, int dstOff){
return new Value(_key,bits2);
}

@Override public void onSuccess(){
@Override public void onSuccess(Value old){
_bits = null; // Do not return the bits
}
}
Expand Down

0 comments on commit 53be2c3

Please sign in to comment.