Skip to content

Commit

Permalink
GEODE-2534 concurrently started locators fail to create a unified system
Browse files: browse the repository at this point in the history
GMS services were being installed in the locator before they were started,
so the locator did not yet know its own member ID. As a result, the
concurrent-startup registry did not contain the locator's member ID, which
allowed FindCoordinatorResponses to be incorrect.
  • Loading branch information
bschuchardt committed Feb 24, 2017
1 parent c35f442 commit 35f6d82
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,10 @@ public String asString() {
}
}
StringBuilder locatorString = new StringBuilder(String.valueOf(ba));
locatorString.append('[').append(this.getPort()).append(']');
Integer port = getPort();
if (port != null && port.intValue() > 0) {
locatorString.append('[').append(this.getPort()).append(']');
}
return locatorString.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,6 @@ protected void init() {
this.manager.init(this);
this.joinLeave.init(this);
this.healthMon.init(this);
InternalLocator l = (InternalLocator) org.apache.geode.distributed.Locator.getLocator();
if (l != null && l.getLocatorHandler() != null) {
if (l.getLocatorHandler().setMembershipManager((MembershipManager) this.manager)) {
this.locator = (Locator) l.getLocatorHandler();
}
}
}

protected void start() {
Expand Down Expand Up @@ -176,6 +170,12 @@ protected void start() {
this.joinLeave.started();
this.healthMon.started();
this.manager.started();
InternalLocator l = (InternalLocator) org.apache.geode.distributed.Locator.getLocator();
if (l != null && l.getLocatorHandler() != null) {
if (l.getLocatorHandler().setMembershipManager((MembershipManager) this.manager)) {
this.locator = (Locator) l.getLocatorHandler();
}
}
logger.debug("All membership services have been started");
try {
this.manager.joinDistributedSystem();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public String toString() {
return "FindCoordinatorResponse(coordinator=" + coordinator + ")";
} else {
return "FindCoordinatorResponse(coordinator=" + coordinator + ", fromView=" + fromView
+ ", viewId=" + (view == null ? "nul" : view.getViewId()) + ", registrants="
+ ", viewId=" + (view == null ? "null" : view.getViewId()) + ", registrants="
+ (registrants == null ? 0 : registrants.size()) + ", senderId=" + senderId
+ ", network partition detection enabled=" + this.networkPartitionDetectionEnabled
+ ", locators preferred as coordinators=" + this.usePreferredCoordinators + ")";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ public GMSLocator(InetAddress bindAddress, File stateFile, String locatorString,
@Override
public synchronized boolean setMembershipManager(MembershipManager mgr) {
if (services == null || services.isStopped()) {
logger.info("Peer locator is connecting to local membership services");
services = ((GMSMembershipManager) mgr).getServices();
localAddress = services.getMessenger().getMemberID();
assert localAddress != null : "member address should have been established";
logger.info("Peer locator is connecting to local membership services with ID {}",
localAddress);
services.setLocator(this);
NetView newView = services.getJoinLeave().getView();
if (newView != null) {
Expand Down Expand Up @@ -142,7 +144,7 @@ private synchronized void findServices() {
}
if (services == null) {
try {
wait(2000);
wait(10000);
} catch (InterruptedException e) {
}
}
Expand Down Expand Up @@ -177,12 +179,14 @@ public Object processRequest(Object request) throws IOException {
}
} else if (request instanceof FindCoordinatorRequest) {
findServices();

FindCoordinatorRequest findRequest = (FindCoordinatorRequest) request;
if (!findRequest.getDHAlgo().equals(securityUDPDHAlgo)) {
return new FindCoordinatorResponse(
"Rejecting findCoordinatorRequest, as member not configured same udp security("
+ findRequest.getDHAlgo() + " )as locator (" + securityUDPDHAlgo + ")");
}

if (services != null) {
services.getMessenger().setPublicKey(findRequest.getMyPublicKey(),
findRequest.getMemberID());
Expand All @@ -193,14 +197,22 @@ public Object processRequest(Object request) throws IOException {
registerMbrVsPK.put(new InternalDistributedMemberWrapper(findRequest.getMemberID()),
findRequest.getMyPublicKey());
}
logger.debug("Rejecting a request to find the coordinator - membership services are"
+ " still initializing");
return null;
}

if (findRequest.getMemberID() != null) {
InternalDistributedMember coord = null;

// at this level we want to return the coordinator known to membership services,
// which may be more up-to-date than the one known by the membership manager
if (view == null) {
findServices();
if (services == null) {
// we must know this process's identity in order to respond
return null;
}
}

boolean fromView = false;
Expand Down Expand Up @@ -237,9 +249,7 @@ public Object processRequest(Object request) throws IOException {
}
synchronized (registrants) {
registrants.add(findRequest.getMemberID());
if (services != null) {
coord = services.getJoinLeave().getMemberID();
}
coord = services.getJoinLeave().getMemberID();
for (InternalDistributedMember mbr : registrants) {
if (mbr != coord && (coord == null || mbr.compareTo(coord) < 0)) {
if (!rejections.contains(mbr) && (mbr.getNetMember().preferredForCoordinator()
Expand All @@ -258,12 +268,7 @@ public Object processRequest(Object request) throws IOException {
coordPk = (byte[]) view.getPublicKey(coord);
}
if (coordPk == null) {
if (services != null) {
coordPk = services.getMessenger().getPublicKey(coord);
} else {
// coordPk = GMSEncrypt.getRegisteredPublicKey(coord);
coordPk = registerMbrVsPK.get(new InternalDistributedMemberWrapper(coord));
}
coordPk = services.getMessenger().getPublicKey(coord);
}
response = new FindCoordinatorResponse(coord, localAddress, fromView, view,
new HashSet<InternalDistributedMember>(registrants),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import org.apache.geode.distributed.internal.membership.gms.Services;
import org.apache.geode.test.dunit.AsyncInvocation;
import org.awaitility.Awaitility;
import org.apache.geode.ForcedDisconnectException;
import org.apache.geode.GemFireConfigException;
Expand Down Expand Up @@ -408,10 +410,28 @@ public void testStartTwoLocatorsWithMultiKeystoreSSL() throws Exception {
}

private void startVerifyAndStopLocator(VM loc1, VM loc2, int port1, int port2,
Properties properties) {
Properties properties) throws Exception {
try {
loc2.invoke("startLocator2", () -> startLocatorWithPortAndProperties(port2, properties));
loc1.invoke("startLocator1", () -> startLocatorWithPortAndProperties(port1, properties));
getBlackboard().initBlackboard();
AsyncInvocation<Boolean> async1 = loc1.invokeAsync("startLocator1", () -> {
getBlackboard().signalGate("locator1");
getBlackboard().waitForGate("go", 10, TimeUnit.SECONDS);
return startLocatorWithPortAndProperties(port1, properties);
});

AsyncInvocation<Boolean> async2 = loc2.invokeAsync("startLocator2", () -> {
getBlackboard().signalGate("locator2");
getBlackboard().waitForGate("go", 10, TimeUnit.SECONDS);
return startLocatorWithPortAndProperties(port2, properties);
});

getBlackboard().waitForGate("locator1", 10, TimeUnit.SECONDS);
getBlackboard().waitForGate("locator2", 10, TimeUnit.SECONDS);
getBlackboard().signalGate("go");

async1.await();
async2.await();

} finally {
try {
// verify that they found each other
Expand Down

0 comments on commit 35f6d82

Please sign in to comment.