Skip to content

Commit

Permalink
GEODE-8491: Do not store dropped events in stopped primary gateway se… (
Browse files Browse the repository at this point in the history
apache#5509)

* GEODE-8491: Do not store dropped events in stopped primary gateway sender when possible

Instead of storing dropped events in tmpDroppedEvents to later send batch
removal messages when the primary gateway sender is not started, try to send
the batch removal message when the event to be dropped is received.
That way, when the sender is stopped for a long time and there are events
coming, the memory of the AbstractGatewaySender will not grow with entries
in the tmpDroppedEvents member.
  • Loading branch information
albertogpz authored Sep 29, 2020
1 parent 502facc commit 169ca6a
Show file tree
Hide file tree
Showing 4 changed files with 641 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1034,10 +1034,7 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
// If this gateway is not running, return
if (!isRunning()) {
if (this.isPrimary()) {
tmpDroppedEvents.add(clonedEvent);
if (isDebugEnabled) {
logger.debug("add to tmpDroppedEvents for evnet {}", clonedEvent);
}
recordDroppedEvent(clonedEvent);
}
if (isDebugEnabled) {
logger.debug("Returning back without putting into the gateway sender queue:" + event);
Expand Down Expand Up @@ -1118,6 +1115,17 @@ public void distribute(EnumListenerEvent operation, EntryEventImpl event,
}
}

private void recordDroppedEvent(EntryEventImpl event) {
if (this.eventProcessor != null) {
this.eventProcessor.registerEventDroppedInPrimaryQueue(event);
} else {
tmpDroppedEvents.add(event);
if (logger.isDebugEnabled()) {
logger.debug("added to tmpDroppedEvents event: {}", event);
}
}
}

@VisibleForTesting
int getTmpDroppedEventSize() {
return tmpDroppedEvents.size();
Expand All @@ -1138,7 +1146,7 @@ int getTmpDroppedEventSize() {
public void enqueueTempEvents() {
if (this.eventProcessor != null) {// Fix for defect #47308
// process tmpDroppedEvents
EntryEventImpl droppedEvent = null;
EntryEventImpl droppedEvent;
while ((droppedEvent = tmpDroppedEvents.poll()) != null) {
this.eventProcessor.registerEventDroppedInPrimaryQueue(droppedEvent);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1646,6 +1646,20 @@ public static void resumeSender(String senderId) {
}
}

public static void stopSenderInVMsAsync(String senderId, VM... vms) {
List<AsyncInvocation<Void>> tasks = new LinkedList<>();
for (VM vm : vms) {
tasks.add(vm.invokeAsync(() -> stopSender(senderId)));
}
for (AsyncInvocation invocation : tasks) {
try {
invocation.await();
} catch (InterruptedException e) {
fail("Stopping senders was interrupted");
}
}
}

public static void stopSender(String senderId) {
final IgnoredException exln = IgnoredException.addIgnoredException("Could not connect");
IgnoredException exp =
Expand Down
Loading

0 comments on commit 169ca6a

Please sign in to comment.