Skip to content

Commit

Permalink
[FLINK-2076] [runtime] Fix memory leakage in MutableHashTable
Browse files Browse the repository at this point in the history
This closes apache#751
  • Loading branch information
chiwanpark authored and StephanEwen committed Jun 4, 2015
1 parent 7411343 commit 1559701
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,10 @@ protected boolean prepareNextPartition() throws IOException {

List<MemorySegment> memory = new ArrayList<MemorySegment>();
memory.add(getNextBuffer());
memory.add(getNextBuffer());
MemorySegment nextBuffer = getNextBuffer();
if (nextBuffer != null) {
memory.add(nextBuffer);
}

ChannelReaderInputViewIterator<PT> probeReader = new ChannelReaderInputViewIterator<PT>(this.currentSpilledProbeSide,
returnQueue, memory, this.availableMemory, this.probeSideSerializer, p.getProbeSideBlockCount());
Expand Down Expand Up @@ -652,6 +655,7 @@ public void close() {
throw new RuntimeException("Hashtable closing was interrupted");
}
}
this.writeBehindBuffersAvailable = 0;
}

public void abort() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1400,6 +1400,82 @@ public void testInMemoryReOpen() throws IOException

this.memManager.release(join.getFreedMemory());
}

/*
* This test is same as `testInMemoryReOpen()` but only number of keys and pages are different. This test
* validates a bug fix MutableHashTable memory leakage with small memory segments.
*/
@Test
public void testInMemoryReOpenWithSmallMemory() throws Exception {
final int NUM_KEYS = 10000;
final int BUILD_VALS_PER_KEY = 3;
final int PROBE_VALS_PER_KEY = 10;

// create a build input that gives 30000 pairs with 3 values sharing the same key
MutableObjectIterator<IntPair> buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);

// create a probe input that gives 100000 pairs with 10 values sharing a key
MutableObjectIterator<IntPair> probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);

// allocate the memory for the HashTable
List<MemorySegment> memSegments;
try {
// 33 is minimum number of pages required to perform hash join this inputs
memSegments = this.memManager.allocatePages(MEM_OWNER, 33);
}
catch (MemoryAllocationException maex) {
fail("Memory for the Join could not be provided.");
return;
}

// ----------------------------------------------------------------------------------------

final MutableHashTable<IntPair, IntPair> join = new MutableHashTable<IntPair, IntPair>(
this.pairBuildSideAccesssor, this.pairProbeSideAccesssor,
this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator,
memSegments, ioManager);
join.open(buildInput, probeInput);

final IntPair recordReuse = new IntPair();
int numRecordsInJoinResult = 0;

while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
}
Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);

join.close();

// ----------------------------------------------------------------------------------------
// recreate the inputs

// create a build input that gives 30000 pairs with 3 values sharing the same key
buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false);

// create a probe input that gives 100000 pairs with 10 values sharing a key
probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true);

join.open(buildInput, probeInput);

numRecordsInJoinResult = 0;

while (join.nextRecord()) {
HashBucketIterator<IntPair, IntPair> buildSide = join.getBuildSideIterator();
while (buildSide.next(recordReuse) != null) {
numRecordsInJoinResult++;
}
}
Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult);

join.close();

// ----------------------------------------------------------------------------------------

this.memManager.release(join.getFreedMemory());
}

// ============================================================================================

Expand Down

0 comments on commit 1559701

Please sign in to comment.