@@ -8,7 +8,8 @@
 import water.*;
 import water.Job.ColumnsJob;
 import water.api.*;
-import water.fvec.*;
+import water.fvec.Chunk;
+import water.fvec.Vec;
 import water.util.Utils;
 
 /**
@@ -49,14 +50,13 @@ public KMeans2() {
     for( int i = 0; i < cols.length; i++ )
       names[i] = source._names[cols[i]];
     Vec[] vecs = selectVecs(source);
-    Frame frame = new Frame(names, vecs);
     // Fill-in response based on K
-    Vec response = frame.anyVec().makeZero();
-    response._domain = new String[k];
-    for( int i = 0; i < response._domain.length; i++ )
-      response._domain[i] = "Cluster " + i;
-    frame.add("response", response);
-    KMeans2Model model = new KMeans2Model(destination_key, sourceKey, frame);
+    String[] domain = new String[k];
+    for( int i = 0; i < domain.length; i++ )
+      domain[i] = "Cluster " + i;
+    String[] namesResp = Utils.append(names, "response");
+    String[][] domaiResp = (String[][]) Utils.append(source.domains(), (Object) domain);
+    KMeans2Model model = new KMeans2Model(destination_key, sourceKey, namesResp, domaiResp);
 
     double[] subs = null, muls = null;
     if( normalize ) {
@@ -101,7 +101,7 @@ public KMeans2() {
       sampler._subs = subs;
       sampler._muls = muls;
       sampler.doAll(vecs);
-      clusters = DRemoteTask.merge(clusters, sampler._sampled);
+      clusters = Utils.append(clusters, sampler._sampled);
 
       if( cancelled() )
         return;
@@ -120,15 +120,11 @@ public KMeans2() {
       task._subs = subs;
       task._muls = muls;
       task.doAll(vecs);
-      for( int cluster = 0; cluster < clusters.length; cluster++ ) {
-        if( task._counts[cluster] > 0 ) {
-          for( int vec = 0; vec < vecs.length; vec++ ) {
-            double value = task._sums[cluster][vec] / task._counts[cluster];
-            clusters[cluster][vec] = value;
-          }
-        }
-      }
-      model.clusters = normalize ? denormalize(clusters, vecs) : clusters;
+      model.clusters = normalize ? denormalize(task._means, vecs) : task._means;
+      for( int clu = 0; clu < task._sigms.length; clu++ )
+        for( int col = 0; col < task._sigms[clu].length; col++ )
+          task._sigms[clu][col] = task._sigms[clu][col] / (task._rows[clu] - 1);
+      model.variances = task._sigms;
       model.error = task._sqr;
       model.iterations++;
       UKV.put(destination_key, model);
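The driver-side loop added above turns the per-cluster sums of squared deviations accumulated in `task._sigms` into unbiased sample variances by dividing by `task._rows[clu] - 1` (Bessel's correction) before storing them on the model. A minimal standalone sketch of that finalization, with hypothetical names that are not part of the diff and an extra guard for clusters holding fewer than two rows:

```java
// Sketch only, not H2O code: convert per-cluster sums of squared deviations
// into unbiased sample variances, mirroring the division by (_rows[clu] - 1) above.
class VarianceFinalizeSketch {
  static double[][] finalizeVariances(double[][] sigms, long[] rows) {
    double[][] variances = new double[sigms.length][];
    for( int clu = 0; clu < sigms.length; clu++ ) {
      variances[clu] = new double[sigms[clu].length];
      for( int col = 0; col < sigms[clu].length; col++ )
        variances[clu][col] = rows[clu] > 1 ? sigms[clu][col] / (rows[clu] - 1) : 0;
    }
    return variances;
  }
}
```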
@@ -141,7 +137,9 @@ public KMeans2() {
 
   @Override protected Response redirect() {
     String n = KMeans2Progress.class.getSimpleName();
-    return new Response(Response.Status.redirect, this, -1, -1, n, "job", job_key, "dst_key", destination_key);
+    return new Response(Response.Status.redirect, this, -1, -1, n, //
+        "job_key", job_key, //
+        "destination_key", destination_key);
   }
 
   public static class KMeans2Progress extends Progress2 {
@@ -204,7 +202,7 @@ public static class KMeans2Model extends Model implements Progress {
     @API(help = "Sum of min square distances")
     public double error;
 
-    @API(help = "Whether data should be normalized")
+    @API(help = "Whether data was normalized")
     public boolean normalized;
 
     @API(help = "Maximum number of iterations before stopping")
@@ -213,11 +211,14 @@ public static class KMeans2Model extends Model implements Progress {
     @API(help = "Iterations the algorithm ran")
     public int iterations;
 
+    @API(help = "In-cluster variances")
+    public double[][] variances;
+
     private transient double[] _subs, _muls; // Normalization
     private transient double[][] _normClust;
 
-    public KMeans2Model(Key selfKey, Key dataKey, Frame fr) {
-      super(selfKey, dataKey, fr);
+    public KMeans2Model(Key selfKey, Key dataKey, String names[], String domains[][]) {
+      super(selfKey, dataKey, names, domains);
     }
 
     @Override public float progress() {
@@ -301,47 +302,65 @@ public static class Sampler extends MRTask2<Sampler> {
     }
 
     @Override public void reduce(Sampler other) {
-      _sampled = DRemoteTask.merge(_sampled, other._sampled);
+      _sampled = Utils.append(_sampled, other._sampled);
     }
   }
 
   public static class Lloyds extends MRTask2<Lloyds> {
     // IN
     double[][] _clusters;
-    double[] _subs, _muls; // Normalization
+    double[] _subs, _muls;     // Normalization
 
     // OUT
-    double[][] _sums;          // Sum of (normalized) features in each cluster
-    int[] _counts;             // Count of rows in cluster
-    double _sqr;               // Total sqr distance
+    double[][] _means, _sigms; // Means and sigma for each cluster
+    long[] _rows;              // Rows per cluster
+    double _sqr;               // Total sqr distance
 
     @Override public void map(Chunk[] cs) {
-      double[] values = new double[_clusters[0].length];
-      _sums = new double[_clusters.length][values.length];
-      _counts = new int[_clusters.length];
-      ClusterDist cd = new ClusterDist();
+      _means = new double[_clusters.length][_clusters[0].length];
+      _sigms = new double[_clusters.length][_clusters[0].length];
+      _rows = new long[_clusters.length];
 
       // Find closest cluster for each row
+      double[] values = new double[_clusters[0].length];
+      ClusterDist cd = new ClusterDist();
+      int[] clusters = new int[cs[0]._len];
       for( int row = 0; row < cs[0]._len; row++ ) {
         data(values, cs, row, _subs, _muls);
         closest(_clusters, values, cd);
-        int cluster = cd._cluster;
+        int clu = clusters[row] = cd._cluster;
         _sqr += cd._dist;
-        if( cluster == -1 )
+        if( clu == -1 )
           continue; // Ignore broken row
 
         // Add values and increment counter for chosen cluster
-        Utils.add(_sums[cluster], values);
-        _counts[cluster]++;
+        for( int col = 0; col < values.length; col++ )
+          _means[clu][col] += values[col];
+        _rows[clu]++;
+      }
+      for( int clu = 0; clu < _means.length; clu++ )
+        for( int col = 0; col < _means[clu].length; col++ )
+          _means[clu][col] /= _rows[clu];
+      // Second pass for in-cluster variances
+      for( int row = 0; row < cs[0]._len; row++ ) {
+        int clu = clusters[row];
+        if( clu == -1 )
+          continue;
+        data(values, cs, row, _subs, _muls);
+        for( int col = 0; col < values.length; col++ ) {
+          double delta = values[col] - _means[clu][col];
+          _sigms[clu][col] += delta * delta;
+        }
       }
       _clusters = null;
       _subs = _muls = null;
     }
 
-    @Override public void reduce(Lloyds other) {
-      Utils.add(_sums, other._sums);
-      Utils.add(_counts, other._counts);
-      _sqr += other._sqr;
+    @Override public void reduce(Lloyds mr) {
+      for( int clu = 0; clu < _means.length; clu++ )
+        Layer.Stats.reduce(_means[clu], _sigms[clu], _rows[clu], mr._means[clu], mr._sigms[clu], mr._rows[clu]);
+      Utils.add(_rows, mr._rows);
+      _sqr += mr._sqr;
     }
   }
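The rewritten `reduce` combines per-chunk statistics cluster by cluster through `Layer.Stats.reduce`, whose implementation lives outside this diff. Below is a hedged sketch of the standard pairwise combine (Chan et al.) that such a merge typically performs on two partial means and sums of squared deviations; the class name, method name, and in-place convention are assumptions for illustration, not the actual `Layer.Stats` API:

```java
// Hedged sketch, not the actual Layer.Stats.reduce: pairwise merge of partial
// means and sums of squared deviations (Chan et al.), written into the A-side
// arrays to match the in-place style of MRTask2 reduce().
class StatsMergeSketch {
  static void merge(double[] meansA, double[] sigmsA, long rowsA,
                    double[] meansB, double[] sigmsB, long rowsB) {
    long n = rowsA + rowsB;
    if( n == 0 ) return;
    for( int col = 0; col < meansA.length; col++ ) {
      double delta = meansB[col] - meansA[col];
      // Combined sum of squared deviations adds a cross term for the shifted means
      sigmsA[col] += sigmsB[col] + delta * delta * rowsA * rowsB / n;
      // Combined mean is the row-weighted average of the two partial means
      meansA[col] = (meansA[col] * rowsA + meansB[col] * rowsB) / n;
    }
  }
}
```

Merging the partial statistics this way avoids a second pass over the combined rows, so each Lloyds instance only ever visits its own chunks.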