Skip to content

Commit

Permalink
Allow to obtain informations of used direct and heap memory for ByteB…
Browse files Browse the repository at this point in the history
…ufAllocator implementations

Motivation:

Often its useful for the user to be able to get some stats about the memory allocated via an allocator.

Modifications:

- Allow to obtain the used heap and direct memory for an allocator
- Add test case

Result:

Fixes [netty#6341]
  • Loading branch information
normanmaurer committed Mar 1, 2017
1 parent 93f5f62 commit 461f9a1
Show file tree
Hide file tree
Showing 10 changed files with 355 additions and 44 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2017 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.buffer;

/**
* {@link ByteBufAllocator} which exposes metrics.
*/
public interface InstrumentedByteBufAllocator extends ByteBufAllocator {
/**
* Returns the number of bytes of heap memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*/
long usedHeapMemory();

/**
* Returns the number of bytes of direct memory used by a {@link ByteBufAllocator} or {@code -1} if unknown.
*/
long usedDirectMemory();
}
32 changes: 28 additions & 4 deletions buffer/src/main/java/io/netty/buffer/PooledByteBufAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import java.util.Collections;
import java.util.List;

public class PooledByteBufAllocator extends AbstractByteBufAllocator {
public class PooledByteBufAllocator extends AbstractByteBufAllocator implements InstrumentedByteBufAllocator {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(PooledByteBufAllocator.class);
private static final int DEFAULT_NUM_HEAP_ARENA;
Expand Down Expand Up @@ -184,9 +184,9 @@ public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectA
}

public PooledByteBufAllocator(boolean preferDirect, int nHeapArena,
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
int nDirectArena, int pageSize, int maxOrder, int tinyCacheSize,
int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads) {
this(preferDirect, nHeapArena, nDirectArena, pageSize, maxOrder,
tinyCacheSize, smallCacheSize, normalCacheSize,
useCacheForAllThreads, DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT);
Expand Down Expand Up @@ -519,6 +519,30 @@ public final int chunkSize() {
return chunkSize;
}

@Override
public final long usedHeapMemory() {
return usedMemory(heapArenas);
}

@Override
public final long usedDirectMemory() {
return usedMemory(directArenas);
}

private static long usedMemory(PoolArena<?>... arenas) {
if (arenas == null) {
return -1;
}
long used = 0;
for (PoolArena<?> arena : arenas) {
used += arena.numActiveBytes();
if (used < 0) {
return Long.MAX_VALUE;
}
}
return used;
}

final PoolThreadCache threadCache() {
return threadCache.get();
}
Expand Down
146 changes: 139 additions & 7 deletions buffer/src/main/java/io/netty/buffer/UnpooledByteBufAllocator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,18 @@
*/
package io.netty.buffer;

import io.netty.util.internal.LongCounter;
import io.netty.util.internal.PlatformDependent;

import java.nio.ByteBuffer;

/**
* Simplistic {@link ByteBufAllocator} implementation that does not pool anything.
*/
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator {
public final class UnpooledByteBufAllocator extends AbstractByteBufAllocator implements InstrumentedByteBufAllocator {

private final LongCounter directCounter = PlatformDependent.newLongCounter();
private final LongCounter heapCounter = PlatformDependent.newLongCounter();
private final boolean disableLeakDetector;

/**
Expand Down Expand Up @@ -56,16 +61,21 @@ public UnpooledByteBufAllocator(boolean preferDirect, boolean disableLeakDetecto

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
return PlatformDependent.hasUnsafe() ? new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity)
: new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
return PlatformDependent.hasUnsafe() ?
new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);

final ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = PlatformDependent.useDirectBufferNoCleaner() ?
new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

Expand All @@ -85,4 +95,126 @@ public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) {
public boolean isDirectBufferPooled() {
return false;
}

@Override
public long usedHeapMemory() {
return heapCounter.value();
}

@Override
public long usedDirectMemory() {
return directCounter.value();
}

private static final class InstrumentedUnpooledUnsafeHeapByteBuf extends UnpooledUnsafeHeapByteBuf {
InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(bytes.length);
return bytes;
}

@Override
void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(-length);
}
}

private static final class InstrumentedUnpooledHeapByteBuf extends UnpooledHeapByteBuf {
InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
byte[] allocateArray(int initialCapacity) {
byte[] bytes = super.allocateArray(initialCapacity);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(bytes.length);
return bytes;
}

@Override
void freeArray(byte[] array) {
int length = array.length;
super.freeArray(array);
((UnpooledByteBufAllocator) alloc()).heapCounter.add(-length);
}
}

private static final class InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf
extends UnpooledUnsafeNoCleanerDirectByteBuf {
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}

@Override
ByteBuffer reallocateDirect(ByteBuffer oldBuffer, int initialCapacity) {
int capacity = oldBuffer.capacity();
ByteBuffer buffer = super.reallocateDirect(oldBuffer, initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity() - capacity);
return buffer;
}

@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}

private static final class InstrumentedUnpooledUnsafeDirectByteBuf extends UnpooledUnsafeDirectByteBuf {
InstrumentedUnpooledUnsafeDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}

@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}

private static final class InstrumentedUnpooledDirectByteBuf extends UnpooledDirectByteBuf {
InstrumentedUnpooledDirectByteBuf(
UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(alloc, initialCapacity, maxCapacity);
}

@Override
protected ByteBuffer allocateDirect(int initialCapacity) {
ByteBuffer buffer = super.allocateDirect(initialCapacity);
((UnpooledByteBufAllocator) alloc()).directCounter.add(buffer.capacity());
return buffer;
}

@Override
protected void freeDirect(ByteBuffer buffer) {
int capacity = buffer.capacity();
super.freeDirect(buffer);
((UnpooledByteBufAllocator) alloc()).directCounter.add(-capacity);
}
}
}
52 changes: 34 additions & 18 deletions buffer/src/main/java/io/netty/buffer/UnpooledHeapByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;

import static io.netty.util.internal.ObjectUtil.checkNotNull;

/**
* Big endian Java heap buffer implementation.
*/
Expand All @@ -43,7 +45,18 @@ public class UnpooledHeapByteBuf extends AbstractReferenceCountedByteBuf {
* @param maxCapacity the max capacity of the underlying byte array
*/
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
this(alloc, new byte[initialCapacity], 0, 0, maxCapacity);
super(maxCapacity);

checkNotNull(alloc, "alloc");

if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}

this.alloc = alloc;
setArray(allocateArray(initialCapacity));
setIndex(0, 0);
}

/**
Expand All @@ -53,28 +66,27 @@ protected UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int m
* @param maxCapacity the max capacity of the underlying byte array
*/
protected UnpooledHeapByteBuf(ByteBufAllocator alloc, byte[] initialArray, int maxCapacity) {
this(alloc, initialArray, 0, initialArray.length, maxCapacity);
}

private UnpooledHeapByteBuf(
ByteBufAllocator alloc, byte[] initialArray, int readerIndex, int writerIndex, int maxCapacity) {

super(maxCapacity);

if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialArray == null) {
throw new NullPointerException("initialArray");
}
checkNotNull(alloc, "alloc");
checkNotNull(initialArray, "initialArray");

if (initialArray.length > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialArray.length, maxCapacity));
}

this.alloc = alloc;
setArray(initialArray);
setIndex(readerIndex, writerIndex);
setIndex(0, initialArray.length);
}

byte[] allocateArray(int initialCapacity) {
return new byte[initialCapacity];
}

void freeArray(byte[] array) {
// NOOP
}

private void setArray(byte[] initialArray) {
Expand Down Expand Up @@ -108,23 +120,26 @@ public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);

int oldCapacity = array.length;
byte[] oldArray = array;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
byte[] newArray = allocateArray(newCapacity);
System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
setArray(newArray);
freeArray(oldArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
byte[] newArray = allocateArray(newCapacity);
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
freeArray(oldArray);
}
return this;
}
Expand Down Expand Up @@ -534,6 +549,7 @@ private ByteBuffer internalNioBuffer() {

@Override
protected void deallocate() {
freeArray(array);
array = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.netty.util.internal.PlatformDependent;

final class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {
class UnpooledUnsafeHeapByteBuf extends UnpooledHeapByteBuf {

/**
* Creates a new heap buffer with a newly allocated byte array.
Expand Down
Loading

0 comments on commit 461f9a1

Please sign in to comment.