Fixed most k-means bugs
cypof committed Jun 18, 2013
1 parent 527d9cc commit 3f3f135
Showing 27 changed files with 154 additions and 1,068 deletions.
4 changes: 0 additions & 4 deletions .classpath
@@ -27,10 +27,6 @@
<classpathentry kind="lib" path="lib/hadoop/cdh3/jackson-mapper-asl-1.5.2.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/log4j-1.2.15.jar" sourcepath="/home/cypof/.m2/repository/log4j/log4j/1.2.15/log4j-1.2.15-sources.jar"/>
<classpathentry kind="lib" path="lib/hadoop/cdh3/hadoop-core-0.20.2-cdh3u6.jar" sourcepath="lib/hadoop/cdh3/hadoop-core-0.20.2-cdh3u6-sources.jar"/>
<classpathentry kind="lib" path="lib/fastr/antlr-runtime-3.5.jar"/>
<classpathentry kind="lib" path="lib/fastr/arpack_combined_all.jar"/>
<classpathentry kind="lib" path="lib/fastr/netlib-java-0.9.3.jar"/>
<classpathentry kind="lib" path="lib/fastr/r.jar" sourcepath="lib/fastr/r-sources.jar"/>
<classpathentry kind="lib" path="lib/log4j/log4j-1.2.15.jar"/>
<classpathentry kind="output" path="target/classes"/>
</classpath>
2 changes: 1 addition & 1 deletion build.sh
@@ -30,7 +30,7 @@ JAR_ROOT=lib

# additional dependencies, relative to this file, but all dependencies should be
# inside the JAR_ROOT tree so that they are packed to the jar file properly
DEPENDENCIES="${JAR_ROOT}/fastr/*${SEP}${JAR_ROOT}/jama/*${SEP}${JAR_ROOT}/apache/*${SEP}${JAR_ROOT}/junit/*${SEP}${JAR_ROOT}/gson/*${SEP}${JAR_ROOT}/javassist.jar${SEP}${JAR_ROOT}/poi/*${SEP}${JAR_ROOT}/s3/*${SEP}${JAR_ROOT}/jets3t/*${SEP}${JAR_ROOT}/log4j/*"
DEPENDENCIES="${JAR_ROOT}/jama/*${SEP}${JAR_ROOT}/apache/*${SEP}${JAR_ROOT}/junit/*${SEP}${JAR_ROOT}/gson/*${SEP}${JAR_ROOT}/javassist.jar${SEP}${JAR_ROOT}/poi/*${SEP}${JAR_ROOT}/s3/*${SEP}${JAR_ROOT}/jets3t/*${SEP}${JAR_ROOT}/log4j/*"

DEFAULT_HADOOP_VERSION="cdh3"
OUTDIR="target"
Binary file removed lib/fastr/antlr-runtime-3.5.jar
Binary file removed lib/fastr/arpack_combined_all.jar
Binary file removed lib/fastr/netlib-java-0.9.3.jar
Binary file removed lib/fastr/r-sources.jar
Binary file removed lib/fastr/r.jar
72 changes: 47 additions & 25 deletions src/main/java/hex/KMeans.java
@@ -22,19 +22,20 @@ private KMeans(Key dest, int k, int... cols) {
super("KMeans K: " + k + ", Cols: " + cols.length, dest);
}

public static KMeans start(Key dest, final ValueArray va, final int k, final double epsilon, long randSeed,
boolean normalize, int... cols) {
public static KMeans start(Key dest, final ValueArray va, final int k, final double epsilon, final int maxIter,
long randSeed, boolean normalize, int... cols) {
final KMeans job = new KMeans(dest, k, cols);

// Unlike other models, k-means is a discovery-only procedure and does
// not require a response-column to train. This also means the clusters
// are not classes (although, if a class/response is associated with each
// k-means is an unsupervised learning algorithm and does not require a
// response-column to train. This also means the clusters are not classes
// (although, if a class/response is associated with each
// row we could count the number of each class in each cluster).
int cols2[] = Arrays.copyOf(cols, cols.length + 1);
cols2[cols.length] = -1; // No response column
final KMeansModel res = new KMeansModel(job.dest(), cols2, va._key);
res._normalized = normalize;
res._randSeed = randSeed;
res._maxIter = maxIter;
UKV.put(job.dest(), res);
// Updated column mapping selection after removing various junk columns
final int[] filteredCols = res.columnMapping(va.colNames());
@@ -93,10 +94,8 @@ private void run(KMeansModel res, ValueArray va, int k, double epsilon, int[] co
clusters = recluster(clusters, k, rand);
res._clusters = clusters;

// Iterate until no cluster mean moves more than epsilon
boolean moved = true;
while( moved ) {
moved = false;
for( ;; ) {
boolean moved = false;
Lloyds task = new Lloyds();
task._arykey = va._key;
task._cols = cols;
@@ -105,17 +104,24 @@ private void run(KMeansModel res, ValueArray va, int k, double epsilon, int[] co
task.invoke(va._key);

for( int cluster = 0; cluster < clusters.length; cluster++ ) {
for( int column = 0; column < cols.length - 1; column++ ) {
double value = task._sums[cluster][column] / task._counts[cluster];
if( Math.abs(value - clusters[cluster][column]) > epsilon ) {
moved = true;
if( task._counts[cluster] > 0 ) {
for( int column = 0; column < cols.length - 1; column++ ) {
double value = task._sums[cluster][column] / task._counts[cluster];
if( Math.abs(value - clusters[cluster][column]) > epsilon ) {
moved = true;
}
clusters[cluster][column] = value;
}
clusters[cluster][column] = value;
}
}

res._error = task._error;
res._iteration++;
UKV.put(dest(), res);
// Iterate until no cluster mean moves more than epsilon,
if( !moved ) break;
// reached max iterations,
if( res._maxIter != 0 && res._iteration >= res._maxIter ) break;
// or job cancelled
if( cancelled() ) break;
}
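The loop above carries the core fixes in this commit: cluster means are only recomputed when a cluster's count is positive (avoiding a divide-by-zero for empty clusters), the total squared error is stored on the model each pass, and iteration stops when no center moves more than epsilon, when _maxIter is reached, or when the job is cancelled. A minimal stand-alone sketch of that corrected Lloyd's iteration, in plain Java with no H2O types (all names below are illustrative, not the H2O API):

import java.util.Arrays;

public class LloydsSketch {
  // Corrected Lloyd's iteration: skip empty clusters when recomputing means,
  // stop when no center moves more than epsilon or maxIter is reached.
  public static double[][] lloyds(double[][] data, double[][] centers,
                                  double epsilon, int maxIter) {
    int k = centers.length, dim = centers[0].length;
    for( int iter = 0; maxIter == 0 || iter < maxIter; iter++ ) {
      double[][] sums = new double[k][dim];
      int[] counts = new int[k];
      for( double[] row : data ) {                 // assignment step
        int best = 0;
        double bestSqr = Double.MAX_VALUE;
        for( int c = 0; c < k; c++ ) {
          double sqr = 0;
          for( int d = 0; d < dim; d++ ) {
            double diff = row[d] - centers[c][d];
            sqr += diff * diff;
          }
          if( sqr < bestSqr ) { bestSqr = sqr; best = c; }
        }
        for( int d = 0; d < dim; d++ ) sums[best][d] += row[d];
        counts[best]++;
      }
      boolean moved = false;                       // update step
      for( int c = 0; c < k; c++ ) {
        if( counts[c] == 0 ) continue;             // guard against empty clusters
        for( int d = 0; d < dim; d++ ) {
          double value = sums[c][d] / counts[c];
          if( Math.abs(value - centers[c][d]) > epsilon ) moved = true;
          centers[c][d] = value;
        }
      }
      if( !moved ) break;                          // converged
    }
    return centers;
  }

  public static void main(String[] args) {
    double[][] data = { { 0, 0 }, { 0, 1 }, { 10, 10 }, { 10, 11 } };
    double[][] centers = { { 0, 0 }, { 10, 10 } };
    System.out.println(Arrays.deepToString(lloyds(data, centers, 1e-4, 100)));
  }
}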

@@ -197,6 +203,7 @@ public static class Lloyds extends MRTask {

double[][] _sums; // OUT: Sum of (normalized) features in each cluster
int[] _counts; // OUT: Count of rows in cluster
double _error; // OUT: Total sqr distance

@Override public void map(Key key) {
assert key.home();
@@ -213,7 +220,9 @@ public static class Lloyds extends MRTask {
// Find closest cluster for each row
for( int row = 0; row < rows; row++ ) {
datad(va, bits, row, _cols, _normalize, values);
int cluster = closest(_clusters, values, cd)._cluster;
closest(_clusters, values, cd);
int cluster = cd._cluster;
_error += cd._dist;
if( cluster == -1 ) continue; // Ignore broken row

// Add values and increment counter for chosen cluster
@@ -231,12 +240,14 @@
if( _sums == null ) {
_sums = task._sums;
_counts = task._counts;
_error = task._error;
} else {
for( int cluster = 0; cluster < _counts.length; cluster++ ) {
for( int column = 0; column < _sums[0].length; column++ )
_sums[cluster][column] += task._sums[cluster][column];
_counts[cluster] += task._counts[cluster];
}
_error += task._error;
}
}
}
@@ -300,17 +311,28 @@ public static double[][] recluster(double[][] points, int k, Random rand) {
ClusterDist cd = new ClusterDist();

while( count < res.length ) {
// Compute total-square-distance from all points to all other points so-far
double sum = 0;
for( int i = 0; i < points.length; i++ )
sum += minSqr(res, points[i], cd, count);

// // Original k-means++, doesn't seem to help in many cases
// double sum = 0;
// for( int i = 0; i < points.length; i++ )
// sum += minSqr(res, points[i], cd, count);
//
// for( int i = 0; i < points.length; i++ ) {
// if( minSqr(res, points[i], cd, count) >= rand.nextDouble() * sum ) {
// res[count++] = points[i];
// break;
// }
// }
// Takes cluster further from any already chosen ones
double max = 0;
int index = 0;
for( int i = 0; i < points.length; i++ ) {
if( minSqr(res, points[i], cd, count) >= rand.nextDouble() * sum ) {
res[count++] = points[i];
break;
double sqr = minSqr(res, points[i], cd, count);
if( sqr > max ) {
max = sqr;
index = i;
}
}
res[count++] = points[index];
}

return res;
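For seeding, the rewritten loop above drops the sampled k-means++ rule (kept in the commented-out block) and instead always takes the point whose minimum squared distance to the centers chosen so far is largest. A rough stand-alone sketch of that farthest-point selection, assuming the candidate points are already de-duplicated (names are illustrative, not the H2O API):

public class FarthestPointSeeds {
  // Pick k seeds: start from the first point, then repeatedly add the point
  // farthest (in squared distance) from all seeds chosen so far.
  public static double[][] seeds(double[][] points, int k) {
    double[][] res = new double[k][];
    res[0] = points[0];
    for( int count = 1; count < k; count++ ) {
      double max = -1;
      int index = 0;
      for( int i = 0; i < points.length; i++ ) {
        double minSqr = Double.MAX_VALUE;          // distance to closest chosen seed
        for( int c = 0; c < count; c++ ) {
          double sqr = 0;
          for( int d = 0; d < points[i].length; d++ ) {
            double diff = points[i][d] - res[c][d];
            sqr += diff * diff;
          }
          minSqr = Math.min(minSqr, sqr);
        }
        if( minSqr > max ) { max = minSqr; index = i; }
      }
      res[count] = points[index];
    }
    return res;
  }

  public static void main(String[] args) {
    double[][] points = { { 0, 0 }, { 1, 0 }, { 9, 9 }, { 10, 10 } };
    System.out.println(java.util.Arrays.deepToString(seeds(points, 2)));
  }
}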
@@ -320,7 +342,7 @@ public static double[][] recluster(double[][] points, int k, Random rand) {
// know exists because we filtered out columns with no mean).
public static double[] datad(ValueArray va, AutoBuffer bits, int row, int[] cols, boolean normalize, double[] res) {
for( int c = 0; c < cols.length - 1; c++ ) {
ValueArray.Column C = va._cols[c];
ValueArray.Column C = va._cols[cols[c]];
// Use the mean if missing data, then center & normalize
double d = (va.isNA(bits, row, C) ? C._mean : va.datad(bits, row, C));
if( normalize ) {
4 changes: 3 additions & 1 deletion src/main/java/hex/KMeansModel.java
@@ -16,7 +16,9 @@
public class KMeansModel extends Model implements Progress {
public static final String NAME = KMeansModel.class.getSimpleName();
public double[][] _clusters; // The cluster centers, normalized according to _va
public double _error; // Sum of min square distances
public int _iteration;
public int _maxIter;
public long _randSeed;
public boolean _normalized;

@@ -247,7 +249,7 @@ public static Job run(final Key dest, final KMeansModel model, final ValueArray
KMeans.closest(_clusters, values, cd);
chunk = ValueArray.chknum(startRow + row, va.numRows(), ROW_SIZE);
if( chunk != updatedChk ) {
updateClusters(clusters, count, chunk, va.numRows(), rpc, updatedRow);
updateClusters(clusters, count, updatedChk, va.numRows(), rpc, updatedRow);
updatedChk = chunk;
updatedRow = startRow + row;
count = 0;
31 changes: 15 additions & 16 deletions src/main/java/hex/Plot.java
@@ -8,7 +8,7 @@
*/
public abstract class Plot {

static public byte[] run(ValueArray va, int width, int height, int... cols) {
public static byte[] run(ValueArray va, int width, int height, int... cols) {
// TODO PCA if more than two columns

// Count dots falling in each pixel
@@ -21,17 +21,20 @@ static public byte[] run(ValueArray va, int width, int height, int... cols) {
return task._pixels;
}

public static int scale(double d, Column c, int screen) {
d = (d - c._min) / (c._max - c._min);
return (int) (d * (screen - 1));
}

static class Pixels extends MRTask {
Key _arykey;
int _width, _height;
int[] _cols;
Key _arykey;
int _width, _height;
int[] _cols;

// Reduced
byte[] _pixels;

@Override
public void map(Key key) {

@Override public void map(Key key) {
assert key.home();
ValueArray va = DKV.get(_arykey).get();
AutoBuffer bits = va.getChunk(key);
@@ -43,22 +46,18 @@ public void map(Key key) {
Column cY = va._cols[_cols[1]];
double x = va.datad(bits, row, cX);
double y = va.datad(bits, row, cY);
x = (x - cX._min) / (cX._max - cX._min);
y = (y - cY._min) / (cY._max - cY._min);
int iX = (int) (x * (_width - 1));
int iY = (int) (y * (_height - 1));
int iX = scale(x, cX, _width);;
int iY = scale(y, cY, _height);
int value = _pixels[iY * _width + iX] & 0xff;
value = value == 0xff ? value : value + 1;
_pixels[iY * _width + iX] = (byte) value;
}
}

@Override
public void reduce(DRemoteTask rt) {
Pixels task = (Pixels) rt;
@Override public void reduce(DRemoteTask rt) {
Pixels task = (Pixels) rt;

if( _pixels == null )
_pixels = task._pixels;
if( _pixels == null ) _pixels = task._pixels;
else {
for( int i = 0; i < _pixels.length; i++ ) {
int value = _pixels[i] & 0xff + task._pixels[i] & 0xff;
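The new scale() helper factored out above maps a data value linearly into the range [0, screen - 1]. A tiny worked example of the same arithmetic, with made-up column bounds:

public class ScaleExample {
  // Same arithmetic as Plot.scale(): rescale d from [min, max] to a pixel index.
  static int scale(double d, double min, double max, int screen) {
    return (int) ((d - min) / (max - min) * (screen - 1));
  }
  public static void main(String[] args) {
    // value 5.0 in a column spanning [0, 10] on a 100-pixel axis -> pixel 49
    System.out.println(scale(5.0, 0.0, 10.0, 100));
  }
}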
1 change: 0 additions & 1 deletion src/main/java/water/Boot.java
@@ -125,7 +125,6 @@ else if( (args[i].equals("--ice_root") || args[i].startsWith("-ice_root")) && i
addInternalJars("poi");
addInternalJars("s3");
addInternalJars("jets3t");
addInternalJars("fastr");
addInternalJars("log4j");
}

5 changes: 0 additions & 5 deletions src/main/java/water/H2O.java
@@ -11,7 +11,6 @@
import water.nbhm.NonBlockingHashMap;
import water.parser.ParseDataset;
import water.persist.*;
import water.r.Shell;
import water.util.*;
import water.util.Log.Tag.Sys;

@@ -499,7 +498,6 @@ public static class OptArgs extends Arguments.Opt {
public String random_udp_drop = null; // test only, randomly drop udp incoming
public int pparse_limit = Integer.MAX_VALUE;
public String no_requests_log = null; // disable logging of Web requests
public String rshell="false"; //FastR shell
}
public static boolean IS_SYSTEM_RUNNING = false;

@@ -540,9 +538,6 @@ public static void main( String[] args ) {
initializeExpressionEvaluation(); // starts the expression evaluation system

startupFinalize(); // finalizes the startup & tests (if any)

if (OPT_ARGS.rshell.equals("true")) Shell.go();
// Hang out here until the End of Time
}

private static void initializeExpressionEvaluation() {
4 changes: 3 additions & 1 deletion src/main/java/water/api/KMeans.java
@@ -13,6 +13,7 @@
public class KMeans extends Request {
protected final H2OHexKey _source = new H2OHexKey(SOURCE_KEY);
protected final Int _k = new Int(K);
protected final Int _maxIter = new Int(MAX_ITER, 0);
protected final Real _epsilon = new Real(EPSILON, 1e-4);
protected final LongInt _seed = new LongInt(SEED, new Random().nextLong(), "");
protected final Bool _normalize = new Bool(NORMALIZE, false, "");
@@ -24,6 +25,7 @@ public class KMeans extends Request {
Key source = va._key;
int k = _k.value();
double epsilon = _epsilon.value();
int maxIter = _maxIter.value();
long seed = _seed.record()._valid ? _seed.value() : _seed._defaultValue;
boolean normalize = _normalize.record()._valid ? _normalize.value() : _normalize._defaultValue;
int[] cols = _columns.value();
@@ -37,7 +39,7 @@
}

try {
hex.KMeans job = hex.KMeans.start(dest, va, k, epsilon, seed, normalize, cols);
hex.KMeans job = hex.KMeans.start(dest, va, k, epsilon, maxIter, seed, normalize, cols);
JsonObject response = new JsonObject();
response.addProperty(JOB, job.self().toString());
response.addProperty(DEST_KEY, dest.toString());
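The request handler now reads a MAX_ITER parameter (default 0, meaning no iteration limit) and forwards it to hex.KMeans.start. A hypothetical call site for the updated signature; the destination key, ValueArray and column indices below are placeholders, not values from this commit:

// Hypothetical usage of the new start() signature (fragment, placeholders only):
Key dest = Key.make("kmeans_model");
hex.KMeans job = hex.KMeans.start(
    dest, va,                    // destination key and parsed ValueArray
    3,                           // k: number of clusters
    1e-4,                        // epsilon: minimum center movement to keep iterating
    50,                          // maxIter: 0 would mean no iteration limit
    new java.util.Random().nextLong(),
    true,                        // normalize columns
    0, 1, 2);                    // column indices to cluster on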
(diffs for the remaining changed files not loaded)
