From 092992f78aef89b7065ac16d28f30dde2bb680e1 Mon Sep 17 00:00:00 2001 From: jgustave Date: Wed, 19 Feb 2014 12:28:55 -0800 Subject: [PATCH] Reservoir Sampling v1. --- h2o-scala/build.sbt | 4 +- .../java/water/api/dsl/util/Reservoir.java | 122 ++++++++++++++++++ .../java/water/api/dsl/util/SampleItem.java | 51 ++++++++ .../scala/water/api/dsl/util/Sampler.scala | 38 ++++++ 4 files changed, 214 insertions(+), 1 deletion(-) create mode 100644 h2o-scala/src/main/java/water/api/dsl/util/Reservoir.java create mode 100644 h2o-scala/src/main/java/water/api/dsl/util/SampleItem.java create mode 100644 h2o-scala/src/main/scala/water/api/dsl/util/Sampler.scala diff --git a/h2o-scala/build.sbt b/h2o-scala/build.sbt index 7f6bc5ba5c..4ede8d0d8f 100644 --- a/h2o-scala/build.sbt +++ b/h2o-scala/build.sbt @@ -31,6 +31,8 @@ libraryDependencies += "org.javassist" % "javassist" % "3.16.1-GA" libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "1.1.0" +libraryDependencies += "com.github.wookietreiber" %% "scala-chart" % "latest.integration" + libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-library" % v } libraryDependencies <+= scalaVersion { v => "org.scala-lang" % "scala-compiler" % v } @@ -61,7 +63,7 @@ unmanagedClasspath in Compile += h2oSources.value unmanagedClasspath in Runtime += h2oClasses.value -// Setup run +// Setup run // - Fork in run fork in run := true diff --git a/h2o-scala/src/main/java/water/api/dsl/util/Reservoir.java b/h2o-scala/src/main/java/water/api/dsl/util/Reservoir.java new file mode 100644 index 0000000000..4712b36f2d --- /dev/null +++ b/h2o-scala/src/main/java/water/api/dsl/util/Reservoir.java @@ -0,0 +1,122 @@ +package water.api.dsl.util; + +import water.AutoBuffer; +import water.Iced; + +import java.util.PriorityQueue; + +/** + * Simple utility to perform reservoir sampling in order to make huge data small enough to + * bring in to local memory and display/etc. + * I did this in Java instead of Scala, because I wasn't sure about Serialization/transient issues in Scala. + * + * Not Entirely Sure about threading/synchronization in this model + */ +public class Reservoir extends Iced { + + private transient final PriorityQueue minHeap = new PriorityQueue(); + public final int reservoirSize; + + //Needed For Serialization? + public Reservoir() { + this.reservoirSize = -1; + } + + public Reservoir(int reservoirSize) { + this.reservoirSize = reservoirSize; + } + + @Override + public water.AutoBuffer write(AutoBuffer bb) { + int[] order = new int[minHeap.size()]; + double[] vals = new double[minHeap.size()]; //TODO: Till we figure out nulls + + int x=0; + for(SampleItem item : minHeap) { + order[x] = item.getRandomOrder(); + vals[x] = item.getValue();//==null?0:item.getValue(); //TODO: till we figure out nulls + x++; + } + bb.put4(reservoirSize); + bb.putA4(order); + bb.putA8d(vals); + return( bb ); + } + + @SuppressWarnings("unchecked") + @Override + public water.api.dsl.util.Reservoir read(AutoBuffer bb) { + int rSize = bb.get4(); + int[] order = bb.getA4(); + double[] vals = bb.getA8d(); + Reservoir reservoir = new Reservoir(rSize); + + for( int x=0;x than the lest item in the heap.. then swap them out. + if( item.getRandomOrder() > head.getRandomOrder() ) { + minHeap.poll(); + minHeap.add(item); + } + } + } + } + +// synchronized public void merge( Reservoir other ) { +// if( other != null ) { +// for( SampleItem item : other.minHeap) { +// add(item); +// } +// } +// } + synchronized public Reservoir merge( Reservoir other ) { + Reservoir result = new Reservoir(this.reservoirSize); + for(SampleItem item : this.minHeap ) { + result.add(item); + } + if( other != null ) { + for(SampleItem item : other.minHeap ) { + result.add(item); + } + } + return( result ); + } + + + synchronized public double[] getValues() { + double[] result = new double[minHeap.size()]; + + int x=0; + for(SampleItem item : minHeap) { + result[x] = item.getValue(); + x++; + } + return( result ); + } + + synchronized public int getNumValues(){ + return( minHeap.size() ); + } + +// public static void main(String[] args) { +// Reservoir reservoir = new Reservoir(10); +// for( int x=0;x<15;x++) { +// reservoir.add((double)x); +// } +// } +} diff --git a/h2o-scala/src/main/java/water/api/dsl/util/SampleItem.java b/h2o-scala/src/main/java/water/api/dsl/util/SampleItem.java new file mode 100644 index 0000000000..d80c3b6df5 --- /dev/null +++ b/h2o-scala/src/main/java/water/api/dsl/util/SampleItem.java @@ -0,0 +1,51 @@ +package water.api.dsl.util; + +import water.Iced; + +import java.util.Random; + +/** + * Wraps an value, with a random int (Used in reservoir sampling) + */ +public class SampleItem extends Iced implements Comparable { + private static final Random rand = new Random(); + public final int randomOrder; + public final double value; + + public SampleItem(int randomOrder, double value) { + this.randomOrder = randomOrder; + this.value = value; + } + + public SampleItem(double value) { + this.value = value; + this.randomOrder = rand.nextInt(); + } + + public int getRandomOrder() { + return randomOrder; + } + + public double getValue() { + return value; + } + + @Override + public int compareTo(SampleItem that) { + if(this == that) { + return(0); + }else if(that == null ) { + return(1); + }else { + return( java.lang.Integer.compare(this.randomOrder,that.randomOrder) ); + } + } + + @Override + public String toString() { + return "SampleItemStub{" + + "randomOrder=" + randomOrder + + ", value=" + value + + '}'; + } +} diff --git a/h2o-scala/src/main/scala/water/api/dsl/util/Sampler.scala b/h2o-scala/src/main/scala/water/api/dsl/util/Sampler.scala new file mode 100644 index 0000000000..edc6933c5c --- /dev/null +++ b/h2o-scala/src/main/scala/water/api/dsl/util/Sampler.scala @@ -0,0 +1,38 @@ +package water.api.dsl.util + +import water.api.dsl.T_T_Collect + +/** + * Resevoir Sampler to extract a column from a DataFrame and bring it to the local context. + * + * + * Some Scala REPL foo: + * + * val g = parse ("/Users/jerdavis/temp/export1.gz") + * import water.api.dsl.util._ + * val smallData = g(100) collect ( new Reservoir(1000), new Sampler() ) + * + * + * import scalax.chart._ + * import scalax.chart.Charting._ + * + * val data = (1 to smallData.getNumValues) zip smallData.getValues + * val dataset = data.toXYSeriesCollection("some points") + * val chart = XYLineChart(dataset) + * chart.show + * + * Not entirely sure about threading / synchronization in this model + */ +class Sampler extends T_T_Collect[Reservoir,scala.Double] { + + override def apply(acc:Reservoir, rhs:Array[scala.Double]):Reservoir = { + for( x <- rhs ) { + acc.add(x) + } + acc + } + + override def reduce(lhs:Reservoir,rhs:Reservoir) = { + lhs.merge(rhs) + } +} \ No newline at end of file