Skip to content

Commit

Permalink
[Java] Javadoc.
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Sep 22, 2018
1 parent 22236b3 commit bc10fe1
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,19 +143,19 @@ public E poll()
}
}

public int drain(final Consumer<E> elementHandler)
public int drain(final Consumer<E> elementConsumer)
{
return drain(elementHandler, size());
return drain(elementConsumer, size());
}

public int drain(final Consumer<E> elementHandler, final int limit)
public int drain(final Consumer<E> elementConsumer, final int limit)
{
int count = 0;

E e;
while (count < limit && null != (e = poll()))
{
elementHandler.accept(e);
elementConsumer.accept(e);
++count;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ public E poll()
return (E)e;
}

public int drain(final Consumer<E> elementHandler)
public int drain(final Consumer<E> elementConsumer)
{
return drain(elementHandler, (int)(tail - head));
return drain(elementConsumer, (int)(tail - head));
}

@SuppressWarnings("unchecked")
public int drain(final Consumer<E> elementHandler, final int limit)
public int drain(final Consumer<E> elementConsumer, final int limit)
{
final Object[] buffer = this.buffer;
final long mask = this.capacity - 1;
Expand All @@ -110,7 +110,7 @@ public int drain(final Consumer<E> elementHandler, final int limit)
UNSAFE.putOrderedObject(buffer, elementOffset, null);
nextSequence++;
UNSAFE.putOrderedLong(this, HEAD_OFFSET, nextSequence);
elementHandler.accept((E)item);
elementConsumer.accept((E)item);
}

return (int)(nextSequence - currentHead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,13 +81,13 @@ public E poll()
return (E)e;
}

public int drain(final Consumer<E> elementHandler)
public int drain(final Consumer<E> elementConsumer)
{
return drain(elementHandler, (int)(tail - head));
return drain(elementConsumer, (int)(tail - head));
}

@SuppressWarnings("unchecked")
public int drain(final Consumer<E> elementHandler, final int limit)
public int drain(final Consumer<E> elementConsumer, final int limit)
{
final Object[] buffer = this.buffer;
final long mask = this.capacity - 1;
Expand All @@ -108,7 +108,7 @@ public int drain(final Consumer<E> elementHandler, final int limit)
UNSAFE.putOrderedObject(buffer, elementOffset, null);
nextSequence++;
UNSAFE.putOrderedLong(this, HEAD_OFFSET, nextSequence);
elementHandler.accept((E)item);
elementConsumer.accept((E)item);
}

return (int)(nextSequence - currentHead);
Expand Down
24 changes: 16 additions & 8 deletions agrona/src/main/java/org/agrona/concurrent/Pipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import java.util.function.Consumer;

/**
* A container for items processed in sequence
* A container for items exchanged from producers to consumers.
*/
public interface Pipe<E>
{
Expand All @@ -44,6 +44,13 @@ public interface Pipe<E>
*/
int capacity();

/**
* The number of items currently in the container.
*
* @return number of items currently in the container.
*/
int size();

/**
* Get the remaining capacity for elements in the container given the current size.
*
Expand All @@ -56,29 +63,30 @@ public interface Pipe<E>
* <p>
* If possible, implementations should use smart batching to best handle burst traffic.
*
* @param elementHandler {@link Consumer} for processing elements
* @param elementConsumer {@link Consumer} for processing elements
* @return the number of elements drained
*/
int drain(Consumer<E> elementHandler);
int drain(Consumer<E> elementConsumer);

/**
* Drain the minimum of a limit and the number of elements present in a collection at the time the operation starts.
* <p>
* If possible, implementations should use smart batching to best handle burst traffic.
*
* @param elementHandler {@link Consumer} for processing elements
* @param limit maximum number of elements to be drained.
* @param elementConsumer {@link Consumer} for processing elements
* @param limit maximum number of elements to be drained in a drain operation.
* @return the number of elements drained
*/
int drain(Consumer<E> elementHandler, int limit);
int drain(Consumer<E> elementConsumer, int limit);

/**
* Drain available elements into the provided {@link java.util.Collection} up to a provided maximum limit of elements.
* Drain available elements into the provided {@link java.util.Collection} up to a provided maximum limit of
* elements.
* <p>
* If possible, implementations should use smart batching to best handle burst traffic.
*
* @param target in to which elements are drained.
* @param limit of the maximum number of elements to drain.
* @param limit maximum number of elements to be drained in a drain operation.
* @return the number of elements actually drained.
*/
int drainTo(Collection<? super E> target, int limit);
Expand Down
4 changes: 2 additions & 2 deletions agrona/src/main/java/org/agrona/concurrent/QueuedPipe.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
import java.util.Queue;

/**
* Composed interface for concurrent queues and sequenced containers.
* Composed interface for concurrent {@link Queue} and {@link Pipe}.
*
* @param <E> type of the elements stored in the {@link java.util.Queue}.
* @param <E> type of the elements stored in the {@link Queue}.
*/
public interface QueuedPipe<E> extends Queue<E>, Pipe<E>
{
Expand Down

0 comments on commit bc10fe1

Please sign in to comment.