Skip to content

Commit

Permalink
GEODE-10318: do not add duplicate entries in the locators list (apach…
Browse files Browse the repository at this point in the history
…e#7703)

* move locator parsing to api package
  • Loading branch information
jinmeiliao authored May 23, 2022
1 parent 1e873a6 commit 7e052cd
Show file tree
Hide file tree
Showing 9 changed files with 360 additions and 178 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.apache.geode.distributed.internal;

import static org.assertj.core.api.Assertions.assertThat;

import java.net.UnknownHostException;

import org.junit.Rule;
import org.junit.Test;

import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.test.junit.rules.LocatorStarterRule;

public class LocatorIntegrationTest {
@Rule
public LocatorStarterRule locator = new LocatorStarterRule();

@Test
public void locatorConfigurationShouldNotBeModifiedIfExists() throws UnknownHostException {
int port = AvailablePortHelper.getRandomAvailableTCPPort();
locator.withPort(port);
String hostAddress = LocalHostUtil.getLocalHost().getHostAddress();
String originalLocators = hostAddress + "[" + port + "]";
locator.withProperty("locators", originalLocators);
locator.startLocator();

String locators = locator.getLocator().getConfig().getLocators();
assertThat(locators).isEqualTo(originalLocators);
}

@Test
public void locatorConfigurationWillBeModifiedToIncludeItselfWithHostName()
throws UnknownHostException {
int port = AvailablePortHelper.getRandomAvailableTCPPort();
locator.withPort(port);
locator.startLocator();

String locators = locator.getLocator().getConfig().getLocators();
String hostAddress = LocalHostUtil.getLocalHost().getCanonicalHostName();
assertThat(locators).isEqualTo(hostAddress + "[" + port + "]");
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
Expand Down Expand Up @@ -64,13 +66,15 @@
import org.apache.geode.distributed.internal.InternalDistributedSystem.ConnectListener;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.adapter.ServiceConfig;
import org.apache.geode.distributed.internal.membership.api.LocatorConfigurationParser;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.api.MembershipLocator;
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorBuilder;
import org.apache.geode.distributed.internal.membership.api.QuorumChecker;
import org.apache.geode.distributed.internal.tcpserver.HostAddress;
import org.apache.geode.distributed.internal.tcpserver.HostAndPort;
import org.apache.geode.distributed.internal.tcpserver.InfoRequest;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpServer;
Expand Down Expand Up @@ -707,71 +711,102 @@ private void startDistributedSystem() throws IOException {
// LOG: changed from config to info
logger.info("Using existing distributed system: {}", existing);
startCache(existing);
} else {
return;
}

StringBuilder sb = new StringBuilder(100);
if (hostAddress != null && !StringUtils.isEmpty(hostAddress.getHostName())) {
sb.append(hostAddress.getHostName());
} else {
sb.append(LocalHostUtil.getLocalHost().getCanonicalHostName());
}
sb.append('[').append(getPort()).append(']');
String thisLocator = sb.toString();

if (peerLocator) {
// append this locator to the locators list from the config properties
boolean setLocatorsProp = false;
String locatorsConfigValue = distributionConfig.getLocators();
if (StringUtils.isNotBlank(locatorsConfigValue)) {
if (!locatorsConfigValue.contains(thisLocator)) {
locatorsConfigValue = locatorsConfigValue + ',' + thisLocator;
setLocatorsProp = true;
}
} else {
locatorsConfigValue = thisLocator;
setLocatorsProp = true;
}
if (setLocatorsProp) {
Properties updateEnv = new Properties();
updateEnv.setProperty(LOCATORS, locatorsConfigValue);
distributionConfig.setApiProps(updateEnv);
String locatorsPropertyName = GEMFIRE_PREFIX + LOCATORS;
if (System.getProperty(locatorsPropertyName) != null) {
System.setProperty(locatorsPropertyName, locatorsConfigValue);
}
}
// No longer default mcast-port to zero.
HostAddress thisLocator = getHostAddress(hostAddress);
if (peerLocator) {
String existingLocators = distributionConfig.getLocators();
String newLocators = addLocatorIfMissing(existingLocators, thisLocator, getPort());
if (!newLocators.equals(existingLocators)) {
setLocatorProperties(newLocators);
}
}

Properties distributedSystemProperties = new Properties();
// LogWriterAppender is now shared via that class
// using a DistributionConfig earlier in this method
distributedSystemProperties.put(DistributionConfig.DS_CONFIG_NAME, distributionConfig);

logger.info("Starting distributed system");

internalDistributedSystem =
InternalDistributedSystem
.connectInternal(distributedSystemProperties, null,
new InternalDistributedSystemMetricsService.Builder(),
membershipLocator);

if (peerLocator) {
// We've created a peer location message handler - it needs to be connected to
// the membership service in order to get membership view notifications
membershipLocator
.setMembership(internalDistributedSystem.getDM()
.getDistribution().getMembership());
}
Properties distributedSystemProperties = new Properties();
// LogWriterAppender is now shared via that class
// using a DistributionConfig earlier in this method
distributedSystemProperties.put(DistributionConfig.DS_CONFIG_NAME, distributionConfig);

logger.info("Starting distributed system");

internalDistributedSystem =
InternalDistributedSystem
.connectInternal(distributedSystemProperties, null,
new InternalDistributedSystemMetricsService.Builder(),
membershipLocator);

if (peerLocator) {
// We've created a peer location message handler - it needs to be connected to
// the membership service in order to get membership view notifications
membershipLocator
.setMembership(internalDistributedSystem.getDM()
.getDistribution().getMembership());
}

internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false));

startCache(internalDistributedSystem);

internalDistributedSystem.addDisconnectListener(sys -> stop(false, false, false));
logger.info("Locator started on {}[{}]", thisLocator, getPort());

startCache(internalDistributedSystem);
}

HostAddress getHostAddress(HostAddress hostAddress) throws UnknownHostException {
HostAddress thisLocator;
if (hostAddress != null && !StringUtils.isEmpty(hostAddress.getHostName())) {
thisLocator = hostAddress;
} else {
thisLocator = new HostAddress(LocalHostUtil.getLocalHost());
}
return thisLocator;
}

/**
* add this locator to the locators list if not already in, note the locator's
* hostname is used, but if it resoves to be the same as what's already in the
* list, what's specified by the user is preserved
*
* @param existingLocators the existing configured locators
* @param thisLocator this locator's address
* @param port this locator's port
* @return the comma separated list of locators with this locator added
* if this locator is not already in the list
*/
String addLocatorIfMissing(String existingLocators, HostAddress thisLocator, int port)
throws IOException {
String thisLocatorHostnameAndPort = String.format("%s[%d]", thisLocator.getHostName(), port);
if (StringUtils.isBlank(existingLocators)) {
return thisLocatorHostnameAndPort;
}

List<HostAndPort> hostAndPorts;
try {
hostAndPorts = LocatorConfigurationParser.parseLocators(existingLocators, (InetAddress) null);
} catch (MembershipConfigurationException e) {
throw new IOException(e.getMessage(), e);
}

logger.info("Locator started on {}", thisLocator);
if (hostAndPorts.stream().anyMatch(
(hp) -> thisLocator.getAddress().equals(hp.getAddress())
&& port == hp.getPort())) {
return existingLocators;
}
// if not found in existing locators
return existingLocators + "," + thisLocatorHostnameAndPort;
}

private void setLocatorProperties(String locatorsConfigValue) {
Properties updateEnv = new Properties();
updateEnv.setProperty(LOCATORS, locatorsConfigValue);
distributionConfig.setApiProps(updateEnv);
String locatorsPropertyName = GEMFIRE_PREFIX + LOCATORS;
if (System.getProperty(locatorsPropertyName) != null) {
System.setProperty(locatorsPropertyName, locatorsConfigValue);
}
}


private void startCache(DistributedSystem system) throws IOException {
InternalCache internalCache = GemFireCacheImpl.getInstance();
if (internalCache == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.distributed.internal;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doThrow;
Expand All @@ -33,9 +34,11 @@

import org.apache.geode.cache.RegionShortcut;
import org.apache.geode.cache.internal.HttpService;
import org.apache.geode.distributed.internal.tcpserver.HostAddress;
import org.apache.geode.internal.AvailablePortHelper;
import org.apache.geode.internal.cache.InternalCacheForClientAccess;
import org.apache.geode.internal.cache.InternalRegionFactory;
import org.apache.geode.internal.inet.LocalHostUtil;
import org.apache.geode.internal.security.SecurableCommunicationChannel;
import org.apache.geode.logging.internal.LoggingSession;
import org.apache.geode.management.internal.AgentUtil;
Expand Down Expand Up @@ -67,6 +70,8 @@ public void setup() throws URISyntaxException {
when(distributionConfig.getSecurableCommunicationChannels())
.thenReturn(new SecurableCommunicationChannel[] {});
when(distributionConfig.getSecurityAuthTokenEnabledComponents()).thenReturn(new String[] {});
when(distributionConfig.getSSLProtocols()).thenReturn("any");
when(distributionConfig.getClusterSSLProtocols()).thenReturn("any");
when(cache.createInternalRegionFactory(RegionShortcut.REPLICATE)).thenReturn(regionFactory);
when(cache.getOptionalService(HttpService.class))
.thenReturn(Optional.of(httpService));
Expand Down Expand Up @@ -133,4 +138,52 @@ public void startClusterManagementServiceWithRestServiceEnabledDoesNotThrowWhenS
verify(httpService, never()).addWebApplication(eq("/management"), any(), any());
}

@Test
public void getHostAddress() throws Exception {
// use localhost if no bindAddress
HostAddress locator = internalLocator.getHostAddress(null);
assertThat(locator.getAddress()).isEqualTo(LocalHostUtil.getLocalHost());

// use bindAddress name
HostAddress bindAddress = new HostAddress("test");
locator = internalLocator.getHostAddress(bindAddress);
assertThat(locator).isEqualTo(bindAddress);
}

@Test
public void addLocatorToBlankConfig() throws Exception {
String localHostname = LocalHostUtil.getCanonicalLocalHostName();
HostAddress localhost = new HostAddress(LocalHostUtil.getLocalHost());
String locator = internalLocator.addLocatorIfMissing(null, localhost, 1234);
assertThat(locator).isEqualTo(localHostname + "[1234]");
}

@Test
public void configurePeerLocatorWithNoMatchLocatorList() throws Exception {
String localHostname = LocalHostUtil.getCanonicalLocalHostName();
HostAddress localhost = new HostAddress(LocalHostUtil.getLocalHost());
String existing = "10.10.10.10[12345]";
String locator = internalLocator.addLocatorIfMissing(existing, localhost, 1234);
assertThat(locator).isEqualTo(existing + "," + localHostname + "[1234]");
}

@Test
public void configurePeerLocatorWithMatchingLocatorList() throws Exception {
HostAddress localhost = new HostAddress(LocalHostUtil.getLocalHost());
String localAddress = LocalHostUtil.getLocalHost().getHostAddress();
String existing = localAddress + "[12345]";
String locator = internalLocator.addLocatorIfMissing(existing, localhost, 12345);
// what's returned is what's specified in the original configuration
assertThat(locator).isEqualTo(existing);
}

@Test
public void configurePeerLocatorWithMatchingAddressButNoMatchingPort() throws Exception {
HostAddress localhost = new HostAddress(LocalHostUtil.getLocalHost());
String localAddress = LocalHostUtil.getLocalHost().getHostAddress();
String existing = localAddress + "[11110]";
String locator = internalLocator.addLocatorIfMissing(existing, localhost, 12345);
// what's returned is what's specified in the original configuration
assertThat(locator).isEqualTo(existing + "," + localhost.getHostName() + "[12345]");
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
Expand All @@ -11,11 +12,12 @@
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/

package org.apache.geode.distributed.internal.membership.gms;

import static org.apache.geode.distributed.internal.membership.gms.GMSUtil.parseLocators;
import static org.apache.geode.distributed.internal.membership.api.LocatorConfigurationParser.parseLocators;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand All @@ -31,7 +33,7 @@
import org.apache.geode.test.junit.runners.GeodeParamsRunner;

@RunWith(GeodeParamsRunner.class)
public class GMSUtilTest {
public class LocatorConfigurationParserTest {

static final int PORT = 1234; // any old port--no need to have anything actually bound here

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@
import org.mockito.verification.Timeout;

import org.apache.geode.distributed.internal.membership.api.Authenticator;
import org.apache.geode.distributed.internal.membership.api.LocatorConfigurationParser;
import org.apache.geode.distributed.internal.membership.api.MemberDataBuilder;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactoryImpl;
import org.apache.geode.distributed.internal.membership.api.MemberStartupException;
import org.apache.geode.distributed.internal.membership.api.MembershipConfig;
import org.apache.geode.distributed.internal.membership.api.MembershipConfigurationException;
import org.apache.geode.distributed.internal.membership.gms.GMSMembershipView;
import org.apache.geode.distributed.internal.membership.gms.GMSUtil;
import org.apache.geode.distributed.internal.membership.gms.MemberIdentifierImpl;
import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.distributed.internal.membership.gms.Services.Stopper;
Expand Down Expand Up @@ -443,7 +443,7 @@ public void testRemoveAndLeaveIsNotACrash() throws Exception {

@Test
public void multipleLocatorsWithSameAddressAreCanonicalized() throws Exception {
List<HostAndPort> locators = GMSUtil.parseLocators(
List<HostAndPort> locators = LocatorConfigurationParser.parseLocators(
"localhost[1234],localhost[1234],localhost[1234]", (InetAddress) null);
assertThat(locators.size()).isEqualTo(1);
}
Expand Down
Loading

0 comments on commit 7e052cd

Please sign in to comment.