Skip to content

Commit

Permalink
[Java] Add a close linger duration to the client which can be increas…
Browse files Browse the repository at this point in the history
…ed from its default of zero if clients could be starved out. When experiencing forced termination the client will additionally wait for the idle timeout.
  • Loading branch information
mjpt777 committed May 10, 2019
1 parent e303218 commit a630c2b
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 7 deletions.
56 changes: 52 additions & 4 deletions aeron-client/src/main/java/io/aeron/Aeron.java
Original file line number Diff line number Diff line change
Expand Up @@ -389,12 +389,23 @@ public static class Configuration
/**
* Default duration a resource should linger before deletion.
*/
public static final long RESOURCE_LINGER_DURATION_DEFAULT = TimeUnit.SECONDS.toNanos(3);
public static final long RESOURCE_LINGER_DURATION_DEFAULT_NS = TimeUnit.SECONDS.toNanos(3);

/**
* Duration to linger on close so that publishers subscribers have time to notice closed resources.
* This value can be set to a few seconds if the application is likely to experience CPU starvation or
* long GC pauses.
*/
public static final String CLOSE_LINGER_DURATION_PROP_NAME = "aeron.client.close.linger.duration";

/**
* Default duration to linger on close so that publishers subscribers have time to notice closed resources.
*/
public static final long CLOSE_LINGER_DURATION_DEFAULT_NS = 0;

/**
* The Default handler for Aeron runtime exceptions.
* When a {@link DriverTimeoutException} is encountered, this handler will
* exit the program.
* When a {@link DriverTimeoutException} is encountered, this handler will exit the program.
* <p>
* The error handler can be overridden by supplying an {@link Context} with a custom handler.
*
Expand All @@ -421,7 +432,19 @@ public static class Configuration
*/
public static long resourceLingerDurationNs()
{
return getDurationInNanos(RESOURCE_LINGER_DURATION_PROP_NAME, RESOURCE_LINGER_DURATION_DEFAULT);
return getDurationInNanos(RESOURCE_LINGER_DURATION_PROP_NAME, RESOURCE_LINGER_DURATION_DEFAULT_NS);
}

/**
* Duration to wait while lingering a entity such as an {@link Image} before deleting underlying resources
* such as memory mapped files.
*
* @return duration in nanoseconds to wait before deleting a expired resource.
* @see #RESOURCE_LINGER_DURATION_PROP_NAME
*/
public static long closeLingerDurationNs()
{
return getDurationInNanos(CLOSE_LINGER_DURATION_PROP_NAME, CLOSE_LINGER_DURATION_DEFAULT_NS);
}
}

Expand Down Expand Up @@ -461,6 +484,7 @@ public static class Context extends CommonContext
private long keepAliveIntervalNs = Configuration.KEEPALIVE_INTERVAL_NS;
private long interServiceTimeoutNs = 0;
private long resourceLingerDurationNs = Configuration.resourceLingerDurationNs();
private long closeLingerDurationNs = Configuration.closeLingerDurationNs();

private ThreadFactory threadFactory = Thread::new;

Expand Down Expand Up @@ -1005,6 +1029,30 @@ public long resourceLingerDurationNs()
return resourceLingerDurationNs;
}

/**
* Duration to linger on closing to allow publishers and subscribers time to notice closed resources.
*
* @param closeLingerDurationNs to wait before deleting resources when closing.
* @return this for a fluent API.
* @see Configuration#CLOSE_LINGER_DURATION_PROP_NAME
*/
public Context closeLingerDurationNs(final long closeLingerDurationNs)
{
this.closeLingerDurationNs = closeLingerDurationNs;
return this;
}

/**
* Duration to linger on closing to allow publishers and subscribers time to notice closed resources.
*
* @return duration in nanoseconds to wait before deleting resources when closing.
* @see Configuration#CLOSE_LINGER_DURATION_PROP_NAME
*/
public long closeLingerDurationNs()
{
return closeLingerDurationNs;
}

/**
* {@inheritDoc}
*/
Expand Down
21 changes: 18 additions & 3 deletions aeron-client/src/main/java/io/aeron/ClientConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@
import java.util.ArrayList;
import java.util.concurrent.locks.Lock;

import static io.aeron.Aeron.Configuration.IDLE_SLEEP_MS;
import static io.aeron.Aeron.Configuration.IDLE_SLEEP_NS;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* Client conductor receives responses and notifications from Media Driver and acts on them in addition to forwarding
Expand All @@ -42,6 +44,7 @@ class ClientConductor implements Agent, DriverEventsListener
{
private static final long NO_CORRELATION_ID = Aeron.NULL_VALUE;

private final long closeLingerDurationMs;
private final long keepAliveIntervalNs;
private final long driverTimeoutMs;
private final long driverTimeoutNs;
Expand Down Expand Up @@ -84,6 +87,7 @@ class ClientConductor implements Agent, DriverEventsListener
keepAliveIntervalNs = ctx.keepAliveIntervalNs();
driverTimeoutMs = ctx.driverTimeoutMs();
driverTimeoutNs = MILLISECONDS.toNanos(driverTimeoutMs);
closeLingerDurationMs = NANOSECONDS.toMillis(ctx.closeLingerDurationNs());
interServiceTimeoutNs = ctx.interServiceTimeoutNs();
defaultAvailableImageHandler = ctx.availableImageHandler();
defaultUnavailableImageHandler = ctx.unavailableImageHandler();
Expand All @@ -108,7 +112,20 @@ public void onClose()
{
isClosed = true;
forceCloseResources();
Thread.yield();

try
{
if (isTerminating)
{
Thread.sleep(IDLE_SLEEP_MS);
}

Thread.sleep(closeLingerDurationMs);
}
catch (final InterruptedException ignore)
{
Thread.currentThread().interrupt();
}

for (int i = 0, size = lingeringResources.size(); i < size; i++)
{
Expand Down Expand Up @@ -832,7 +849,6 @@ private void checkServiceInterval(final long nowNs)
isTerminating = true;

forceCloseResources();
Thread.yield();

throw new ConductorServiceTimeoutException("service interval exceeded (ns): " + interServiceTimeoutNs);
}
Expand All @@ -847,7 +863,6 @@ private int checkLiveness(final long nowNs)
isTerminating = true;

forceCloseResources();
Thread.yield();

throw new DriverTimeoutException("MediaDriver keepalive older than (ms): " + driverTimeoutMs);
}
Expand Down

0 comments on commit a630c2b

Please sign in to comment.