Skip to content

Commit

Permalink
GEODE-7458: Adding option in gfsh command "start gateway sender" to c…
Browse files Browse the repository at this point in the history
…ontrol clearing of existing queues (apache#4387)

* GEODE-7458: Solution for parallel GW sender queue restore

* GEODE-7458: update GatewaySender interface due to new option

* GEODE-7458: implement clean queue

* GEODE-7458: Update of interface after comments

* GEODE-7458: Update documentation

* GEODE-7458: patch for failed tests

* GEODE-7458: Remove comments and rename function

* GEODE-7458: Update option name

* GEODE-7458: Fix for integration tests

* GEODE-7458: Fix for lucene tests

* GEODE-7458: Added DUnit for command option

* GEODE-7458: Added distributed test for new functionality

* GEODE-7458: Update documentation

* GEODE-7458: Added UT

* GEODE-7458: Update after comments

* GEODE-7458: Modify OOM problematic UT

* GEODE-7458: Added distributed test for added new member

* GEODE-7458: remove closing of queue region at stop

* GEODE-7458: corrections for SerialGWSender test

* GEODE-7458: added parallel GW fix

* GEODE-7458: small fix

* GEODE-7458: optimize fix

* GEODE-7458: return deleted TC

* GEODE-7458: modify failed stress test, retry

* GEODE-7458: modification after rebase

* GEODE-7458: update after comments

* GEODE-7458: solution for failing tests

* Rebase PR after last changes in develop branch
  • Loading branch information
mivanac authored May 21, 2020
1 parent 7538de5 commit 19d5f78
Show file tree
Hide file tree
Showing 52 changed files with 1,383 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ protected void createGatewaySender() {
@Override
protected AbstractGatewaySenderEventProcessor getEventProcessor() {
ConcurrentParallelGatewaySenderEventProcessor processor =
spy(new ConcurrentParallelGatewaySenderEventProcessor(this.sender, null));
spy(new ConcurrentParallelGatewaySenderEventProcessor(this.sender, null, false));
return processor;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,15 @@ public ParallelAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statis

@Override
public void start() {
this.start(false);
}

@Override
public void startWithCleanQueue() {
this.start(true);
}

private void start(boolean cleanQueues) {
this.getLifeCycleLock().writeLock().lock();
try {
if (isRunning()) {
Expand All @@ -80,7 +89,8 @@ public void start() {
* of Concurrent version of processor and queue.
*/
eventProcessor =
new ConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj());
new ConcurrentParallelGatewaySenderEventProcessor(this, getThreadMonitorObj(),
cleanQueues);
if (startEventProcessorInPausedState) {
pauseEvenIfProcessorStopped();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ public SerialAsyncEventQueueImpl(InternalCache cache, StatisticsFactory statisti

@Override
public void start() {
this.start(false);
}

@Override
public void startWithCleanQueue() {
this.start(true);
}

private void start(boolean cleanQueues) {
if (logger.isDebugEnabled()) {
logger.debug("Starting gatewaySender : {}", this);
}
Expand All @@ -86,7 +95,7 @@ public void start() {
getSenderAdvisor().makeSecondary();
}
}
eventProcessor = createEventProcessor();
eventProcessor = createEventProcessor(cleanQueues);

if (startEventProcessorInPausedState) {
pauseEvenIfProcessorStopped();
Expand All @@ -113,15 +122,15 @@ public void start() {
}
}

protected AbstractGatewaySenderEventProcessor createEventProcessor() {
protected AbstractGatewaySenderEventProcessor createEventProcessor(boolean cleanQueues) {
AbstractGatewaySenderEventProcessor eventProcessor;
if (getDispatcherThreads() > 1) {
eventProcessor = new ConcurrentSerialGatewaySenderEventProcessor(
SerialAsyncEventQueueImpl.this, getThreadMonitorObj());
SerialAsyncEventQueueImpl.this, getThreadMonitorObj(), cleanQueues);
} else {
eventProcessor =
new SerialGatewaySenderEventProcessor(SerialAsyncEventQueueImpl.this, getId(),
getThreadMonitorObj());
getThreadMonitorObj(), cleanQueues);
}
return eventProcessor;
}
Expand Down Expand Up @@ -184,8 +193,6 @@ public void stop() {
InternalDistributedSystem system =
(InternalDistributedSystem) this.cache.getDistributedSystem();
system.handleResourceEvent(ResourceEvent.GATEWAYSENDER_STOP, this);

this.eventProcessor = null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,12 @@ enum OrderPolicy {
*/
void start();

/**
* Starts this GatewaySender and discards previous queue content. Once the GatewaySender is
* running, its configuration cannot be changed.
*/
void startWithCleanQueue();

/**
* Stops this GatewaySender. The scope of this operation is the VM on which it is invoked. In case
* the GatewaySender is parallel, the GatewaySender will be stopped on individual node where this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ private static boolean hasOfflineColocatedChildRegions(PartitionedRegion region)
// Look through all of the disk stores for offline colocated child regions
for (DiskStore diskStore : stores) {
// Look at all of the partitioned regions.

for (Map.Entry<String, PRPersistentConfig> entry : ((DiskStoreImpl) diskStore).getAllPRs()
.entrySet()) {

Expand Down Expand Up @@ -242,7 +243,6 @@ private static boolean hasOfflineColocatedChildRegions(PartitionedRegion region)
return hasOfflineChildren;
}


private static boolean ignoreUnrecoveredQueue(PartitionedRegion region, String childName) {
// Hack for #50120 if the childRegion is an async queue, but we
// no longer define the async queue, ignore it.
Expand All @@ -251,6 +251,7 @@ private static boolean ignoreUnrecoveredQueue(PartitionedRegion region, String c
}

String senderId = ParallelGatewaySenderQueue.getSenderId(childName);

if (!region.getAsyncEventQueueIds().contains(senderId)
&& !region.getParallelGatewaySenderIds().contains(senderId) && IGNORE_UNRECOVERED_QUEUE) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,9 @@ public boolean isForInternalUse() {
@Override
public abstract void start();

@Override
public abstract void startWithCleanQueue();

@Override
public abstract void stop();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public int getTotalQueueSize() {
return getQueue().size();
}

protected abstract void initializeMessageQueue(String id);
protected abstract void initializeMessageQueue(String id, boolean cleanQueues);

public abstract void enqueueEvent(EnumListenerEvent operation, EntryEvent event,
Object substituteValue) throws IOException, CacheException;
Expand Down Expand Up @@ -442,6 +442,7 @@ protected void processQueue() {
for (;;) {
// check before sleeping
if (stopped()) {
this.resetLastPeekedEvents = true;
if (isDebugEnabled) {
logger.debug(
"GatewaySenderEventProcessor is stopped. Returning without peeking events.");
Expand Down Expand Up @@ -654,6 +655,7 @@ protected void processQueue() {
}
// check again, don't do post-processing if we're stopped.
if (stopped()) {
this.resetLastPeekedEvents = true;
break;
}

Expand Down Expand Up @@ -687,7 +689,11 @@ protected void processQueue() {
"During normal processing, unsuccessfully dispatched {} events (batch #{})",
conflatedEventsToBeDispatched.size(), getBatchId());
}
if (stopped() || resetLastPeekedEvents) {
if (stopped()) {
this.resetLastPeekedEvents = true;
break;
}
if (resetLastPeekedEvents) {
break;
}
try {
Expand All @@ -699,7 +705,9 @@ protected void processQueue() {
Thread.currentThread().interrupt();
}
}
incrementBatchId();
if (!resetLastPeekedEvents) {
incrementBatchId();
}
}
}
} // unsuccessful batch
Expand Down Expand Up @@ -1240,6 +1248,10 @@ public void closeProcessor() {
if (this.sender.isPrimary() && this.queue.size() > 0) {
logger.warn("Destroying GatewayEventDispatcher with actively queued data.");
}
if (resetLastPeekedEvents) {
resetLastPeekedEvents();
resetLastPeekedEvents = false;
}
} catch (RegionDestroyedException ignore) {
} catch (CancelException ignore) {
} catch (CacheException ignore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public class ConcurrentParallelGatewaySenderEventProcessor
final int nDispatcher;

public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,
ThreadsMonitoring tMonitoring) {
ThreadsMonitoring tMonitoring, boolean cleanQueues) {
super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring);
// initializeMessageQueue(sender.getId());
logger.info("ConcurrentParallelGatewaySenderEventProcessor: dispatcher threads {}",
Expand Down Expand Up @@ -102,25 +102,26 @@ public ConcurrentParallelGatewaySenderEventProcessor(AbstractGatewaySender sende
logger.debug("The target PRs are {} Dispatchers: {}", targetRs, nDispatcher);
}

createProcessors(sender.getDispatcherThreads(), targetRs);
createProcessors(sender.getDispatcherThreads(), targetRs, cleanQueues);

// this.queue = parallelQueue;
this.queue = new ConcurrentParallelGatewaySenderQueue(sender, this.processors);
}

protected void createProcessors(int dispatcherThreads, Set<Region> targetRs) {
protected void createProcessors(int dispatcherThreads, Set<Region> targetRs,
boolean cleanQueues) {
processors = new ParallelGatewaySenderEventProcessor[sender.getDispatcherThreads()];
if (logger.isDebugEnabled()) {
logger.debug("Creating AsyncEventProcessor");
}
for (int i = 0; i < sender.getDispatcherThreads(); i++) {
processors[i] = new ParallelGatewaySenderEventProcessor(sender, targetRs, i,
sender.getDispatcherThreads(), getThreadMonitorObj());
sender.getDispatcherThreads(), getThreadMonitorObj(), cleanQueues);
}
}

@Override
protected void initializeMessageQueue(String id) {
protected void initializeMessageQueue(String id, boolean cleanQueues) {
// nothing
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;

import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.CacheException;
import org.apache.geode.cache.CacheListener;
import org.apache.geode.cache.Region;
Expand Down Expand Up @@ -81,6 +82,11 @@ public Set<PartitionedRegion> getRegions() {
return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).getRegions();
}

@VisibleForTesting
public boolean getCleanQueues() {
return ((ParallelGatewaySenderQueue) (processors[0].getQueue())).getCleanQueues();
}

@Override
public Object take() throws CacheException, InterruptedException {
throw new UnsupportedOperationException("This method(take) is not supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,26 +47,27 @@ public class ParallelGatewaySenderEventProcessor extends AbstractGatewaySenderEv
final int nDispatcher;

protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,
ThreadsMonitoring tMonitoring) {
ThreadsMonitoring tMonitoring, boolean cleanQueues) {
super("Event Processor for GatewaySender_" + sender.getId(), sender, tMonitoring);
this.index = 0;
this.nDispatcher = 1;
initializeMessageQueue(sender.getId());
initializeMessageQueue(sender.getId(), cleanQueues);
}

/**
* use in concurrent scenario where queue is to be shared among all the processors.
*/
protected ParallelGatewaySenderEventProcessor(AbstractGatewaySender sender,
Set<Region> userRegions, int id, int nDispatcher, ThreadsMonitoring tMonitoring) {
Set<Region> userRegions, int id, int nDispatcher, ThreadsMonitoring tMonitoring,
boolean cleanQueues) {
super("Event Processor for GatewaySender_" + sender.getId() + "_" + id, sender, tMonitoring);
this.index = id;
this.nDispatcher = nDispatcher;
initializeMessageQueue(sender.getId());
initializeMessageQueue(sender.getId(), cleanQueues);
}

@Override
protected void initializeMessageQueue(String id) {
protected void initializeMessageQueue(String id, boolean cleanQueues) {
Set<Region> targetRs = new HashSet<Region>();
for (InternalRegion region : sender.getCache().getApplicationRegions()) {
if (region.getAllGatewaySenderIds().contains(id)) {
Expand All @@ -78,7 +79,8 @@ protected void initializeMessageQueue(String id) {
}

ParallelGatewaySenderQueue queue;
queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher);
queue = new ParallelGatewaySenderQueue(this.sender, targetRs, this.index, this.nDispatcher,
cleanQueues);

queue.start();
this.queue = queue;
Expand Down
Loading

0 comments on commit 19d5f78

Please sign in to comment.