Skip to content

Commit

Permalink
Support shrink for TripleLongPriorityQueue (apache#15936)
Browse files Browse the repository at this point in the history
* Support shrinkage in TripleLongPriorityQueue

* Add unit test

* Remove unused code

* style

* Address comments
  • Loading branch information
315157973 authored Jun 7, 2022
1 parent 64bb9ef commit 522afcf
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.common.util.collections;

import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

Expand All @@ -31,16 +32,30 @@ public class TripleLongPriorityQueue implements AutoCloseable {

private static final int SIZE_OF_LONG = 8;
private static final int DEFAULT_INITIAL_CAPACITY = 16;
private static final float DEFAULT_SHRINK_FACTOR = 0.5f;

// Each item is composed of 3 longs
private static final int ITEMS_COUNT = 3;

private static final int TUPLE_SIZE = ITEMS_COUNT * SIZE_OF_LONG;

private final ByteBuf buffer;
/**
* Reserve 10% of the capacity when shrinking to avoid frequent expansion and shrinkage.
*/
private static final float RESERVATION_FACTOR = 0.9f;

private ByteBuf buffer;

private final int initialCapacity;

private int capacity;
private int size;
/**
* When size < capacity * shrinkFactor, may trigger shrinking.
*/
private final float shrinkFactor;

private float shrinkThreshold;

/**
* Create a new priority queue with default initial capacity.
Expand All @@ -49,14 +64,22 @@ public TripleLongPriorityQueue() {
this(DEFAULT_INITIAL_CAPACITY);
}

public TripleLongPriorityQueue(int initialCapacity, float shrinkFactor) {
checkArgument(shrinkFactor > 0);
this.initialCapacity = initialCapacity;
this.capacity = initialCapacity;
this.shrinkThreshold = this.capacity * shrinkFactor;
this.buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * TUPLE_SIZE);
this.size = 0;
this.shrinkFactor = shrinkFactor;
}

/**
* Create a new priority queue with a given initial capacity.
* @param initialCapacity
*/
public TripleLongPriorityQueue(int initialCapacity) {
capacity = initialCapacity;
buffer = PooledByteBufAllocator.DEFAULT.directBuffer(initialCapacity * ITEMS_COUNT * SIZE_OF_LONG);
size = 0;
this(initialCapacity, DEFAULT_SHRINK_FACTOR);
}

/**
Expand Down Expand Up @@ -122,6 +145,7 @@ public void pop() {
swap(0, size - 1);
size--;
siftDown(0);
shrinkCapacity();
}

/**
Expand All @@ -144,14 +168,36 @@ public int size() {
public void clear() {
this.buffer.clear();
this.size = 0;
shrinkCapacity();
}

private void increaseCapacity() {
// For bigger sizes, increase by 50%
this.capacity += (capacity <= 256 ? capacity : capacity / 2);
this.shrinkThreshold = this.capacity * shrinkFactor;
buffer.capacity(this.capacity * TUPLE_SIZE);
}

private void shrinkCapacity() {
if (capacity > initialCapacity && size < shrinkThreshold) {
int decreasingSize = (int) (capacity * shrinkFactor * RESERVATION_FACTOR);
if (decreasingSize <= 0) {
return;
}
if (capacity - decreasingSize <= initialCapacity) {
this.capacity = initialCapacity;
} else {
this.capacity = capacity - decreasingSize;
}
this.shrinkThreshold = this.capacity * shrinkFactor;

ByteBuf newBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(this.capacity * TUPLE_SIZE);
buffer.getBytes(0, newBuffer, size * TUPLE_SIZE);
buffer.release();
this.buffer = newBuffer;
}
}

private void siftUp(int idx) {
while (idx > 0) {
int parentIdx = (idx - 1) / 2;
Expand Down Expand Up @@ -229,4 +275,9 @@ private void swap(int idx1, int idx2) {
buffer.setLong(i2 + 1 * SIZE_OF_LONG, tmp2);
buffer.setLong(i2 + 2 * SIZE_OF_LONG, tmp3);
}

@VisibleForTesting
ByteBuf getBuffer() {
return buffer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,4 +135,39 @@ public void testCompareWithSamePrefix() {

pq.close();
}

@Test
public void testShrink() throws Exception {
int initialCapacity = 20;
int tupleSize = 3 * 8;
TripleLongPriorityQueue pq = new TripleLongPriorityQueue(initialCapacity, 0.5f);
pq.add(0, 0, 0);
assertEquals(pq.size(), 1);
assertEquals(pq.getBuffer().capacity(), initialCapacity * tupleSize);

// Scale out to capacity * 2
triggerScaleOut(initialCapacity, pq);
int scaleCapacity = initialCapacity * 2;
assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
// Trigger shrinking
for (int i = 0; i < initialCapacity / 2 + 1; i++) {
pq.pop();
}
int capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
// Scale out to capacity * 2
triggerScaleOut(initialCapacity, pq);
scaleCapacity = capacity * 2;
assertEquals(pq.getBuffer().capacity(), scaleCapacity * tupleSize);
// Trigger shrinking
pq.clear();
capacity = scaleCapacity - (int)(scaleCapacity * 0.5f * 0.9f);
assertEquals(pq.getBuffer().capacity(), capacity * tupleSize);
}

private void triggerScaleOut(int initialCapacity, TripleLongPriorityQueue pq) {
for (long i = 0; i < initialCapacity + 1; i++) {
pq.add(i, i, i);
}
}
}

0 comments on commit 522afcf

Please sign in to comment.