Skip to content

Commit

Permalink
Remove H2O Node limit of 58
Browse files Browse the repository at this point in the history
Split apart the R/W lock for in-progress-Gets & Puts from the BitSet
holding the list of replicating Nodes.   R/W lock count can now go to
2^31.
  • Loading branch information
cliffclick committed Feb 16, 2014
1 parent 0cdec17 commit 167f00c
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 96 deletions.
2 changes: 1 addition & 1 deletion prj.el
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
'(jde-run-option-debug nil)
'(jde-run-option-vm-args (quote ("-XX:+PrintGC")))
'(jde-compile-option-directory "./target/classes")
'(jde-run-option-application-args (quote ("-beta" "-mainClass" "org.junit.runner.JUnitCore" "water.exec.DdplyTest")))
'(jde-run-option-application-args (quote ("-beta" "-mainClass" "org.junit.runner.JUnitCore" "water.KVTest")))
'(jde-debugger (quote ("JDEbug")))
'(jde-compile-option-source (quote ("1.6")))
'(jde-compile-option-classpath (quote ("./target/classes" "./lib/javassist.jar" "./lib/hadoop/cdh4/hadoop-common.jar" "./lib/hadoop/cdh4/hadoop-auth.jar" "./lib/hadoop/cdh4/slf4j-api-1.6.1.jar" "./lib/hadoop/cdh4/slf4j-nop-1.6.1.jar" "./lib/hadoop/cdh4/hadoop-hdfs.jar" "./lib/hadoop/cdh4/protobuf-java-2.4.0a.jar" "./lib/apache/commons-codec-1.4.jar" "./lib/apache/commons-configuration-1.6.jar" "./lib/apache/commons-lang-2.4.jar" "./lib/apache/commons-logging-1.1.1.jar" "./lib/apache/httpclient-4.1.1.jar" "./lib/apache/httpcore-4.1.jar" "./lib/junit/junit-4.11.jar" "./lib/apache/guava-12.0.1.jar" "./lib/gson/gson-2.2.2.jar" "./lib/poi/poi-3.8-20120326.jar" "./lib/poi/poi-ooxml-3.8-20120326.jar" "./lib/poi/poi-ooxml-schemas-3.8-20120326.jar" "./lib/poi/dom4j-1.6.1.jar" "./lib/Jama/Jama.jar" "./lib/s3/aws-java-sdk-1.3.27.jar" "./lib/log4j/log4j-1.2.15.jar")))
Expand Down
1 change: 0 additions & 1 deletion src/main/java/water/TaskPutKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ static void put( H2ONode h2o, Key key, Value val, Futures fs, boolean dontCache)
assert _key.home() || _val==null; // Only PUT to home for keys, or remote invalidation from home
Paxos.lockCloud();
// Initialize Value for having a single known replica (the sender)
//if( _val != null && !_dontCache ) _val.initReplicaHome(sender,_key);
if( _val != null ) _val.initReplicaHome(sender,_key);
// Spin, until we update something.
Value old = H2O.raw_get(_key); // Raw-get: do not lazy-manifest if overwriting
Expand Down
184 changes: 90 additions & 94 deletions src/main/java/water/Value.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package water;

import java.io.*;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicInteger;

import jsr166y.ForkJoinPool;
import water.Job.ProgressMonitor;
import water.api.Constants.Extensions;
import water.fvec.*;
import water.nbhm.NonBlockingSetInt;
import water.persist.*;

/**
Expand Down Expand Up @@ -297,7 +298,7 @@ public boolean isHex() {
if( va._cols == null || va._cols.length == 0 ) return false;
if( va._cols.length > 1 ) return true;
if( va._cols[0]._size != 1 ) return true;
return _key.toString().endsWith(Extensions.HEX);
return _key.toString().endsWith(water.api.Constants.Extensions.HEX);
}

public boolean isBitIdentical( Value v ) {
Expand Down Expand Up @@ -337,7 +338,8 @@ public Value(Key k, int max, byte[] mem, short type, byte be ) {
// passed-in persist bits
byte p = (byte)(be&BACKEND_MASK);
_persist = (p==ICE) ? p : be;
_replicas = new AtomicLong(0);
_rwlock = new AtomicInteger(0);
_replicas = k.home() ? new NonBlockingSetInt() : null;
}
public Value(Key k, byte[] mem ) { this(k, mem.length, mem, TypeMap.PRIM_B, ICE); }
public Value(Key k, int max ) { this(k, max, new byte[max], TypeMap.PRIM_B, ICE); }
Expand All @@ -355,7 +357,8 @@ public Value(Key k, Iced pojo, byte be ) {
// passed-in persist bits
byte p = (byte)(be&BACKEND_MASK);
_persist = (p==ICE) ? p : be;
_replicas = new AtomicLong(0);
_rwlock = new AtomicInteger(0);
_replicas = k.home() ? new NonBlockingSetInt() : null;
}
public Value(Key k, Freezable pojo) {
_key = k;
Expand All @@ -364,11 +367,13 @@ public Value(Key k, Freezable pojo) {
_mem = pojo.write(new AutoBuffer()).buf();
_max = _mem.length;
_persist = ICE;
_replicas = new AtomicLong(0);
_rwlock = new AtomicInteger(0);
_replicas = k.home() ? new NonBlockingSetInt() : null;
}
// Nullary constructor for weaving
public Value() {
_replicas = new AtomicLong(0);
_rwlock = new AtomicInteger(0);
_replicas = new NonBlockingSetInt();
}

// Custom serializers: the _mem field is racily cleared by the MemoryManager
Expand All @@ -389,10 +394,10 @@ public Value read(AutoBuffer bb) {
_mem = bb.getA1();
_max = _mem.length;
_pojo = null;
// On remote nodes _replicas is initialized to 0 (signaling a remote PUT is
// On remote nodes _rwlock is initialized to 0 (signaling a remote PUT is
// in progress); it flips to -1 when the remote PUT is done, or to +1 if a
// notify needs to happen.
_replicas.set(-1); // Set as 'remote put is done'
_rwlock.set(-1); // Set as 'remote put is done'
touch();
return this;
}
Expand Down Expand Up @@ -450,13 +455,11 @@ public static Value lazyArrayChunk( Key key ) {
// ordering (3-4) can't be used to create the effect.
//
// A Reader/Writer lock for the home node to control racing Gets and Puts.
// - A bitvector of up to 58 cloud nodes known to have replicas
// - 6 bits of active Gets (reader-lock count up to 58), or -1/63- locked
// Active Readers/Gets atomically set the r/w lock count AND set their
// replication-bit (or fail because the lock is write-locked, in which
// case they Get is retried from the start and should see a new Value).
// - 0 for unlocked
// - +N for locked by N concurrent GETs-in-flight
// - -1 for write-locked
//
// An ACK from the client GET lowers the r/w lock count.
// An ACKACK from the client GET lowers the reader lock count.
//
// Home node PUTs alter which Value is mapped to a Key, then they block until
// there are no active GETs, then atomically set the write-lock, then send
Expand All @@ -469,139 +472,132 @@ public static Value lazyArrayChunk( Key key ) {
// Key from the same JVM). The 2nd PUT will CAS the value to 1, indicating
// the need for the finishing 1st PUT to call notify().
//
// Note that this sequence involves a lot of blocking on the writes, but not
// the readers - i.e., writes are slow to complete.
private transient final AtomicLong _replicas;
public int numReplicas() {
long r = _replicas.get();
int c = 0;
for( int i = 0; i < 58; ++i ) c += (r >> i) & 0x01;
return c;
// Note that this sequence involves a lot of blocking on repeated writes with
// cached readers, but not the readers - i.e., writes are slow to complete.
private transient final AtomicInteger _rwlock;
/** Attempt to CAS the r/w lock word from {@code old} to {@code nnn}.
 *  @param old expected current value of the lock word
 *  @param nnn replacement value
 *  @param msg label for the transition; only used by the disabled debug trace
 *  @return true if the swap happened, false if the lock word was not {@code old} */
private boolean RW_CAS( int old, int nnn, String msg ) {
  final boolean swapped = _rwlock.compareAndSet(old,nnn);
  //if( swapped ) System.out.println(_key+", "+old+" -> "+nnn+", "+msg);
  return swapped;
}

// List of who is replicated where
private transient final NonBlockingSetInt _replicas;
/** Number of nodes recorded in the replica set.  Values built for a
 *  non-home Key carry no replica set at all (the constructors leave
 *  {@code _replicas} null in that case), so report zero instead of throwing
 *  a NullPointerException. */
public int numReplicas() {
  final NonBlockingSetInt replicas = _replicas;
  return replicas == null ? 0 : replicas.size();
}
/** True if h2o has a copy of this Value */
boolean isReplicatedTo( H2ONode h2o ) {
assert h2o._unique_idx<58;
return (_replicas.get()&(1L<<h2o._unique_idx)) != 0;
}

private static int decodeReaderCount(long replicas) { return (int) (replicas >>> 58); }
private static long encodeReaderCount(int readers) { return ((long)readers) << 58; }
boolean isReplicatedTo( H2ONode h2o ) {
  // Membership test on the replica set, keyed by the node's unique index
  final int idx = h2o._unique_idx;
  return _replicas.contains(idx);
}

/** Atomically insert h2o into the replica list; reports false if the Value is
* flagged against future replication with a -1/63. Also bumps the active
* Get count, which remains until the Get completes (we recieve an ACKACK). */
* flagged against future replication with a -1. Also bumps the active
* Get count, which remains until the Get completes (we receive an ACKACK). */
boolean setReplica( H2ONode h2o ) {
assert h2o._unique_idx<58;
assert _key.home(); // Only the HOME node for a key tracks replicas
assert h2o != H2O.SELF; // Do not track self as a replica
while( true ) { // Repeat, in case racing GETs are bumping the counter
long old = _replicas.get();
if( old == -1 ) return false; // No new replications
long nnn = old + encodeReaderCount(1);
nnn |= (1L<<h2o._unique_idx); // Set replica bit for H2O
assert decodeReaderCount(nnn) < 58; // Count does not overflow
assert decodeReaderCount(nnn) > 0; // At least one reader now
if( _replicas.compareAndSet(old,nnn) ) return true;
int old = _rwlock.get();
if( old == -1 ) return false; // Write-locked; no new replications. Read fails to read *this* value
assert old >= 0; // Not negative
if( RW_CAS(old,old+1,"rlock+") ) break;
}
// Narrow non-race here. Here is a time window where the rwlock count went
// up, but the replica list does not account for the new replica. However,
// the rwlock cannot go down until an ACKACK is received, and the ACK
// (hence ACKACK) doesn't go out until after this function returns.
_replicas.add(h2o._unique_idx);
// Both rwlock taken, and replica count is up now.
return true;
}

/** Atomically lower active GET countn */
/** Atomically lower active GET count */
void lowerActiveGetCount( H2ONode h2o ) {
assert h2o._unique_idx<58;
assert _key.home(); // Only the HOME node for a key tracks replicas
assert h2o != H2O.SELF; // Do not track self as a replica
long nnn;
while( true ) { // Repeat, in case racing GETs are bumping the counter
long old = _replicas.get();
assert old != -1; // Not locked yet, because we are active
assert (old&(1L<<h2o._unique_idx)) !=0; // Self-bit is set
assert decodeReaderCount(old) > 0; // Since lowering, must be at least 1
nnn = old - encodeReaderCount(1);
assert decodeReaderCount(nnn) >= 0; // Count does not go negative
if( _replicas.compareAndSet(old,nnn) )
break; // Repeat until count is lowered
assert _key.home(); // Only the HOME node for a key tracks replicas
assert h2o != H2O.SELF;// Do not track self as a replica
while( true ) { // Repeat, in case racing GETs are bumping the counter
int old = _rwlock.get(); // Read the lock-word
assert old > 0; // Since lowering, must be at least 1
assert old != -1; // Not write-locked, because we are an active reader
assert _replicas.contains(h2o._unique_idx); // Self-bit is set
if( RW_CAS(old,old-1,"rlock-") ) {
if( old-1 == 0 ) // GET count fell to zero?
synchronized( this ) { notifyAll(); } // Notify any pending blocked PUTs
return; // Count lowered; done
}
}
if( decodeReaderCount(nnn) == 0 ) // GET count fell to zero?
synchronized( this ) { notifyAll(); } // Notify any pending blocked PUTs
}

/** Atomically set the replica count to -1/63 locking it from further GETs and
* ship out invalidates to caching replicas. May need to block on active
* GETs. Updates a set of Future invalidates that can be blocked against. */
/** This value was atomically extracted from the local STORE by a successful
* TaskPutKey attempt (only 1 thread can ever extract and thus call here).
* No future lookups will find this Value, but there may be existing uses.
* Atomically set the rwlock count to -1 locking it from further GETs and
* ship out invalidates to caching replicas. May need to block on active
* GETs. Updates a set of Future invalidates that can be blocked against. */
Futures lockAndInvalidate( H2ONode sender, Futures fs ) {
assert _key.home(); // Only the HOME node for a key tracks replicas
// Lock against further GETs
long old = _replicas.get();
assert old != -1; // Only the thread doing a PUT ever locks
assert decodeReaderCount(old) >= 0; // Count does not go negative
// Repeat, in case racing GETs are bumping the counter
// assert my current lock-lvl is smaller than lock lvl of this lock
while( decodeReaderCount(old) > 0 || // Has readers?
!_replicas.compareAndSet(old,-1) ) { // or failed to lock?
// assert I am waiting only on threads with higher priority
try { ForkJoinPool.managedBlock(this); } catch( InterruptedException e ) { }
old = _replicas.get();
// Write-Lock against further GETs
while( true ) { // Repeat, in case racing GETs are bumping the counter
int old = _rwlock.get();
assert old >= 0 : _key+", rwlock="+old; // Count does not go negative
assert old != -1; // Only the thread doing a PUT ever locks
assert decodeReaderCount(old) >= 0; // Count does not go negative
if( old !=0 ) { // has readers?
// Active readers: need to block until the GETs (of this very Value!)
// all complete, before we can invalidate this Value - lest a racing
// Invalidate bypass a GET.
try { ForkJoinPool.managedBlock(this); } catch( InterruptedException e ) { }
} else if( RW_CAS(0,-1,"wlock") )
break; // Got the write-lock!
}
assert decodeReaderCount(old) == 0; // Only get here with no active readers
if( old == 0 ) return fs; // Nobody is caching, so nothing to block against

// We have the set of Nodes with replicas now. Ship out invalidates.
for( int i=0; i<58; i++ )
if( ((old>>i)&1) != 0 && H2ONode.IDX[i] != sender )
int max = _replicas.length();
for( int i=0; i<max; i++ )
if( _replicas.contains(i) && H2ONode.IDX[i] != sender )
TaskInvalidateKey.invalidate(H2ONode.IDX[i],_key,fs);
return fs;
}

/** Initialize the _replicas field for a PUT. On the Home node (for remote
* PUTs), it is initialized to the one replica we know about. */
* PUTs), it is initialized to the one replica we know about, and not
* read-locked. Used on a new Value about to be PUT on the Home node. */
void initReplicaHome( H2ONode h2o, Key key ) {
assert key.home();
assert _key == null; // This is THE initializing key write for serialized Values
assert h2o != H2O.SELF; // Do not track self as a replica
_key = key;
// Set the replica bit for the one node we know about, and leave the
// rest clear. No GETs are in-flight at this time.
_replicas.set(1L<<h2o._unique_idx);
// rest clear.
_replicas.add(h2o._unique_idx);
_rwlock.set(0); // No GETs are in-flight at this time.
//System.out.println(key+", init "+_rwlock.get());
}

/** Block this thread until all prior remote PUTs complete - to force
* remote-PUT ordering on the home node. */
* remote-PUT ordering on the home node. */
void startRemotePut() {
assert !_key.home();
long x = 0;
int x = 0;
// assert I am waiting on threads with higher priority?
while( (x=_replicas.get()) != -1L ) // Spin until replicas==-1
if( x == 1L || _replicas.compareAndSet(0L,1L) )
while( (x=_rwlock.get()) != -1 ) // Spin until rwlock==-1
if( x == 1 || RW_CAS(0,1,"remote_need_notify") )
try { ForkJoinPool.managedBlock(this); } catch( InterruptedException e ) { }
}

/** The PUT for this Value has completed. Wakeup any blocked later PUTs. */
void completeRemotePut() {
assert !_key.home();
// Attempt an eager blind attempt, assuming no blocked pending notifies
if( _replicas.compareAndSet(0L, -1L) ) return;
if( RW_CAS(0, -1,"remote_complete") ) return;
synchronized(this) {
boolean res = _replicas.compareAndSet(1L, -1L);
boolean res = RW_CAS(1, -1,"remote_do_notify");
assert res; // Must succeed
notifyAll(); // Wake up pending blocked PUTs
}
}

public boolean isRemotePutInFlight() {
assert !_key.home();
return _replicas.get() != -1;
}

/** Return true if blocking is unnecessary.
* Alas, used in TWO places and the blocking API forces them to share here. */
* Alas, used in TWO places and the blocking API forces them to share here. */
@Override public boolean isReleasable() {
long r = _replicas.get();
int r = _rwlock.get();
if( _key.home() ) { // Called from lock_and_invalidate
// Home-key blocking: wait for active-GET count to fall to zero
return decodeReaderCount(r) == 0;
return r == 0;
} else { // Called from start_put
// Remote-key blocking: wait for active-PUT lock to hit -1
assert r == 1 || r == -1; // Either waiting (1) or done (-1) but not started(0)
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/water/nbhm/NonBlockingSetInt.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ public boolean add( final int i ) {
* @return count of elements.
*/
public int size ( ) { return _nbsi.size( ); }
/** Approximate upper bound on the largest element in the set: the returned
 *  value is at least as big as the max, but the actual max might be smaller. */
public int length() {
  final int words = _nbsi._bits.length;
  return words<<6;      // i.e. words*64; presumably 64 bits per word - TODO confirm
}
/** Empty the bitvector. */
public void clear ( ) {
NBSI cleared = new NBSI(63, new ConcurrentAutoTable(), this); // An empty initial NBSI
Expand Down

0 comments on commit 167f00c

Please sign in to comment.