Skip to content

Commit

Permalink
[Java]: move aeron client liveness to millis. Use eol indication for …
Browse files Browse the repository at this point in the history
…client timeout for counters, pubs, and subs. Add heartbeat status indicator for each client to stats.
  • Loading branch information
tmontgomery committed Apr 19, 2018
1 parent b9422bf commit c3b0415
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 33 deletions.
81 changes: 81 additions & 0 deletions aeron-client/src/main/java/io/aeron/status/HeartbeatStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright 2014-2018 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.status;

import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;

import static org.agrona.BitUtil.SIZE_OF_LONG;

/**
* Allocate a counter for tracking the last heartbeat of an entity.
*/
public class HeartbeatStatus
{
/**
* Offset in the key meta data for the registration id of the counter.
*/
public static final int REGISTRATION_ID_OFFSET = 0;

/**
* Allocate a counter for tracking the last heartbeat of an entity.
*
* @param tempBuffer to be used for labels and key.
* @param name of the counter for the label.
* @param typeId of the counter for classification.
* @param countersManager from which to allocated the underlying storage.
* @param registrationId to be associated with the counter.
* @return a new {@link AtomicCounter} for tracking the last heartbeat.
*/
public static AtomicCounter allocate(
final MutableDirectBuffer tempBuffer,
final String name,
final int typeId,
final CountersManager countersManager,
final long registrationId)
{
return new AtomicCounter(
countersManager.valuesBuffer(),
allocateCounterId(tempBuffer, name, typeId, countersManager, registrationId),
countersManager);
}

public static int allocateCounterId(
final MutableDirectBuffer tempBuffer,
final String name,
final int typeId,
final CountersManager countersManager,
final long registrationId)
{
tempBuffer.putLong(REGISTRATION_ID_OFFSET, registrationId);
final int keyLength = REGISTRATION_ID_OFFSET + SIZE_OF_LONG;

int labelLength = 0;
labelLength += tempBuffer.putStringWithoutLengthAscii(keyLength + labelLength, name);
labelLength += tempBuffer.putStringWithoutLengthAscii(keyLength + labelLength, ": ");
labelLength += tempBuffer.putLongAscii(keyLength + labelLength, registrationId);

return countersManager.allocate(
typeId,
tempBuffer,
0,
keyLength,
tempBuffer,
keyLength,
labelLength);
}
}
42 changes: 27 additions & 15 deletions aeron-driver/src/main/java/io/aeron/driver/AeronClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,50 +15,62 @@
*/
package io.aeron.driver;

import org.agrona.concurrent.status.AtomicCounter;

import java.util.concurrent.TimeUnit;

/**
* Aeron client library tracker.
*/
public class AeronClient implements DriverManagedResource
{
private final long clientId;
private final long clientLivenessTimeoutNs;
private long timeOfLastKeepaliveNs;
private final long clientLivenessTimeoutMs;
private final AtomicCounter heartbeatStatus;
private long timeOfLastKeepaliveMs;
private boolean reachedEndOfLife = false;

public AeronClient(final long clientId, final long clientLivenessTimeoutNs, final long nowNs)
public AeronClient(
final long clientId,
final long clientLivenessTimeoutNs,
final long nowMs,
final AtomicCounter heartbeatStatus)
{
this.clientId = clientId;
this.clientLivenessTimeoutNs = clientLivenessTimeoutNs;
this.timeOfLastKeepaliveNs = nowNs;
this.clientLivenessTimeoutMs = Math.max(1, TimeUnit.NANOSECONDS.toMillis(clientLivenessTimeoutNs));
this.timeOfLastKeepaliveMs = nowMs;
this.heartbeatStatus = heartbeatStatus;

heartbeatStatus.setOrdered(nowMs);
}

public void close()
{
if (!heartbeatStatus.isClosed())
{
heartbeatStatus.close();
}
}

public long clientId()
{
return clientId;
}

public long timeOfLastKeepalive()
public void timeOfLastKeepaliveMs(final long nowMs)
{
return timeOfLastKeepaliveNs;
timeOfLastKeepaliveMs = nowMs;
heartbeatStatus.setOrdered(nowMs);
}

public void timeOfLastKeepalive(final long nowNs)
public boolean hasTimedOut()
{
timeOfLastKeepaliveNs = nowNs;
}

public boolean hasTimedOut(final long nowNs)
{
return nowNs > (timeOfLastKeepaliveNs + clientLivenessTimeoutNs);
return reachedEndOfLife;
}

public void onTimeEvent(final long timeNs, final long timeMs, final DriverConductor conductor)
{
if (timeNs > (timeOfLastKeepaliveNs + clientLivenessTimeoutNs))
if (timeMs > (timeOfLastKeepaliveMs + clientLivenessTimeoutMs))
{
reachedEndOfLife = true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public int counterId()

public void onTimeEvent(final long timeNs, final long timeMs, final DriverConductor conductor)
{
if (client.hasTimedOut(timeNs))
if (client.hasTimedOut())
{
reachedEndOfLife = true;
}
Expand Down
16 changes: 10 additions & 6 deletions aeron-driver/src/main/java/io/aeron/driver/DriverConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ void onAddNetworkSubscription(

final AeronClient client = getOrAddClient(clientId);
final SubscriptionLink subscription = new NetworkSubscriptionLink(
registrationId, channelEndpoint, streamId, channel, client, clientLivenessTimeoutNs, params);
registrationId, channelEndpoint, streamId, channel, client, params);

subscriptionLinks.add(subscription);
clientProxy.onSubscriptionReady(registrationId, channelEndpoint.statusIndicatorCounterId());
Expand All @@ -629,7 +629,7 @@ void onAddIpcSubscription(final String channel, final int streamId, final long r
final ArrayList<SubscriberPosition> subscriberPositions = new ArrayList<>();
final SubscriptionParams params = SubscriptionParams.getSubscriptionParams(ChannelUri.parse(channel));
final IpcSubscriptionLink subscriptionLink = new IpcSubscriptionLink(
registrationId, streamId, channel, getOrAddClient(clientId), clientLivenessTimeoutNs, params);
registrationId, streamId, channel, getOrAddClient(clientId), params);

subscriptionLinks.add(subscriptionLink);

Expand Down Expand Up @@ -668,7 +668,7 @@ void onAddSpySubscription(final String channel, final int streamId, final long r
final SubscriptionParams params = SubscriptionParams.getSubscriptionParams(udpChannel.channelUri());
final ArrayList<SubscriberPosition> subscriberPositions = new ArrayList<>();
final SpySubscriptionLink subscriptionLink = new SpySubscriptionLink(
registrationId, udpChannel, streamId, client, clientLivenessTimeoutNs, params);
registrationId, udpChannel, streamId, client, params);

subscriptionLinks.add(subscriptionLink);

Expand Down Expand Up @@ -748,7 +748,7 @@ void onClientKeepalive(final long clientId)
final AeronClient client = findClient(clients, clientId);
if (null != client)
{
client.timeOfLastKeepalive(cachedNanoClock.nanoTime());
client.timeOfLastKeepaliveMs(cachedEpochClock.time());
}
}

Expand Down Expand Up @@ -803,7 +803,7 @@ void onClientClose(final long clientId, final long correlationId)
final AeronClient client = findClient(clients, clientId);
if (null != client)
{
client.timeOfLastKeepalive(0);
client.timeOfLastKeepaliveMs(0);

clientProxy.operationSucceeded(correlationId);
}
Expand Down Expand Up @@ -1223,7 +1223,11 @@ private AeronClient getOrAddClient(final long clientId)
AeronClient client = findClient(clients, clientId);
if (null == client)
{
client = new AeronClient(clientId, clientLivenessTimeoutNs, cachedNanoClock.nanoTime());
client = new AeronClient(
clientId,
clientLivenessTimeoutNs,
cachedEpochClock.time(),
ClientHeartbeatStatus.allocate(tempBuffer, countersManager, clientId));
clients.add(client);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public long registrationId()

public void onTimeEvent(final long timeNs, final long timeMs, final DriverConductor conductor)
{
if (client.hasTimedOut(timeNs))
if (client.hasTimedOut())
{
reachedEndOfLife = true;
}
Expand Down
14 changes: 4 additions & 10 deletions aeron-driver/src/main/java/io/aeron/driver/SubscriptionLink.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
public abstract class SubscriptionLink implements DriverManagedResource
{
protected final long registrationId;
protected final long clientLivenessTimeoutNs;
protected final int streamId;
protected final int sessionId;
protected final boolean hasSessionId;
Expand All @@ -43,14 +42,12 @@ protected SubscriptionLink(
final int streamId,
final String channel,
final AeronClient aeronClient,
final long clientLivenessTimeoutNs,
final SubscriptionParams params)
{
this.registrationId = registrationId;
this.streamId = streamId;
this.channel = channel;
this.aeronClient = aeronClient;
this.clientLivenessTimeoutNs = clientLivenessTimeoutNs;
this.hasSessionId = params.hasSessionId;
this.sessionId = params.sessionId;
}
Expand Down Expand Up @@ -138,7 +135,7 @@ public void close()

public void onTimeEvent(final long timeNs, final long timeMs, final DriverConductor conductor)
{
if (timeNs > (aeronClient.timeOfLastKeepalive() + clientLivenessTimeoutNs))
if (aeronClient.hasTimedOut())
{
reachedEndOfLife = true;
conductor.cleanupSubscriptionLink(this);
Expand Down Expand Up @@ -167,10 +164,9 @@ class NetworkSubscriptionLink extends SubscriptionLink
final int streamId,
final String channelUri,
final AeronClient aeronClient,
final long clientLivenessTimeoutNs,
final SubscriptionParams params)
{
super(registrationId, streamId, channelUri, aeronClient, clientLivenessTimeoutNs, params);
super(registrationId, streamId, channelUri, aeronClient, params);

this.isReliable = params.isReliable;
this.channelEndpoint = channelEndpoint;
Expand Down Expand Up @@ -219,10 +215,9 @@ class IpcSubscriptionLink extends SubscriptionLink
final int streamId,
final String channelUri,
final AeronClient aeronClient,
final long clientLivenessTimeoutNs,
final SubscriptionParams params)
{
super(registrationId, streamId, channelUri, aeronClient, clientLivenessTimeoutNs, params);
super(registrationId, streamId, channelUri, aeronClient, params);
}

public boolean matches(final IpcPublication publication)
Expand All @@ -240,10 +235,9 @@ class SpySubscriptionLink extends SubscriptionLink
final UdpChannel spiedChannel,
final int streamId,
final AeronClient aeronClient,
final long clientLivenessTimeoutNs,
final SubscriptionParams params)
{
super(registrationId, streamId, spiedChannel.originalUriString(), aeronClient, clientLivenessTimeoutNs, params);
super(registrationId, streamId, spiedChannel.originalUriString(), aeronClient, params);

this.udpChannel = spiedChannel;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2014-2018 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.aeron.driver.status;

import io.aeron.status.HeartbeatStatus;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersManager;

public class ClientHeartbeatStatus
{
/**
* Type id of an Aeron client heartbeat status indicator.
*/
public static final int CLIENT_HEARTBEAT_TYPE_ID = 11;

/**
* Human readable name for the counter.
*/
public static final String NAME = "client-heartbeat";

public static AtomicCounter allocate(
final MutableDirectBuffer tempBuffer,
final CountersManager countersManager,
final long registrationId)
{
return HeartbeatStatus.allocate(tempBuffer, NAME, CLIENT_HEARTBEAT_TYPE_ID, countersManager, registrationId);
}
}

0 comments on commit c3b0415

Please sign in to comment.