Skip to content

Commit

Permalink
reduce lock contention in resource leak
Browse files Browse the repository at this point in the history
Motivation:
ResourceLeakDetector shows two main problems, racy access and heavy lock contention.

Modifications:
This PR fixes this by doing two things:
1.  Replace the sampling counter with a ThreadLocalRandom.  This has two benefits.
    First, it makes the sampling ration no longer have to be a power of two.  Second,
    it de-noises the continuous races that fight over this single value.  Instead,
    this change uses slightly more CPU to decide if it should sample by using TLR.
2.  DefaultResourceLeaks need to be kept alive in order to catch leaks.  The means
    by which this happens is by a singular, doubly-linked list.  This creates a
    large amount of contention when allocating quickly.  This is noticeable when
    running on a multi core machine.

    Instead, this uses a concurrent hash map to keep track of active resources
    which has much better contention characteristics.

Results:
Better concurrent hygiene.  Running the gRPC QPS benchmark showed RLD taking about
3 CPU seconds for every 1 wall second when runnign with 12 threads.

There are some minor perks to this as well.  DefaultResourceLeak accounting is
moved to a central place which probably has better caching behavior.
  • Loading branch information
carl-mastrangelo authored and normanmaurer committed Nov 2, 2016
1 parent eb7f8e4 commit 42fba01
Showing 1 changed file with 9 additions and 41 deletions.
50 changes: 9 additions & 41 deletions common/src/main/java/io/netty/util/ResourceLeakDetector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@

package io.netty.util;

import io.netty.util.internal.MathUtil;
import io.netty.util.internal.PlatformDependent;
import io.netty.util.internal.SystemPropertyUtil;
import io.netty.util.internal.ThreadLocalRandom;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

Expand Down Expand Up @@ -143,22 +143,17 @@ public static Level getLevel() {
return level;
}

/** the linked list of active resources */
private final DefaultResourceLeak head = new DefaultResourceLeak(null);
private final DefaultResourceLeak tail = new DefaultResourceLeak(null);
/** the collection of active resources */
private final ConcurrentMap<DefaultResourceLeak, Boolean> allLeaks = PlatformDependent.newConcurrentHashMap();

private final ReferenceQueue<Object> refQueue = new ReferenceQueue<Object>();
private final ConcurrentMap<String, Boolean> reportedLeaks = PlatformDependent.newConcurrentHashMap();

private final String resourceType;
private final int samplingInterval;
private final int mask;
private final long maxActive;
private long active;
private final AtomicBoolean loggedTooManyActive = new AtomicBoolean();

private long leakCheckCnt;

/**
* @deprecated use {@link ResourceLeakDetectorFactory#newResourceLeakDetector(Class, int, long)}.
*/
Expand Down Expand Up @@ -198,14 +193,8 @@ public ResourceLeakDetector(String resourceType, int samplingInterval, long maxA
}

this.resourceType = resourceType;
this.samplingInterval = MathUtil.safeFindNextPositivePowerOfTwo(samplingInterval);
// samplingInterval is a power of two so we calculate a mask that we can use to
// check if we need to do any leak detection or not.
mask = this.samplingInterval - 1;
this.samplingInterval = samplingInterval;
this.maxActive = maxActive;

head.next = tail;
tail.prev = head;
}

/**
Expand All @@ -221,7 +210,7 @@ public final ResourceLeak open(T obj) {
}

if (level.ordinal() < Level.PARANOID.ordinal()) {
if ((++ leakCheckCnt & mask) == 0) {
if ((ThreadLocalRandom.current().nextInt(0, samplingInterval)) == 0) {
reportLeak(level);
return new DefaultResourceLeak(obj);
} else {
Expand All @@ -248,7 +237,7 @@ private void reportLeak(Level level) {

// Report too many instances.
int samplingInterval = level == Level.PARANOID? 1 : this.samplingInterval;
if (active * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) {
if (allLeaks.size() * samplingInterval > maxActive && loggedTooManyActive.compareAndSet(false, true)) {
reportInstancesLeak(resourceType);
}

Expand Down Expand Up @@ -314,9 +303,6 @@ protected void reportInstancesLeak(String resourceType) {
private final class DefaultResourceLeak extends PhantomReference<Object> implements ResourceLeak {
private final String creationRecord;
private final Deque<String> lastRecords = new ArrayDeque<String>();
private final AtomicBoolean freed;
private DefaultResourceLeak prev;
private DefaultResourceLeak next;
private int removedRecords;

DefaultResourceLeak(Object referent) {
Expand All @@ -330,18 +316,9 @@ private final class DefaultResourceLeak extends PhantomReference<Object> impleme
creationRecord = null;
}

// TODO: Use CAS to update the list.
synchronized (head) {
prev = head;
next = head.next;
head.next.prev = this;
head.next = this;
active ++;
}
freed = new AtomicBoolean();
allLeaks.put(this, Boolean.TRUE);
} else {
creationRecord = null;
freed = new AtomicBoolean(true);
}
}

Expand Down Expand Up @@ -374,17 +351,8 @@ private void record0(Object hint, int recordsToSkip) {

@Override
public boolean close() {
if (freed.compareAndSet(false, true)) {
synchronized (head) {
active --;
prev.next = next;
next.prev = prev;
prev = null;
next = null;
}
return true;
}
return false;
// Use the ConcurrentMap remove method, which avoids allocating an iterator.
return allLeaks.remove(this, Boolean.TRUE);
}

@Override
Expand Down

0 comments on commit 42fba01

Please sign in to comment.