Skip to content

Commit

Permalink
Fix ram accounting leak, close all created contexts
Browse files Browse the repository at this point in the history
Since c8b99d3, the creator of a
`RamAccountingContext` is responsible for closing it. Some created
contexts were not yet tracked and closed.
  • Loading branch information
seut authored and mergify[bot] committed Dec 16, 2019
1 parent d73b0d2 commit 6938e63
Showing 1 changed file with 38 additions and 10 deletions.
48 changes: 38 additions & 10 deletions sql/src/main/java/io/crate/execution/jobs/JobSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,7 @@ public Boolean visitMergePhase(final MergePhase phase, final Context context) {
phase.orderByPositions(),
() -> new RowAccountingWithEstimators(
phase.inputTypes(),
RamAccountingContext.forExecutionPhase(breaker(), phase))),
ramAccountingContext)),
phase.numUpstreams());
} else {
pageBucketReceiver = new IncrementalPageBucketReceiver<>(
Expand Down Expand Up @@ -797,13 +797,28 @@ public Boolean visitFetchPhase(final FetchPhase phase, final Context context) {

@Override
public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) {
MergePhase leftMerge = phase.leftMergePhase();
MergePhase rightMerge = phase.rightMergePhase();
RamAccountingContext ramAccountingLeft = leftMerge == null ? null :
RamAccountingContext.forExecutionPhase(breaker(), leftMerge);
RamAccountingContext ramAccountingRight = rightMerge == null ? null :
RamAccountingContext.forExecutionPhase(breaker(), rightMerge);

RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionPhase(breaker(), phase);
RowConsumer lastConsumer = context.getRowConsumer(phase, Paging.PAGE_SIZE, ramAccountingContext);
MemoryManager memoryManager = memoryManagerFactory.getMemoryManager(ramAccountingContext);

lastConsumer.completionFuture().whenComplete((result, error) -> {
memoryManager.close();
ramAccountingContext.close();
if (ramAccountingLeft != null) {
ramAccountingLeft.close();
}
if (ramAccountingRight != null) {
ramAccountingRight.close();
}
});

RowConsumer firstConsumer = ProjectingRowConsumer.create(
lastConsumer,
phase.projections(),
Expand Down Expand Up @@ -832,9 +847,9 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) {
phase.phaseId(),
context,
(byte) 0,
phase.leftMergePhase(),
leftMerge,
joinOperation.leftConsumer(),
ramAccountingContext,
ramAccountingLeft,
memoryManager
);

Expand All @@ -846,9 +861,9 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) {
phase.phaseId(),
context,
(byte) 1,
phase.rightMergePhase(),
rightMerge,
joinOperation.rightConsumer(),
ramAccountingContext,
ramAccountingRight,
memoryManager
);
if (right != null) {
Expand All @@ -865,12 +880,25 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) {

@Override
public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) {
MergePhase leftMerge = phase.leftMergePhase();
MergePhase rightMerge = phase.rightMergePhase();
RamAccountingContext ramAccountingLeft = leftMerge == null ? null :
RamAccountingContext.forExecutionPhase(breaker(), leftMerge);
RamAccountingContext ramAccountingRight = rightMerge == null ? null :
RamAccountingContext.forExecutionPhase(breaker(), rightMerge);

RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionPhase(breaker(), phase);
RowConsumer lastConsumer = context.getRowConsumer(phase, Paging.PAGE_SIZE, ramAccountingContext);
MemoryManager memoryManager = memoryManagerFactory.getMemoryManager(ramAccountingContext);
lastConsumer.completionFuture().whenComplete((result, error) -> {
memoryManager.close();
ramAccountingContext.close();
if (ramAccountingLeft != null) {
ramAccountingLeft.close();
}
if (ramAccountingRight != null) {
ramAccountingRight.close();
}
});

RowConsumer firstConsumer = ProjectingRowConsumer.create(
Expand Down Expand Up @@ -905,9 +933,9 @@ public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) {
phase.phaseId(),
context,
(byte) 0,
phase.leftMergePhase(),
leftMerge,
joinOperation.leftConsumer(),
ramAccountingContext,
ramAccountingLeft,
memoryManager
);
if (left != null) {
Expand All @@ -917,9 +945,9 @@ public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) {
phase.phaseId(),
context,
(byte) 1,
phase.rightMergePhase(),
rightMerge,
joinOperation.rightConsumer(),
ramAccountingContext,
ramAccountingRight,
memoryManager
);
if (right != null) {
Expand Down Expand Up @@ -972,7 +1000,7 @@ private DistResultRXTask pageDownstreamContextForNestedLoop(int nlPhaseId,
mergePhase.orderByPositions(),
() -> new RowAccountingWithEstimators(
mergePhase.inputTypes(),
RamAccountingContext.forExecutionPhase(breaker(), mergePhase))),
ramAccountingContext)),
mergePhase.numUpstreams());

return new DistResultRXTask(
Expand Down

0 comments on commit 6938e63

Please sign in to comment.