Skip to content

Commit

Permalink
[Java] Track Image correlation ids in a subscription to be defensive …
Browse files Browse the repository at this point in the history
…and allow add and remove operations to be closer to constant time.
  • Loading branch information
mjpt777 committed Sep 12, 2017
1 parent 5ead930 commit b1f9ab6
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 12 deletions.
4 changes: 3 additions & 1 deletion aeron-client/src/main/java/io/aeron/ClientConductor.java
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,9 @@ public void onAvailableImage(
final String sourceIdentity)
{
final Subscription subscription = subscriptionByRegIdMap.get(subscriberRegistrationId);
if (null != subscription && subscription.registrationId() == subscriberRegistrationId)
if (null != subscription &&
subscription.registrationId() == subscriberRegistrationId &&
!subscription.containsImage(correlationId))
{
final Image image = new Image(
subscription,
Expand Down
23 changes: 12 additions & 11 deletions aeron-client/src/main/java/io/aeron/Subscription.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.aeron.logbuffer.*;
import org.agrona.collections.ArrayUtil;
import org.agrona.collections.Long2ObjectHashMap;

import java.util.*;
import java.util.function.Consumer;
Expand All @@ -37,6 +38,7 @@ class SubscriptionFields extends SubscriptionLhsPadding
protected volatile boolean isClosed = false;

protected volatile Image[] images = EMPTY_ARRAY;
protected final Long2ObjectHashMap<Image> imageByCorrelationIdMap = new Long2ObjectHashMap<>();
protected final ClientConductor clientConductor;
protected final String channel;
protected final AvailableImageHandler availableImageHandler;
Expand Down Expand Up @@ -405,6 +407,11 @@ void forceClose()
clientConductor.asyncReleaseSubscription(this);
}

boolean containsImage(final long correlationId)
{
return imageByCorrelationIdMap.containsKey(correlationId);
}

void addImage(final Image image)
{
if (isClosed)
Expand All @@ -413,23 +420,17 @@ void addImage(final Image image)
}
else
{
images = ArrayUtil.add(images, image);
if (null == imageByCorrelationIdMap.put(image.correlationId(), image))
{
images = ArrayUtil.add(images, image);
}
}
}

Image removeImage(final long correlationId)
{
final Image[] oldArray = images;
Image removedImage = null;

for (final Image image : oldArray)
{
if (image.correlationId() == correlationId)
{
removedImage = image;
break;
}
}
final Image removedImage = imageByCorrelationIdMap.remove(correlationId);

if (null != removedImage)
{
Expand Down
3 changes: 3 additions & 0 deletions aeron-client/src/test/java/io/aeron/SubscriptionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public class SubscriptionTest
@Before
public void setUp()
{
when(imageOneMock.correlationId()).thenReturn(1L);
when(imageTwoMock.correlationId()).thenReturn(2L);

when(header.flags()).thenReturn(FLAGS);
when(conductor.clientLock()).thenReturn(conductorLock);

Expand Down

0 comments on commit b1f9ab6

Please sign in to comment.