Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
mjpt777 committed Feb 22, 2020
2 parents e8a232c + f50e8fd commit 01184ef
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 52 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
branches:
- master
pull_request:
types: [opened, synchronize, reopened]
types: [opened, synchronize]
branches:
- master

Expand Down
82 changes: 81 additions & 1 deletion agrona/src/main/java/org/agrona/BufferUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@
*/
package org.agrona;

import java.nio.*;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

import static java.lang.invoke.MethodType.methodType;
import static org.agrona.BitUtil.isPowerOfTwo;
import static org.agrona.UnsafeAccess.UNSAFE;

Expand All @@ -26,6 +31,9 @@
*/
public final class BufferUtil
{
private static final MethodHandle INVOKE_CLEANER;
private static final MethodHandle GET_CLEANER;
private static final MethodHandle CLEAN;
public static final byte[] NULL_BYTES = "null".getBytes(StandardCharsets.UTF_8);
public static final ByteOrder NATIVE_BYTE_ORDER = ByteOrder.nativeOrder();
public static final long ARRAY_BASE_OFFSET = UNSAFE.arrayBaseOffset(byte[].class);
Expand All @@ -44,6 +52,30 @@ public final class BufferUtil
ByteBuffer.class.getDeclaredField("offset"));

BYTE_BUFFER_ADDRESS_FIELD_OFFSET = UNSAFE.objectFieldOffset(Buffer.class.getDeclaredField("address"));

MethodHandle invokeCleaner = null;
MethodHandle getCleaner = null;
MethodHandle clean = null;
final MethodHandles.Lookup lookup = MethodHandles.lookup();

try
{
invokeCleaner = lookup.findVirtual(
UNSAFE.getClass(), "invokeCleaner", methodType(void.class, ByteBuffer.class));
}
catch (final NoSuchMethodException ex)
{
// JDK 8 fallback
final Class<?> directBuffer = Class.forName("sun.nio.ch.DirectBuffer");
final Class<?> cleaner = Class.forName("sun.misc.Cleaner");
getCleaner = lookup.findVirtual(directBuffer, "cleaner", methodType(cleaner));
clean = lookup.findVirtual(cleaner, "clean", methodType(void.class));

}

INVOKE_CLEANER = invokeCleaner;
GET_CLEANER = getCleaner;
CLEAN = clean;
}
catch (final Exception ex)
{
Expand Down Expand Up @@ -158,4 +190,52 @@ public static ByteBuffer allocateDirectAligned(final int capacity, final int ali

return buffer.slice();
}

/**
* Free the underlying direct {@link ByteBuffer} by invoking {@code Cleaner} on it. No op if {@code null} or if the
* underlying {@link ByteBuffer} non-direct.
*
* @param buffer to be freed
* @see ByteBuffer#isDirect()
*/
public static void free(final DirectBuffer buffer)
{
if (null != buffer)
{
free(buffer.byteBuffer());
}
}

/**
* Free direct {@link ByteBuffer} by invoking {@code Cleaner} on it. No op if {@code null} or non-direct
* {@link ByteBuffer}.
*
* @param buffer to be freed
* @see ByteBuffer#isDirect()
*/
public static void free(final ByteBuffer buffer)
{
if (null != buffer && buffer.isDirect())
{
try
{
if (null != INVOKE_CLEANER) // JDK 9+
{
INVOKE_CLEANER.invokeExact(UNSAFE, buffer);
}
else // JDK 8
{
final Object cleaner = GET_CLEANER.invoke(buffer);
if (null != cleaner)
{
CLEAN.invoke(cleaner);
}
}
}
catch (final Throwable throwable)
{
LangUtil.rethrowUnchecked(throwable);
}
}
}
}
44 changes: 15 additions & 29 deletions agrona/src/main/java/org/agrona/IoUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.InvocationTargetException;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
Expand Down Expand Up @@ -48,19 +49,20 @@ public final class IoUtil

static class MappingMethods
{
static final Method MAP_ADDRESS;
static final Method UNMAP_ADDRESS;
static final Method UNMAP_BUFFER;
static final MethodHandle MAP_ADDRESS;
static final MethodHandle UNMAP_ADDRESS;

static
{
try
{
final Class<?> fileChannelClass = Class.forName("sun.nio.ch.FileChannelImpl");
final MethodHandles.Lookup lookup = MethodHandles.lookup();

MAP_ADDRESS = getFileChannelMethod(fileChannelClass, "map0", int.class, long.class, long.class);
UNMAP_ADDRESS = getFileChannelMethod(fileChannelClass, "unmap0", long.class, long.class);
UNMAP_BUFFER = getFileChannelMethod(fileChannelClass, "unmap", MappedByteBuffer.class);
MAP_ADDRESS = lookup.unreflect(
getFileChannelMethod(fileChannelClass, "map0", int.class, long.class, long.class));
UNMAP_ADDRESS = lookup.unreflect(
getFileChannelMethod(fileChannelClass, "unmap0", long.class, long.class));
}
catch (final Exception ex)
{
Expand Down Expand Up @@ -493,20 +495,11 @@ public static void checkFileExists(final File file, final String name)
* Unmap a {@link MappedByteBuffer} without waiting for the next GC cycle.
*
* @param buffer to be unmapped.
* @see BufferUtil#free(ByteBuffer)
*/
public static void unmap(final MappedByteBuffer buffer)
{
if (null != buffer)
{
try
{
MappingMethods.UNMAP_BUFFER.invoke(null, buffer);
}
catch (final Exception ex)
{
LangUtil.rethrowUnchecked(ex);
}
}
BufferUtil.free(buffer);
}

/**
Expand All @@ -525,7 +518,7 @@ public static long map(
{
return (long)MappingMethods.MAP_ADDRESS.invoke(fileChannel, getMode(mode), offset, length);
}
catch (final IllegalAccessException | InvocationTargetException ex)
catch (final Throwable ex)
{
LangUtil.rethrowUnchecked(ex);
}
Expand All @@ -544,9 +537,9 @@ public static void unmap(final FileChannel fileChannel, final long address, fina
{
try
{
MappingMethods.UNMAP_ADDRESS.invoke(fileChannel, address, length);
MappingMethods.UNMAP_ADDRESS.invoke(address, length);
}
catch (final IllegalAccessException | InvocationTargetException ex)
catch (final Throwable ex)
{
LangUtil.rethrowUnchecked(ex);
}
Expand Down Expand Up @@ -605,14 +598,7 @@ public static void removeTrailingSlashes(final StringBuilder builder)

private static String getFileMode(final FileChannel.MapMode mode)
{
if (mode == READ_ONLY)
{
return "r";
}
else
{
return "rw";
}
return mode == READ_ONLY ? "r" : "rw";
}

private static int getMode(final FileChannel.MapMode mode)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@
/**
* An {@link ErrorHandler} which calls {@link AtomicCounter#increment()} before delegating the exception.
*/
public class CountedErrorHandler implements ErrorHandler
public class CountedErrorHandler implements ErrorHandler, AutoCloseable
{
private final ErrorHandler errorHandler;
private final AtomicCounter errorCounter;
private volatile boolean isClosed;

/**
* Construct a counted error handler with a delegate and counter.
Expand All @@ -43,13 +44,38 @@ public CountedErrorHandler(final ErrorHandler errorHandler, final AtomicCounter
this.errorCounter = errorCounter;
}

/**
* Close so that {@link #onError(Throwable)} will not delegate and instead print to {@link System#err}
*/
public void close()
{
isClosed = true;
}

/**
* Has this instance been closed.
*
* @return true if {@link #close()} has previously be called, otherwise false.
*/
public boolean isClosed()
{
return isClosed;
}

public void onError(final Throwable throwable)
{
if (!errorCounter.isClosed())
if (isClosed)
{
errorCounter.increment();
throwable.printStackTrace(System.err);
}
else
{
if (!errorCounter.isClosed())
{
errorCounter.increment();
}

errorHandler.onError(throwable);
errorHandler.onError(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public class AtomicCounter implements AutoCloseable
private final int id;
private final long addressOffset;
private final byte[] byteArray;
private final CountersManager countersManager;
private CountersManager countersManager;

@SuppressWarnings({"FieldCanBeLocal", "unused"})
private final ByteBuffer byteBuffer; // retained to keep the buffer from being GC'ed
Expand Down Expand Up @@ -76,6 +76,29 @@ public int id()
return id;
}

/**
* Disconnect from {@link CountersManager} if allocated so it can be closed without freeing the slot.
*/
public void disconnectCountersManager()
{
countersManager = null;
}

/**
* Close counter and free the counter slot for reuse of connected to {@link CountersManager}.
*/
public void close()
{
if (!isClosed)
{
isClosed = true;
if (null != countersManager)
{
countersManager.free(id);
}
}
}

/**
* Has this counter been closed?
*
Expand Down Expand Up @@ -291,27 +314,13 @@ public boolean proposeMaxOrdered(final long proposedValue)
return updated;
}

/**
* Free the counter slot for reuse.
*/
public void close()
{
if (!isClosed)
{
isClosed = true;
if (null != countersManager)
{
countersManager.free(id);
}
}
}

public String toString()
{
return "AtomicCounter{" +
"isClosed=" + isClosed() +
", id=" + id +
", value=" + (isClosed() ? -1 : get()) +
", countersManager=" + countersManager +
'}';
}
}
Loading

0 comments on commit 01184ef

Please sign in to comment.