Skip to content

Commit

Permalink
Merge pull request apache#4046 from BenjaminPerryRoss/feature/GEODE-7179
Browse files Browse the repository at this point in the history
GEODE-7179: Add option to AlterAsyncEventQueue gfsh command for pause-event processing
  • Loading branch information
gesterzhou authored Sep 12, 2019
2 parents 91176d6 + 80976b6 commit 3c2a906
Show file tree
Hide file tree
Showing 2 changed files with 147 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.management.internal.cli.commands;

import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL;
import static org.assertj.core.api.Assertions.assertThat;

import org.junit.Before;
Expand All @@ -23,6 +24,7 @@
import org.junit.experimental.categories.Category;

import org.apache.geode.cache.asyncqueue.AsyncEventQueue;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.wan.MyAsyncEventListener;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
Expand All @@ -34,6 +36,10 @@
@Category({AEQTest.class})
public class AlterAsyncEventQueueCommandDUnitTest {

private static final int ALTERED_BATCH_SIZE = 200;
private static final int ALTERED_BATCH_TIME_INTERVAL = 300;
private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400;

@Rule
public ClusterStartupRule lsRule = new ClusterStartupRule();

Expand All @@ -60,22 +66,140 @@ public void testAlterAsyncEventQueue() throws Exception {
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(100);
assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
});

gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size="
+ ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess();

// verify that server1's event queue still has the default value
// without restart
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});

// restart locator and server without clearing the file system
server1.stop(false);
locator.stop(false);

locator = lsRule.startLocatorVM(0);
server1 = lsRule.startServerVM(1, "group1", locator.getPort());
// verify that server1's queue is updated
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});
}

@Test
public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange()
throws Exception {
gfsh.executeAndAssertThat(
"create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener="
+ MyAsyncEventListener.class.getName())
.statusIsSuccess();

locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);

// verify that server1's event queue has the default value
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.isDispatchingPaused()).isTrue();
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
});

gfsh.executeAndAssertThat(
"alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size="
+ ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
.statusIsSuccess();

// verify that server1's event queue still has the default value
// without restart
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
assertThat(queue.isDispatchingPaused()).isTrue();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});

// restart locator and server without clearing the file system
server1.stop(false);
locator.stop(false);

locator = lsRule.startLocatorVM(0);
server1 = lsRule.startServerVM(1, "group1", locator.getPort());
// verify that server1's queue is updated
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
assertThat(queue.isDispatchingPaused()).isFalse();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});
}

@Test
public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange()
throws Exception {
gfsh.executeAndAssertThat(
"create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener="
+ MyAsyncEventListener.class.getName())
.statusIsSuccess();

locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1);

// verify that server1's event queue has the default value
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.isDispatchingPaused()).isFalse();
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
});

gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 "
+ "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess();
gfsh.executeAndAssertThat(
"alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size="
+ ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL
+ " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY)
.statusIsSuccess();

// verify that server1's event queue still has the default value
// without restart
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(100);
assertThat(queue.getBatchTimeInterval()).isEqualTo(5);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(100);
assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory())
.isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY);
assertThat(queue.isDispatchingPaused()).isFalse();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});

Expand All @@ -89,9 +213,10 @@ public void testAlterAsyncEventQueue() throws Exception {
server1.invoke(() -> {
InternalCache cache = ClusterStartupRule.getCache();
AsyncEventQueue queue = cache.getAsyncEventQueue("queue1");
assertThat(queue.getBatchSize()).isEqualTo(200);
assertThat(queue.getBatchTimeInterval()).isEqualTo(300);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(400);
assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE);
assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL);
assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY);
assertThat(queue.isDispatchingPaused()).isTrue();
assertThat(cache.getAsyncEventQueue("queue2")).isNull();
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements
static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP;
static final String MAXIMUM_QUEUE_MEMORY_HELP =
CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP;
static final String PAUSE_EVENT_PROCESSING = "pause-event-processing";
static final String PAUSE_EVENT_PROCESSING_HELP =
"Pause event processing when the async event queue is created";

@CliCommand(value = COMMAND_NAME, help = COMMAND_HELP)
@CliMetaData(
Expand All @@ -80,7 +83,10 @@ public ResultModel execute(@CliOption(key = ID, mandatory = true, help = ID_HELP
help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval,
@CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory,
@CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true",
unspecifiedDefaultValue = "false") boolean ifExists)
unspecifiedDefaultValue = "false") boolean ifExists,
@CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP,
specifiedDefaultValue = "true",
unspecifiedDefaultValue = "false") boolean pauseEventProcessing)
throws IOException, SAXException, ParserConfigurationException, TransformerException,
EntityNotFoundException {

Expand All @@ -98,6 +104,7 @@ public ResultModel execute(@CliOption(key = ID, mandatory = true, help = ID_HELP

CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue();
aeqConfiguration.setId(id);
aeqConfiguration.setPauseEventProcessing(pauseEventProcessing);

if (batchSize != null) {
aeqConfiguration.setBatchSize(batchSize + "");
Expand Down Expand Up @@ -163,6 +170,9 @@ public boolean updateConfigForGroup(String group, CacheConfig config, Object con
if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) {
queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory());
}
if (aeqConfiguration.isPauseEventProcessing() != null) {
queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing());
}
aeqConfigsHaveBeenUpdated = true;
}

Expand Down

0 comments on commit 3c2a906

Please sign in to comment.