Skip to content

Commit

Permalink
GEODE-6607: Moving client registration queue to CacheClientNotifier
Browse files Browse the repository at this point in the history
To avoid client subscription data inconsistencies, we need to ensure
that we minimize the chance that an event is processed while a client is
registering but before it has fully registered.  There are two major
phases in registration - one is to request filter info from a peer
already hosting the queue for the client, and the other is doing a GII
of the queue from a peer.  If an event which a client would be
interested in is processed concurrently during registration, but before
the filter info has been fully received and processed, the event will be
missed by the client.  To reduce this window, we will start queueing
events for the registering client as soon as possible (deserialization
of the client proxy membership ID).  After registration is complete, we
drain the queued events and put them into the clients subscription
queue.

To make this code unit testable, it was necessary to extract the logic
reading data off the socket/deserializing that data into a separate
class which can be injected, the ClientRegistrationMetadata class.  This
allows us to mock a ClientRegistrationMetadata without actually doing any IO.
The CacheClientNotifier could be futher broken up to allow for even more
unit testability, but this was a first step in the right direction.

Co-authored-by: Ryan McMahon <[email protected]>
Co-authored-by: Murtuza Boxwala <[email protected]>
Co-authored-by: Ernie Burghardt <[email protected]>
  • Loading branch information
3 people committed Apr 22, 2019
1 parent fe0ddc5 commit afc311c
Show file tree
Hide file tree
Showing 10 changed files with 1,239 additions and 739 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1840,9 +1840,15 @@ public void run() {
logger.info(":Cache server: Initializing {} server-to-client communication socket: {}",
isPrimaryServerToClient ? "primary" : "secondary", socket);
try {
acceptor.getCacheClientNotifier().registerClient(socket, isPrimaryServerToClient,
acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
} catch (IOException ex) {
ClientRegistrationMetadata clientRegistrationMetadata =
new ClientRegistrationMetadata(acceptor.cache, socket);

if (clientRegistrationMetadata.initialize()) {
acceptor.getCacheClientNotifier().registerClient(clientRegistrationMetadata, socket,
isPrimaryServerToClient,
acceptor.getAcceptorId(), acceptor.isNotifyBySubscription());
}
} catch (final IOException ex) {
closeSocket(socket);
if (acceptor.isRunning()) {
if (!acceptor.loggedAcceptError) {
Expand Down
Loading

0 comments on commit afc311c

Please sign in to comment.