Skip to content

Commit

Permalink
GEODE-7799: Distribute rebalance status to other locators (apache#4692)
Browse files Browse the repository at this point in the history
Co-authored-by: Joris Melchior <[email protected]>
Co-authored-by: Darrel Schneider <[email protected]>
Co-authored-by: Dale Emery <[email protected]>
  • Loading branch information
3 people authored Feb 20, 2020
1 parent d5d3c78 commit 22403e0
Show file tree
Hide file tree
Showing 43 changed files with 1,773 additions and 986 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package org.apache.geode.management.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.junit.BeforeClass;
import org.junit.ClassRule;
Expand Down Expand Up @@ -69,9 +70,11 @@ public void buildWithHostnameOnly() {

@Test
public void buildWithTransportOnlyHavingRestTemplate() {
assertThat(new ClusterManagementServiceBuilder().setTransport(
assertThatThrownBy(() -> new ClusterManagementServiceBuilder().setTransport(
new RestTemplateClusterManagementServiceTransport(new RestTemplate())).build()
.isConnected()).isFalse();
.isConnected())
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("URI is not absolute");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.util.Objects;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -36,23 +38,25 @@
import org.apache.geode.test.dunit.rules.ClientVM;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.MemberStarterRule;


public class ClientClusterManagementServiceDunitTest {
@ClassRule
public static ClusterStartupRule cluster = new ClusterStartupRule(4);

private static MemberVM locator, server, serverWithGroupA;
private static MemberVM locator;
private static MemberVM server;
private static ClientVM client;

private static String groupA = "group-a";
private static ClusterManagementService cmsClient;

@BeforeClass
public static void beforeClass() {
locator = cluster.startLocatorVM(0, l -> l.withHttpService());
locator = cluster.startLocatorVM(0, MemberStarterRule::withHttpService);
server = cluster.startServerVM(1, locator.getPort());
serverWithGroupA = cluster.startServerVM(2, groupA, locator.getPort());
cluster.startServerVM(2, groupA, locator.getPort());
cmsClient = new ClusterManagementServiceBuilder()
.setPort(locator.getHttpPort())
.build();
Expand Down Expand Up @@ -115,7 +119,8 @@ public void createRegionWithGroup() {

locator.invoke(() -> {
InternalConfigurationPersistenceService persistenceService =
ClusterStartupRule.getLocator().getConfigurationPersistenceService();
Objects.requireNonNull(ClusterStartupRule.getLocator())
.getConfigurationPersistenceService();
CacheConfig clusterCacheConfig = persistenceService.getCacheConfig("cluster", true);
CacheConfig groupACacheConfig = persistenceService.getCacheConfig("group-a");
assertThat(find(clusterCacheConfig.getRegions(), "company")).isNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.Optional;
import java.util.concurrent.ExecutionException;

import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

import org.apache.geode.management.api.ClusterManagementListOperationsResult;
import org.apache.geode.management.api.ClusterManagementOperationResult;
import org.apache.geode.management.api.ClusterManagementService;
import org.apache.geode.management.client.ClusterManagementServiceBuilder;
Expand All @@ -36,62 +37,66 @@
import org.apache.geode.management.operation.RebalanceOperation;
import org.apache.geode.management.runtime.RebalanceRegionResult;
import org.apache.geode.management.runtime.RebalanceResult;
import org.apache.geode.test.awaitility.GeodeAwaitility;
import org.apache.geode.test.dunit.IgnoredException;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.rules.GfshCommandRule;
import org.apache.geode.test.junit.rules.MemberStarterRule;

public class RebalanceManagementDunitTest {

@ClassRule
public static ClusterStartupRule cluster = new ClusterStartupRule();

private static MemberVM locator, server1, server2;

private static ClusterManagementService client;

@ClassRule
public static GfshCommandRule gfsh = new GfshCommandRule();
private static ClusterManagementService client1, client2;

@BeforeClass
public static void beforeClass() throws Exception {
locator = cluster.startLocatorVM(0, MemberStarterRule::withHttpService);
server1 = cluster.startServerVM(1, "group1", locator.getPort());
server2 = cluster.startServerVM(2, "group2", locator.getPort());

client = new ClusterManagementServiceBuilder()
.setPort(locator.getHttpPort())
public static void beforeClass() {
MemberVM locator1 = cluster.startLocatorVM(0, MemberStarterRule::withHttpService);
int locator1Port = locator1.getPort();
MemberVM locator2 =
cluster.startLocatorVM(1, l -> l.withHttpService().withConnectionToLocator(locator1Port));
cluster.startServerVM(2, "group1", locator1Port);
cluster.startServerVM(3, "group2", locator1Port);

client1 = new ClusterManagementServiceBuilder()
.setHost("localhost")
.setPort(locator1.getHttpPort())
.build();
client2 = new ClusterManagementServiceBuilder()
.setHost("localhost")
.setPort(locator2.getHttpPort())
.build();
gfsh.connect(locator);

// create regions
Region regionConfig = new Region();
regionConfig.setName("customers1");
regionConfig.setType(RegionType.PARTITION);
client.create(regionConfig);
locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/customers1", 2);
client1.create(regionConfig);
locator1.waitUntilRegionIsReadyOnExactlyThisManyServers("/customers1", 2);

regionConfig = new Region();
regionConfig.setName("customers2");
regionConfig.setType(RegionType.PARTITION);
client.create(regionConfig);
locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/customers2", 2);
client1.create(regionConfig);
locator1.waitUntilRegionIsReadyOnExactlyThisManyServers("/customers2", 2);
}

@Test
public void rebalance() throws Exception {
ClusterManagementOperationResult<RebalanceResult> cmr =
client.start(new RebalanceOperation());
assertThat(cmr.isSuccessful()).isTrue();
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> startResult =
client1.start(new RebalanceOperation());
assertThat(startResult.isSuccessful()).isTrue();
long now = System.currentTimeMillis();
assertThat(cmr.getOperationStart().getTime()).isBetween(now - 60000, now);
assertThat(startResult.getOperationStart().getTime()).isBetween(now - 60000, now);

RebalanceResult result = cmr.getFutureResult().get();
long end = cmr.getFutureOperationEnded().get().getTime();
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> endResult =
client1.getFuture(new RebalanceOperation(), startResult.getOperationId()).get();
long end = endResult.getOperationEnd().getTime();
now = System.currentTimeMillis();
assertThat(end).isBetween(now - 60000, now)
.isGreaterThanOrEqualTo(cmr.getOperationStart().getTime());
.isGreaterThanOrEqualTo(endResult.getOperationStart().getTime());
RebalanceResult result = endResult.getOperationResult();
assertThat(result.getRebalanceRegionResults().size()).isEqualTo(2);
RebalanceRegionResult firstRegionSummary = result.getRebalanceRegionResults().get(0);
assertThat(firstRegionSummary.getRegionName()).isIn("customers1", "customers2");
Expand All @@ -103,25 +108,32 @@ public void rebalanceExistRegion() throws Exception {
includeRegions.add("customers2");
RebalanceOperation op = new RebalanceOperation();
op.setIncludeRegions(includeRegions);
ClusterManagementOperationResult<RebalanceResult> cmr = client.start(op);
int initialSize = client1.list(op).getResult().size();
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> cmr = client1.start(op);
assertThat(cmr.isSuccessful()).isTrue();

RebalanceResult result = cmr.getFutureResult().get();
RebalanceResult result = client1.getFuture(new RebalanceOperation(), cmr.getOperationId()).get()
.getOperationResult();
assertThat(result.getRebalanceRegionResults().size()).isEqualTo(1);
RebalanceRegionResult firstRegionSummary = result.getRebalanceRegionResults().get(0);
assertThat(firstRegionSummary.getRegionName()).isEqualTo("customers2");
assertThat(firstRegionSummary.getBucketCreateBytes()).isEqualTo(0);
assertThat(firstRegionSummary.getTimeInMilliseconds()).isGreaterThanOrEqualTo(0);

assertThat(client1.list(op).getResult()).hasSize(initialSize + 1);
assertThat(client2.list(op).getResult()).hasSize(initialSize + 1);
}

@Test
public void rebalanceExcludedRegion() throws Exception {
RebalanceOperation op = new RebalanceOperation();
op.setExcludeRegions(Collections.singletonList("customers1"));
ClusterManagementOperationResult<RebalanceResult> cmr = client.start(op);
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> cmr = client1.start(op);
assertThat(cmr.isSuccessful()).isTrue();

RebalanceResult result = cmr.getFutureResult().get();
RebalanceResult result =
client1.getFuture(new RebalanceOperation(), cmr.getOperationId()).get()
.getOperationResult();
assertThat(result.getRebalanceRegionResults().size()).isEqualTo(1);
RebalanceRegionResult firstRegionSummary = result.getRebalanceRegionResults().get(0);
assertThat(firstRegionSummary.getRegionName()).isEqualTo("customers2");
Expand All @@ -130,23 +142,43 @@ public void rebalanceExcludedRegion() throws Exception {
}

@Test
public void rebalanceNonExistRegion() throws Exception {
public void rebalanceNonExistRegion() {
IgnoredException.addIgnoredException(ExecutionException.class);
IgnoredException.addIgnoredException(RuntimeException.class);
RebalanceOperation op = new RebalanceOperation();
op.setIncludeRegions(Collections.singletonList("nonexisting_region"));
ClusterManagementOperationResult<RebalanceResult> cmr = client.start(op);
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> cmr = client1.start(op);
assertThat(cmr.isSuccessful()).isTrue();
String id = cmr.getOperationId();

List<ClusterManagementOperationResult<RebalanceOperation, RebalanceResult>> resultArrayList =
new ArrayList<>();
GeodeAwaitility.await().untilAsserted(() -> {
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> rebalanceResult =
getRebalanceResult(op, id);
if (rebalanceResult != null) {
resultArrayList.add(rebalanceResult);
}
assertThat(rebalanceResult).isNotNull();
});

assertThat(resultArrayList.get(0).isSuccessful()).isFalse();
assertThat(resultArrayList.get(0).getStatusMessage())
.contains("For the region /nonexisting_region, no member was found");

CompletableFuture<RebalanceResult> future = cmr.getFutureResult();
CompletableFuture<String> message = new CompletableFuture<>();
future.exceptionally((ex) -> {
message.complete(ex.getMessage());
return null;
}).get();
}

assertThat(future.isCompletedExceptionally()).isTrue();
assertThat(message.get()).contains("For the region /nonexisting_region, no member was found");
private ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> getRebalanceResult(
RebalanceOperation op, String id) {
ClusterManagementListOperationsResult<RebalanceOperation, RebalanceResult> listOperationsResult =
client1.list(op);
Optional<ClusterManagementOperationResult<RebalanceOperation, RebalanceResult>> rebalanceResult =
listOperationsResult.getResult()
.stream()
.filter(rbalresult -> rbalresult.getOperationId().equals(id)
&& rbalresult.getOperationEnd() != null)
.findFirst();
return rebalanceResult.orElse(null);
}

@Test
Expand All @@ -155,10 +187,11 @@ public void rebalanceOneExistingOneNonExistingRegion() throws Exception {
IgnoredException.addIgnoredException(RuntimeException.class);
RebalanceOperation op = new RebalanceOperation();
op.setIncludeRegions(Arrays.asList("nonexisting_region", "customers1"));
ClusterManagementOperationResult<RebalanceResult> cmr = client.start(op);
ClusterManagementOperationResult<RebalanceOperation, RebalanceResult> cmr = client1.start(op);
assertThat(cmr.isSuccessful()).isTrue();

RebalanceResult result = cmr.getFutureResult().get();
RebalanceResult result = client1.getFuture(new RebalanceOperation(), cmr.getOperationId()).get()
.getOperationResult();
assertThat(result.getRebalanceRegionResults().size()).isEqualTo(1);
RebalanceRegionResult firstRegionSummary = result.getRebalanceRegionResults().get(0);
assertThat(firstRegionSummary.getRegionName()).isEqualTo("customers1");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ org/apache/geode/management/internal/cli/commands/ShowMetricsCommand$Category
org/apache/geode/management/internal/exceptions/UserErrorException
org/apache/geode/management/internal/json/AbstractJSONFormatter$PreventReserializationModule
org/apache/geode/management/internal/json/QueryResultFormatter$TypeSerializationEnforcerModule
org/apache/geode/management/internal/operation/OperationHistoryManager$OperationInstance
org/apache/geode/security/ResourcePermission
org/apache/geode/security/ResourcePermission$Operation
org/apache/geode/security/ResourcePermission$Resource
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,10 +239,17 @@ public RegionFactory(Properties distributedSystemProperties, String regionAttrib
/**
* Returns the cache used by this factory.
*/
private synchronized InternalCache getCache() {
protected synchronized InternalCache getCache() {
return this.cache;
}

/**
* Returns the Region attributes used by this Region factory.
*/
protected RegionAttributes<K, V> getRegionAttributes() {
return this.attrsFactory.create();
}

/**
* Sets the cache loader for the next {@code RegionAttributes} created.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,26 @@
*/
package org.apache.geode.internal.cache;

import java.io.IOException;

import org.apache.geode.InternalGemFireError;
import org.apache.geode.cache.CacheExistsException;
import org.apache.geode.cache.CacheWriterException;
import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionAttributes;
import org.apache.geode.cache.RegionExistsException;
import org.apache.geode.cache.RegionFactory;
import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.TimeoutException;

/**
* {@code RegionFactoryImpl} extends RegionFactory adding {@link RegionShortcut} support.
*
* @since GemFire 6.5
*/
public class RegionFactoryImpl<K, V> extends RegionFactory<K, V> {
private InternalRegionArguments internalRegionArguments;

public RegionFactoryImpl(InternalCache cache) {
super(cache);
}
Expand All @@ -32,7 +42,7 @@ public RegionFactoryImpl(InternalCache cache, RegionShortcut pra) {
super(cache, pra);
}

public RegionFactoryImpl(InternalCache cache, RegionAttributes ra) {
public RegionFactoryImpl(InternalCache cache, RegionAttributes<K, V> ra) {
super(cache, ra);
}

Expand All @@ -43,4 +53,22 @@ public RegionFactoryImpl(InternalCache cache, String regionAttributesId) {
public RegionFactoryImpl(RegionFactory<K, V> regionFactory) {
super(regionFactory);
}

public void setInternalRegionArguments(
InternalRegionArguments internalRegionArguments) {
this.internalRegionArguments = internalRegionArguments;
}

@Override
public Region<K, V> create(String name)
throws CacheExistsException, RegionExistsException, CacheWriterException, TimeoutException {
if (internalRegionArguments == null) {
return super.create(name);
}
try {
return getCache().createVMRegion(name, getRegionAttributes(), internalRegionArguments);
} catch (IOException | ClassNotFoundException e) {
throw new InternalGemFireError("unexpected exception", e);
}
}
}
Loading

0 comments on commit 22403e0

Please sign in to comment.