Skip to content

Commit

Permalink
Reservoir Sampling v1.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgustave authored and jgustave committed Feb 19, 2014
1 parent 6f435db commit 092992f
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 1 deletion.
4 changes: 3 additions & 1 deletion h2o-scala/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ libraryDependencies += "org.javassist" % "javassist" % "3.16.1-GA"

libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.1.0"

libraryDependencies += "com.github.wookietreiber" %% "scala-chart" % "latest.integration"

libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-library" % v }

libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-compiler" % v }
Expand Down Expand Up @@ -61,7 +63,7 @@ unmanagedClasspath in Compile += h2oSources.value

unmanagedClasspath in Runtime += h2oClasses.value

// Setup run
// Setup run
// - Fork in run
fork in run := true

Expand Down
122 changes: 122 additions & 0 deletions h2o-scala/src/main/java/water/api/dsl/util/Reservoir.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package water.api.dsl.util;

import water.AutoBuffer;
import water.Iced;

import java.util.PriorityQueue;

/**
* Simple utility to perform reservoir sampling in order to make huge data small enough to
* bring in to local memory and display/etc.
* I did this in Java instead of Scala, because I wasn't sure about Serialization/transient issues in Scala.
*
* Not Entirely Sure about threading/synchronization in this model
*/
public class Reservoir extends Iced {

private transient final PriorityQueue<SampleItem> minHeap = new PriorityQueue<SampleItem>();
public final int reservoirSize;

//Needed For Serialization?
public Reservoir() {
this.reservoirSize = -1;
}

public Reservoir(int reservoirSize) {
this.reservoirSize = reservoirSize;
}

@Override
public water.AutoBuffer write(AutoBuffer bb) {
int[] order = new int[minHeap.size()];
double[] vals = new double[minHeap.size()]; //TODO: Till we figure out nulls

int x=0;
for(SampleItem item : minHeap) {
order[x] = item.getRandomOrder();
vals[x] = item.getValue();//==null?0:item.getValue(); //TODO: till we figure out nulls
x++;
}
bb.put4(reservoirSize);
bb.putA4(order);
bb.putA8d(vals);
return( bb );
}

@SuppressWarnings("unchecked")
@Override
public water.api.dsl.util.Reservoir read(AutoBuffer bb) {
int rSize = bb.get4();
int[] order = bb.getA4();
double[] vals = bb.getA8d();
Reservoir reservoir = new Reservoir(rSize);

for( int x=0;x<order.length;x++) {
reservoir.minHeap.add(new SampleItem(order[x], vals[x]));
}

return(reservoir);
}

public void add(double item) {
add( new SampleItem(item) );
}

synchronized public void add(SampleItem item) {
if( item != null ) {
if( minHeap.size() < reservoirSize) {
minHeap.add(item);
}else {
SampleItem head = minHeap.peek();
//If Item is > than the lest item in the heap.. then swap them out.
if( item.getRandomOrder() > head.getRandomOrder() ) {
minHeap.poll();
minHeap.add(item);
}
}
}
}

// synchronized public void merge( Reservoir other ) {
// if( other != null ) {
// for( SampleItem item : other.minHeap) {
// add(item);
// }
// }
// }
synchronized public Reservoir merge( Reservoir other ) {
Reservoir result = new Reservoir(this.reservoirSize);
for(SampleItem item : this.minHeap ) {
result.add(item);
}
if( other != null ) {
for(SampleItem item : other.minHeap ) {
result.add(item);
}
}
return( result );
}


synchronized public double[] getValues() {
double[] result = new double[minHeap.size()];

int x=0;
for(SampleItem item : minHeap) {
result[x] = item.getValue();
x++;
}
return( result );
}

synchronized public int getNumValues(){
return( minHeap.size() );
}

// public static void main(String[] args) {
// Reservoir reservoir = new Reservoir(10);
// for( int x=0;x<15;x++) {
// reservoir.add((double)x);
// }
// }
}
51 changes: 51 additions & 0 deletions h2o-scala/src/main/java/water/api/dsl/util/SampleItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package water.api.dsl.util;

import water.Iced;

import java.util.Random;

/**
* Wraps an value, with a random int (Used in reservoir sampling)
*/
public class SampleItem extends Iced implements Comparable<SampleItem> {
private static final Random rand = new Random();
public final int randomOrder;
public final double value;

public SampleItem(int randomOrder, double value) {
this.randomOrder = randomOrder;
this.value = value;
}

public SampleItem(double value) {
this.value = value;
this.randomOrder = rand.nextInt();
}

public int getRandomOrder() {
return randomOrder;
}

public double getValue() {
return value;
}

@Override
public int compareTo(SampleItem that) {
if(this == that) {
return(0);
}else if(that == null ) {
return(1);
}else {
return( java.lang.Integer.compare(this.randomOrder,that.randomOrder) );
}
}

@Override
public String toString() {
return "SampleItemStub{" +
"randomOrder=" + randomOrder +
", value=" + value +
'}';
}
}
38 changes: 38 additions & 0 deletions h2o-scala/src/main/scala/water/api/dsl/util/Sampler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package water.api.dsl.util

import water.api.dsl.T_T_Collect

/**
* Resevoir Sampler to extract a column from a DataFrame and bring it to the local context.
*
*
* Some Scala REPL foo:
*
* val g = parse ("/Users/jerdavis/temp/export1.gz")
* import water.api.dsl.util._
* val smallData = g(100) collect ( new Reservoir(1000), new Sampler() )
*
*
* import scalax.chart._
* import scalax.chart.Charting._
*
* val data = (1 to smallData.getNumValues) zip smallData.getValues
* val dataset = data.toXYSeriesCollection("some points")
* val chart = XYLineChart(dataset)
* chart.show
*
* Not entirely sure about threading / synchronization in this model
*/
class Sampler extends T_T_Collect[Reservoir,scala.Double] {

override def apply(acc:Reservoir, rhs:Array[scala.Double]):Reservoir = {
for( x <- rhs ) {
acc.add(x)
}
acc
}

override def reduce(lhs:Reservoir,rhs:Reservoir) = {
lhs.merge(rhs)
}
}

0 comments on commit 092992f

Please sign in to comment.