diff --git a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java index ba42d274032f..0a3c5758fb5b 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommandDUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.management.internal.cli.commands; +import static org.apache.geode.cache.asyncqueue.internal.AsyncEventQueueFactoryImpl.DEFAULT_BATCH_TIME_INTERVAL; import static org.assertj.core.api.Assertions.assertThat; import org.junit.Before; @@ -23,6 +24,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.cache.asyncqueue.AsyncEventQueue; +import org.apache.geode.cache.wan.GatewaySender; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.wan.MyAsyncEventListener; import org.apache.geode.test.dunit.rules.ClusterStartupRule; @@ -34,6 +36,10 @@ @Category({AEQTest.class}) public class AlterAsyncEventQueueCommandDUnitTest { + private static final int ALTERED_BATCH_SIZE = 200; + private static final int ALTERED_BATCH_TIME_INTERVAL = 300; + private static final int ALTERED_MAXIMUM_QUEUE_MEMORY = 400; + @Rule public ClusterStartupRule lsRule = new ClusterStartupRule(); @@ -60,22 +66,140 @@ public void testAlterAsyncEventQueue() throws Exception { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(100); - assertThat(queue.getBatchTimeInterval()).isEqualTo(5); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + }); + + gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY).statusIsSuccess(); + + // verify that server1's event queue still has the default value + // without restart + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + + // restart locator and server without clearing the file system + server1.stop(false); + locator.stop(false); + + locator = lsRule.startLocatorVM(0); + server1 = lsRule.startServerVM(1, "group1", locator.getPort()); + // verify that server1's queue is updated + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + } + + @Test + public void whenAlterCommandUsedToChangeFromPauseToResumeThenAEQBehaviorMustChange() + throws Exception { + gfsh.executeAndAssertThat( + "create async-event-queue --pause-event-processing=true --id=queue1 --group=group1 --listener=" + + MyAsyncEventListener.class.getName()) + .statusIsSuccess(); + + locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); + + // verify that server1's event queue has the default value + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.isDispatchingPaused()).isTrue(); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + }); + + gfsh.executeAndAssertThat( + "alter async-event-queue --id=queue1 --pause-event-processing=false --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) + .statusIsSuccess(); + + // verify that server1's event queue still has the default value + // without restart + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isTrue(); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + + // restart locator and server without clearing the file system + server1.stop(false); + locator.stop(false); + + locator = lsRule.startLocatorVM(0); + server1 = lsRule.startServerVM(1, "group1", locator.getPort()); + // verify that server1's queue is updated + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isFalse(); + assertThat(cache.getAsyncEventQueue("queue2")).isNull(); + }); + } + + @Test + public void whenAlterCommandUsedToChangeFromResumeStateToPausedThenAEQBehaviorMustChange() + throws Exception { + gfsh.executeAndAssertThat( + "create async-event-queue --pause-event-processing=false --id=queue1 --group=group1 --listener=" + + MyAsyncEventListener.class.getName()) + .statusIsSuccess(); + + locator.waitUntilAsyncEventQueuesAreReadyOnExactlyThisManyServers("queue1", 1); + + // verify that server1's event queue has the default value + server1.invoke(() -> { + InternalCache cache = ClusterStartupRule.getCache(); + AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.isDispatchingPaused()).isFalse(); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); }); - gfsh.executeAndAssertThat("alter async-event-queue --id=queue1 " + "--batch-size=200 " - + "--batch-time-interval=300 " + "--max-queue-memory=400").statusIsSuccess(); + gfsh.executeAndAssertThat( + "alter async-event-queue --id=queue1 --pause-event-processing=true --batch-size=" + + ALTERED_BATCH_SIZE + " --batch-time-interval=" + ALTERED_BATCH_TIME_INTERVAL + + " --max-queue-memory=" + ALTERED_MAXIMUM_QUEUE_MEMORY) + .statusIsSuccess(); // verify that server1's event queue still has the default value // without restart server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(100); - assertThat(queue.getBatchTimeInterval()).isEqualTo(5); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(100); + assertThat(queue.getBatchSize()).isEqualTo(GatewaySender.DEFAULT_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(DEFAULT_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()) + .isEqualTo(GatewaySender.DEFAULT_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isFalse(); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); @@ -89,9 +213,10 @@ public void testAlterAsyncEventQueue() throws Exception { server1.invoke(() -> { InternalCache cache = ClusterStartupRule.getCache(); AsyncEventQueue queue = cache.getAsyncEventQueue("queue1"); - assertThat(queue.getBatchSize()).isEqualTo(200); - assertThat(queue.getBatchTimeInterval()).isEqualTo(300); - assertThat(queue.getMaximumQueueMemory()).isEqualTo(400); + assertThat(queue.getBatchSize()).isEqualTo(ALTERED_BATCH_SIZE); + assertThat(queue.getBatchTimeInterval()).isEqualTo(ALTERED_BATCH_TIME_INTERVAL); + assertThat(queue.getMaximumQueueMemory()).isEqualTo(ALTERED_MAXIMUM_QUEUE_MEMORY); + assertThat(queue.isDispatchingPaused()).isTrue(); assertThat(cache.getAsyncEventQueue("queue2")).isNull(); }); } diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java index 2f33da104bbf..9c14a457a806 100644 --- a/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java +++ b/geode-core/src/main/java/org/apache/geode/management/internal/cli/commands/AlterAsyncEventQueueCommand.java @@ -68,6 +68,9 @@ public class AlterAsyncEventQueueCommand extends SingleGfshCommand implements static final String BATCH_TIME_INTERVAL_HELP = CREATE_ASYNC_EVENT_QUEUE__BATCHTIMEINTERVAL__HELP; static final String MAXIMUM_QUEUE_MEMORY_HELP = CREATE_ASYNC_EVENT_QUEUE__MAXIMUM_QUEUE_MEMORY__HELP; + static final String PAUSE_EVENT_PROCESSING = "pause-event-processing"; + static final String PAUSE_EVENT_PROCESSING_HELP = + "Pause event processing when the async event queue is created"; @CliCommand(value = COMMAND_NAME, help = COMMAND_HELP) @CliMetaData( @@ -80,7 +83,10 @@ public ResultModel execute(@CliOption(key = ID, mandatory = true, help = ID_HELP help = BATCH_TIME_INTERVAL_HELP) Integer batchTimeInterval, @CliOption(key = MAX_QUEUE_MEMORY, help = MAXIMUM_QUEUE_MEMORY_HELP) Integer maxQueueMemory, @CliOption(key = IFEXISTS, help = IFEXISTS_HELP, specifiedDefaultValue = "true", - unspecifiedDefaultValue = "false") boolean ifExists) + unspecifiedDefaultValue = "false") boolean ifExists, + @CliOption(key = PAUSE_EVENT_PROCESSING, help = PAUSE_EVENT_PROCESSING_HELP, + specifiedDefaultValue = "true", + unspecifiedDefaultValue = "false") boolean pauseEventProcessing) throws IOException, SAXException, ParserConfigurationException, TransformerException, EntityNotFoundException { @@ -98,6 +104,7 @@ public ResultModel execute(@CliOption(key = ID, mandatory = true, help = ID_HELP CacheConfig.AsyncEventQueue aeqConfiguration = new CacheConfig.AsyncEventQueue(); aeqConfiguration.setId(id); + aeqConfiguration.setPauseEventProcessing(pauseEventProcessing); if (batchSize != null) { aeqConfiguration.setBatchSize(batchSize + ""); @@ -163,6 +170,9 @@ public boolean updateConfigForGroup(String group, CacheConfig config, Object con if (StringUtils.isNotBlank(aeqConfiguration.getMaximumQueueMemory())) { queue.setMaximumQueueMemory(aeqConfiguration.getMaximumQueueMemory()); } + if (aeqConfiguration.isPauseEventProcessing() != null) { + queue.setPauseEventProcessing(aeqConfiguration.isPauseEventProcessing()); + } aeqConfigsHaveBeenUpdated = true; }