Skip to content

Commit

Permalink
[netty#1259] Add optimized queue for SCMP pattern and use it in NIO a…
Browse files Browse the repository at this point in the history
…nd native transport

This queue also produces less GC then CLQ when make use of OneTimeTask
  • Loading branch information
Norman Maurer committed Feb 27, 2014
1 parent b6aa032 commit 6efac61
Show file tree
Hide file tree
Showing 12 changed files with 381 additions and 23 deletions.
9 changes: 9 additions & 0 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -119,3 +119,12 @@ by Google Inc, which can be obtained at:
* license/LICENSE.snappy.txt (New BSD License)
* HOMEPAGE:
* http://code.google.com/p/snappy/

This product contains a modified version of Roland Kuhn's ASL2
AbstractNodeQueue, which is based on Dmitriy Vyukov's non-intrusive MPSC queue.
It can be obtained at:

* LICENSE:
* license/LICENSE.abstractnodequeue.txt (Public Domain)
* HOMEPAGE:
* https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/AbstractNodeQueue.java
245 changes: 245 additions & 0 deletions common/src/main/java/io/netty/util/internal/MpscLinkedQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package io.netty.util.internal;


import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;

/**
* A lock-free concurrent {@link java.util.Queue} implementations for single-consumer multiple-producer pattern.
* <strong>It's important is is only used for this as otherwise it is not thread-safe.</strong>
*
* This implementation is based on:
* <ul>
* <li><a href="https://github.com/akka/akka/blob/wip-2.2.3-for-scala-2.11/akka-actor/src/main/java/akka/dispatch/
* AbstractNodeQueue.java">AbstractNodeQueue</a></li>
* <li><a href="http://www.1024cores.net/home/lock-free-algorithms/
* queues/non-intrusive-mpsc-node-based-queue">Non intrusive MPSC node based queue</a></li>
* </ul>
*
*/
@SuppressWarnings("serial")
final class MpscLinkedQueue extends AtomicReference<OneTimeTask> implements Queue<Runnable> {
private static final long tailOffset;

static {
try {
tailOffset = PlatformDependent.objectFieldOffset(
MpscLinkedQueue.class.getDeclaredField("tail"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
}

// Extends AtomicReference for the "head" slot (which is the one that is appended to)
// since Unsafe does not expose XCHG operation intrinsically
@SuppressWarnings({ "unused", "FieldMayBeFinal" })
private volatile OneTimeTask tail;

MpscLinkedQueue() {
final OneTimeTask task = new OneTimeTaskAdapter(null);
tail = task;
set(task);
}

@Override
public boolean add(Runnable runnable) {
if (runnable instanceof OneTimeTask) {
OneTimeTask node = (OneTimeTask) runnable;
node.setNext(null);
getAndSet(node).setNext(node);
} else {
final OneTimeTask n = new OneTimeTaskAdapter(runnable);
getAndSet(n).setNext(n);
}
return true;
}

@Override
public boolean offer(Runnable runnable) {
return add(runnable);
}

@Override
public Runnable remove() {
Runnable task = poll();
if (task == null) {
throw new NoSuchElementException();
}
return task;
}

@Override
public Runnable poll() {
final OneTimeTask next = peekTask();
if (next == null) {
return null;
}
final OneTimeTask ret = next;
PlatformDependent.putOrderedObject(this, tailOffset, next);
return unwrapIfNeeded(ret);
}

@Override
public Runnable element() {
final OneTimeTask next = peekTask();
if (next == null) {
throw new NoSuchElementException();
}
return unwrapIfNeeded(next);
}

@Override
public Runnable peek() {
final OneTimeTask next = peekTask();
if (next == null) {
return null;
}
return unwrapIfNeeded(next);
}

@Override
public int size() {
int count = 0;
OneTimeTask n = peekTask();
for (;;) {
if (n == null) {
break;
}
count++;
n = n.next();
}
return count;
}

@SuppressWarnings("unchecked")
private OneTimeTask peekTask() {
for (;;) {
final OneTimeTask tail = (OneTimeTask) PlatformDependent.getObjectVolatile(this, tailOffset);
final OneTimeTask next = tail.next();
if (next != null || get() == tail) {
return next;
}
}
}

@Override
public boolean isEmpty() {
return peek() == null;
}

@Override
public boolean contains(Object o) {
OneTimeTask n = peekTask();
for (;;) {
if (n == null) {
break;
}
if (unwrapIfNeeded(n) == o) {
return true;
}
n = n.next();
}
return false;
}

@Override
public Iterator<Runnable> iterator() {
throw new UnsupportedOperationException();
}

@Override
public Object[] toArray() {
throw new UnsupportedOperationException();
}

@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException();
}

@Override
public boolean remove(Object o) {
return false;
}

@Override
public boolean containsAll(Collection<?> c) {
for (Object o: c) {
if (!contains(o)) {
return false;
}
}
return true;
}

@Override
public boolean addAll(Collection<? extends Runnable> c) {
for (Runnable r: c) {
add(r);
}
return false;
}

@Override
public boolean removeAll(Collection<?> c) {
return false;
}

@Override
public boolean retainAll(Collection<?> c) {
return false;
}

@Override
public void clear() {
for (;;) {
if (poll() == null) {
break;
}
}
}

/**
* Unwrap {@link OneTimeTask} if needed and so return the proper queued task.
*/
private static Runnable unwrapIfNeeded(OneTimeTask task) {
if (task instanceof OneTimeTaskAdapter) {
return ((OneTimeTaskAdapter) task).task;
}
return task;
}

private static final class OneTimeTaskAdapter extends OneTimeTask {
private final Runnable task;

OneTimeTaskAdapter(Runnable task) {
this.task = task;
}

@Override
public void run() {
task.run();
}
}
}
56 changes: 56 additions & 0 deletions common/src/main/java/io/netty/util/internal/OneTimeTask.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2014 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.util.internal;

import io.netty.util.concurrent.EventExecutor;

/**
* {@link Runnable} which represent a one time task which may allow the {@link EventExecutor} to reduce the amount of
* produced garbage when queue it for execution.
*
* <strong>It is important this will not be reused. After submitted it is not allowed to get submitted again!</strong>
*/
public abstract class OneTimeTask implements Runnable {

private static final long nextOffset;

static {
if (PlatformDependent0.hasUnsafe()) {
try {
nextOffset = PlatformDependent.objectFieldOffset(
OneTimeTask.class.getDeclaredField("tail"));
} catch (Throwable t) {
throw new ExceptionInInitializerError(t);
}
} else {
nextOffset = -1;
}
}

@SuppressWarnings("unused")
private volatile OneTimeTask tail;

// Only use from MpscLinkedQueue and so we are sure Unsafe is present
@SuppressWarnings("unchecked")
final OneTimeTask next() {
return (OneTimeTask) PlatformDependent.getObjectVolatile(this, nextOffset);
}

// Only use from MpscLinkedQueue and so we are sure Unsafe is present
final void setNext(final OneTimeTask newNext) {
PlatformDependent.putOrderedObject(this, nextOffset, newNext);
}
}
22 changes: 22 additions & 0 deletions common/src/main/java/io/netty/util/internal/PlatformDependent.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,10 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
Expand Down Expand Up @@ -266,6 +268,10 @@ public static Object getObject(Object object, long fieldOffset) {
return PlatformDependent0.getObject(object, fieldOffset);
}

public static Object getObjectVolatile(Object object, long fieldOffset) {
return PlatformDependent0.getObjectVolatile(object, fieldOffset);
}

public static int getInt(Object object, long fieldOffset) {
return PlatformDependent0.getInt(object, fieldOffset);
}
Expand All @@ -290,6 +296,10 @@ public static long getLong(long address) {
return PlatformDependent0.getLong(address);
}

public static void putOrderedObject(Object object, long address, Object value) {
PlatformDependent0.putOrderedObject(object, address, value);
}

public static void putByte(long address, byte value) {
PlatformDependent0.putByte(address, value);
}
Expand Down Expand Up @@ -369,6 +379,18 @@ public static <T> AtomicLongFieldUpdater<T> newAtomicLongFieldUpdater(
return null;
}

/**
* Create a new {@link Queue} which is safe to use for multiple producers (different threads) and a single
* consumer (one thread!).
*/
public static Queue<Runnable> newMpscQueue() {
if (hasUnsafe()) {
return new MpscLinkedQueue();
} else {
return new ConcurrentLinkedQueue<Runnable>();
}
}

private static boolean isAndroid0() {
boolean android;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ static Object getObject(Object object, long fieldOffset) {
return UNSAFE.getObject(object, fieldOffset);
}

static Object getObjectVolatile(Object object, long fieldOffset) {
return UNSAFE.getObjectVolatile(object, fieldOffset);
}

static int getInt(Object object, long fieldOffset) {
return UNSAFE.getInt(object, fieldOffset);
}
Expand Down Expand Up @@ -251,6 +255,10 @@ static long getLong(long address) {
}
}

static void putOrderedObject(Object object, long address, Object value) {
UNSAFE.putOrderedObject(object, address, value);
}

static void putByte(long address, byte value) {
UNSAFE.putByte(address, value);
}
Expand Down
Loading

0 comments on commit 6efac61

Please sign in to comment.