From 6938e630d0cc1757281360ed34e9e3e71e90dff8 Mon Sep 17 00:00:00 2001 From: Sebastian Utz Date: Mon, 16 Dec 2019 15:01:14 +0100 Subject: [PATCH] Fix ram accounting leak, close all created contexts Since c8b99d3a918d8e387cab1b1f53ecda94655a8133, the creator of a `RamAccountingContext` is responsible for closing it. Some created contexts were not yet tracked and closed. --- .../io/crate/execution/jobs/JobSetup.java | 48 +++++++++++++++---- 1 file changed, 38 insertions(+), 10 deletions(-) diff --git a/sql/src/main/java/io/crate/execution/jobs/JobSetup.java b/sql/src/main/java/io/crate/execution/jobs/JobSetup.java index 3791892d5220..550845de8009 100644 --- a/sql/src/main/java/io/crate/execution/jobs/JobSetup.java +++ b/sql/src/main/java/io/crate/execution/jobs/JobSetup.java @@ -688,7 +688,7 @@ public Boolean visitMergePhase(final MergePhase phase, final Context context) { phase.orderByPositions(), () -> new RowAccountingWithEstimators( phase.inputTypes(), - RamAccountingContext.forExecutionPhase(breaker(), phase))), + ramAccountingContext)), phase.numUpstreams()); } else { pageBucketReceiver = new IncrementalPageBucketReceiver<>( @@ -797,13 +797,28 @@ public Boolean visitFetchPhase(final FetchPhase phase, final Context context) { @Override public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) { + MergePhase leftMerge = phase.leftMergePhase(); + MergePhase rightMerge = phase.rightMergePhase(); + RamAccountingContext ramAccountingLeft = leftMerge == null ? null : + RamAccountingContext.forExecutionPhase(breaker(), leftMerge); + RamAccountingContext ramAccountingRight = rightMerge == null ? null : + RamAccountingContext.forExecutionPhase(breaker(), rightMerge); + RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionPhase(breaker(), phase); RowConsumer lastConsumer = context.getRowConsumer(phase, Paging.PAGE_SIZE, ramAccountingContext); MemoryManager memoryManager = memoryManagerFactory.getMemoryManager(ramAccountingContext); + lastConsumer.completionFuture().whenComplete((result, error) -> { memoryManager.close(); ramAccountingContext.close(); + if (ramAccountingLeft != null) { + ramAccountingLeft.close(); + } + if (ramAccountingRight != null) { + ramAccountingRight.close(); + } }); + RowConsumer firstConsumer = ProjectingRowConsumer.create( lastConsumer, phase.projections(), @@ -832,9 +847,9 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) { phase.phaseId(), context, (byte) 0, - phase.leftMergePhase(), + leftMerge, joinOperation.leftConsumer(), - ramAccountingContext, + ramAccountingLeft, memoryManager ); @@ -846,9 +861,9 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) { phase.phaseId(), context, (byte) 1, - phase.rightMergePhase(), + rightMerge, joinOperation.rightConsumer(), - ramAccountingContext, + ramAccountingRight, memoryManager ); if (right != null) { @@ -865,12 +880,25 @@ public Boolean visitNestedLoopPhase(NestedLoopPhase phase, Context context) { @Override public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) { + MergePhase leftMerge = phase.leftMergePhase(); + MergePhase rightMerge = phase.rightMergePhase(); + RamAccountingContext ramAccountingLeft = leftMerge == null ? null : + RamAccountingContext.forExecutionPhase(breaker(), leftMerge); + RamAccountingContext ramAccountingRight = rightMerge == null ? null : + RamAccountingContext.forExecutionPhase(breaker(), rightMerge); + RamAccountingContext ramAccountingContext = RamAccountingContext.forExecutionPhase(breaker(), phase); RowConsumer lastConsumer = context.getRowConsumer(phase, Paging.PAGE_SIZE, ramAccountingContext); MemoryManager memoryManager = memoryManagerFactory.getMemoryManager(ramAccountingContext); lastConsumer.completionFuture().whenComplete((result, error) -> { memoryManager.close(); ramAccountingContext.close(); + if (ramAccountingLeft != null) { + ramAccountingLeft.close(); + } + if (ramAccountingRight != null) { + ramAccountingRight.close(); + } }); RowConsumer firstConsumer = ProjectingRowConsumer.create( @@ -905,9 +933,9 @@ public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) { phase.phaseId(), context, (byte) 0, - phase.leftMergePhase(), + leftMerge, joinOperation.leftConsumer(), - ramAccountingContext, + ramAccountingLeft, memoryManager ); if (left != null) { @@ -917,9 +945,9 @@ public Boolean visitHashJoinPhase(HashJoinPhase phase, Context context) { phase.phaseId(), context, (byte) 1, - phase.rightMergePhase(), + rightMerge, joinOperation.rightConsumer(), - ramAccountingContext, + ramAccountingRight, memoryManager ); if (right != null) { @@ -972,7 +1000,7 @@ private DistResultRXTask pageDownstreamContextForNestedLoop(int nlPhaseId, mergePhase.orderByPositions(), () -> new RowAccountingWithEstimators( mergePhase.inputTypes(), - RamAccountingContext.forExecutionPhase(breaker(), mergePhase))), + ramAccountingContext)), mergePhase.numUpstreams()); return new DistResultRXTask(