diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClientSession.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClientSession.java index 72eec86112..40ace0085e 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClientSession.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClientSession.java @@ -38,12 +38,13 @@ public class ClientSession private final long id; private final int responseStreamId; private final String responseChannel; - private Publication responsePublication; private final byte[] encodedPrincipal; private final DirectBufferVector[] vectors = new DirectBufferVector[2]; private final DirectBufferVector messageBuffer = new DirectBufferVector(); private final SessionHeaderEncoder sessionHeaderEncoder = new SessionHeaderEncoder(); private final Cluster cluster; + private Publication responsePublication; + private boolean isClosing; ClientSession( final long sessionId, @@ -107,6 +108,16 @@ public byte[] encodedPrincipal() return encodedPrincipal; } + /** + * Indicates that a request to close this session has been made. + * + * @return whether a request to close this session has been made. + */ + public boolean isClosing() + { + return isClosing; + } + /** * Non-blocking publish of a partial buffer containing a message to a cluster. * @@ -145,6 +156,11 @@ void connect(final Aeron aeron) responsePublication = aeron.addPublication(responseChannel, responseStreamId); } + void markClosing() + { + this.isClosing = true; + } + void disconnect() { CloseHelper.close(responsePublication); diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java index 4c2e90e6ed..e6b77cfae0 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/service/ClusteredServiceAgent.java @@ -176,8 +176,11 @@ public Collection clientSessions() public boolean closeSession(final long clusterSessionId) { - if (sessionByIdMap.containsKey(clusterSessionId)) + final ClientSession clientSession = sessionByIdMap.get(clusterSessionId); + if (clientSession != null && !clientSession.isClosing()) { + clientSession.markClosing(); + serviceControlPublisher.closeSession(clusterSessionId); return true; }