|
4 | 4 | import water.H2O;
|
5 | 5 | import water.Key;
|
6 | 6 | import water.MRTask2;
|
| 7 | +import water.util.Log; |
7 | 8 |
|
8 | 9 | import java.util.Arrays;
|
9 | 10 | import java.util.Iterator;
|
@@ -98,41 +99,46 @@ public static class RebalanceTask extends MRTask2<RebalanceTask> {
|
98 | 99 | @Override public boolean logVerbose() { return false; }
|
99 | 100 |
|
100 | 101 | private void rebalanceChunk(Vec srcVec, Chunk chk){
|
101 |
| - NewChunk dst = new NewChunk(chk); |
102 |
| - dst._len = dst._sparseLen = 0; |
103 |
| - int rem = chk._len; |
104 |
| - while(rem > 0 && dst._len < chk._len){ |
105 |
| - Chunk srcRaw = srcVec.chunkForRow(chk._start+dst._len); |
106 |
| - NewChunk src = new NewChunk((srcRaw)); |
107 |
| - src = srcRaw.inflate_impl(src); |
108 |
| - assert src._len == srcRaw._len; |
109 |
| - int srcFrom = (int)(chk._start+dst._len - src._start); |
110 |
| - // check if the result is sparse (not exact since we only take subset of src in general) |
111 |
| - if((src.sparse() && dst.sparse()) || (src._len + dst._len < NewChunk.MIN_SPARSE_RATIO*(src._len + dst._len))){ |
112 |
| - src.set_sparse(src._sparseLen); |
113 |
| - dst.set_sparse(dst._sparseLen); |
| 102 | + try { |
| 103 | + NewChunk dst = new NewChunk(chk); |
| 104 | + dst._len = dst._sparseLen = 0; |
| 105 | + int rem = chk._len; |
| 106 | + while (rem > 0 && dst._len < chk._len) { |
| 107 | + Chunk srcRaw = srcVec.chunkForRow(chk._start + dst._len); |
| 108 | + NewChunk src = new NewChunk((srcRaw)); |
| 109 | + src = srcRaw.inflate_impl(src); |
| 110 | + assert src._len == srcRaw._len; |
| 111 | + int srcFrom = (int) (chk._start + dst._len - src._start); |
| 112 | + // check if the result is sparse (not exact since we only take subset of src in general) |
| 113 | + if ((src.sparse() && dst.sparse()) || (src._len + dst._len < NewChunk.MIN_SPARSE_RATIO * (src._len + dst._len))) { |
| 114 | + src.set_sparse(src._sparseLen); |
| 115 | + dst.set_sparse(dst._sparseLen); |
| 116 | + } |
| 117 | + final int srcTo = srcFrom + rem; |
| 118 | + int off = srcFrom - 1; |
| 119 | + Iterator<NewChunk.Value> it = src.values(Math.max(0, srcFrom), srcTo); |
| 120 | + while (it.hasNext()) { |
| 121 | + NewChunk.Value v = it.next(); |
| 122 | + final int rid = v.rowId0(); |
| 123 | + assert rid < srcTo; |
| 124 | + int add = rid - off; |
| 125 | + off = rid; |
| 126 | + dst.addZeros(add - 1); |
| 127 | + v.add2Chunk(dst); |
| 128 | + rem -= add; |
| 129 | + assert rem >= 0; |
| 130 | + } |
| 131 | + int trailingZeros = Math.min(rem, src._len - off - 1); |
| 132 | + dst.addZeros(trailingZeros); |
| 133 | + rem -= trailingZeros; |
114 | 134 | }
|
115 |
| - final int srcTo = srcFrom + rem; |
116 |
| - int off = srcFrom-1; |
117 |
| - Iterator<NewChunk.Value> it = src.values(Math.max(0,srcFrom),srcTo); |
118 |
| - while(it.hasNext()){ |
119 |
| - NewChunk.Value v = it.next(); |
120 |
| - final int rid = v.rowId0(); |
121 |
| - assert rid < srcTo; |
122 |
| - int add = rid - off; |
123 |
| - off = rid; |
124 |
| - dst.addZeros(add-1); |
125 |
| - v.add2Chunk(dst); |
126 |
| - rem -= add; |
127 |
| - assert rem >= 0; |
128 |
| - } |
129 |
| - int trailingZeros = Math.min(rem,src._len - off -1); |
130 |
| - dst.addZeros(trailingZeros); |
131 |
| - rem -= trailingZeros; |
| 135 | + assert rem == 0 : "rem = " + rem; |
| 136 | + assert dst._len == chk._len : "len2 = " + dst._len + ", _len = " + chk._len; |
| 137 | + dst.close(dst.cidx(), _fs); |
| 138 | + } catch(RuntimeException t){ |
| 139 | + Log.err("got exception while rebalancing chunk " + chk); |
| 140 | + throw t; |
132 | 141 | }
|
133 |
| - assert rem == 0:"rem = " + rem; |
134 |
| - assert dst._len == chk._len:"len2 = " + dst._len + ", _len = " + chk._len; |
135 |
| - dst.close(dst.cidx(),_fs); |
136 | 142 | }
|
137 | 143 | @Override public void map(Chunk [] chks){
|
138 | 144 | for(int i = 0; i < chks.length; ++i)
|
|
0 commit comments