Skip to content

Commit

Permalink
[FLINK-12843][network] Refactor the pin logic in ReleaseOnConsumption…
Browse files Browse the repository at this point in the history
…ResultPartition

The pin logic is for adding the reference counter based on number of subpartitions for ReleaseOnConsumptionResultPartition. It seems
not necessary to do it in while loop as now, because the atomic counter would not be accessed by other threads during pin. If the
ReleaseOnConsumptionResultPartition was not created yet, the createSubpartitionView would not be called actually resulting in
PartitionNotFoundException. So we could simple increase the reference counter in ReleaseOnConsumptionResultPartition constructor directly.
  • Loading branch information
zhijiangW authored and zentol committed Jun 19, 2019
1 parent 5a532f3 commit 0afd7bc
Show file tree
Hide file tree
Showing 3 changed files with 1 addition and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,8 @@ public class ReleaseOnConsumptionResultPartition extends ResultPartition {
ResultPartitionManager partitionManager,
FunctionWithException<BufferPoolOwner, BufferPool, IOException> bufferPoolFactory) {
super(owningTaskName, partitionId, partitionType, subpartitions, numTargetKeyGroups, partitionManager, bufferPoolFactory);
}

/**
* The partition can only be released after each subpartition has been consumed once per pin operation.
*/
@Override
void pin() {
while (true) {
int refCnt = pendingReferences.get();

if (refCnt >= 0) {
if (pendingReferences.compareAndSet(refCnt, refCnt + subpartitions.length)) {
break;
}
}
else {
throw new IllegalStateException("Released.");
}
}
pendingReferences.set(subpartitions.length);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -329,12 +329,6 @@ public String toString() {

// ------------------------------------------------------------------------

/**
* Pins the result partition.
*/
void pin() {
}

/**
* Notification when a subpartition is released.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,9 +122,6 @@ public ResultPartition create(

createSubpartitions(partition, type, subpartitions);

// Initially, partitions should be consumed once before release.
partition.pin();

LOG.debug("{}: Initialized {}", taskNameWithSubtaskAndId, this);

return partition;
Expand Down

0 comments on commit 0afd7bc

Please sign in to comment.