Skip to content

Commit

Permalink
GEODE-7736: Remove depencies on core from MembershipOnlyTest and add …
Browse files Browse the repository at this point in the history
…two member test (apache#4621)

* Don't use geode-core serializer and IDM in MembershipOnlyTest

These two classes are not necessary in the MembershipOnlyTest. As part of this
change, I discovered that StaticSerialization was swizzling some class names
for old client support during serialization but not during deserialization.
Fixing that.

* Registering DSFIDS inside MembershipLocatorBuilder.create

We need the locator to register all of the necessary DSFIDs with it's
serializer when it is starting.

* Giving a separate DSFID to MemberIdentifierImpl

Using a separate DSFID for this class, rather than using the same id as
InternalDistributedMember.

*  Adding a test of two members connecting to MembershipOnlyTest
  • Loading branch information
upthewaterspout authored Jan 29, 2020
1 parent 6ab8757 commit 9de8d20
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.apache.geode.distributed.internal.membership.gms;

import static org.apache.geode.distributed.internal.membership.adapter.TcpSocketCreatorAdapter.asTcpSocketCreator;
import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -30,7 +31,6 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.distributed.internal.membership.api.LifecycleListener;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifier;
import org.apache.geode.distributed.internal.membership.api.MemberIdentifierFactoryImpl;
Expand All @@ -44,10 +44,10 @@
import org.apache.geode.distributed.internal.membership.api.MembershipLocatorBuilder;
import org.apache.geode.distributed.internal.tcpserver.TcpClient;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.InternalDataSerializer;
import org.apache.geode.internal.admin.SSLConfig;
import org.apache.geode.internal.net.SocketCreator;
import org.apache.geode.internal.serialization.DSFIDSerializer;
import org.apache.geode.internal.serialization.DSFIDSerializerFactory;
import org.apache.geode.logging.internal.executors.LoggingExecutors;

public class MembershipOnlyTest {
Expand All @@ -63,20 +63,17 @@ public class MembershipOnlyTest {
public void before() throws IOException, MembershipConfigurationException {
localHost = InetAddress.getLocalHost();

// TODO - using geode-core serializer. This is needed to have be able to
// read InternalDistributedMember.
dsfidSerializer = InternalDataSerializer.getDSFIDSerializer();
dsfidSerializer = new DSFIDSerializerFactory().create();

// TODO - using geode-core socket creator
socketCreator = asTcpSocketCreator(new SocketCreator(new SSLConfig.Builder().build()));

final Supplier<ExecutorService> executorServiceSupplier =
() -> LoggingExecutors.newCachedThreadPool("membership", false);
membershipLocator = MembershipLocatorBuilder.<InternalDistributedMember>newLocatorBuilder(
membershipLocator = MembershipLocatorBuilder.<MemberIdentifierImpl>newLocatorBuilder(
socketCreator,
dsfidSerializer.getObjectSerializer(),
dsfidSerializer.getObjectDeserializer(),
temporaryFolder.newFile("locator").toPath(),
dsfidSerializer,
temporaryFolder.newFolder("locator").toPath(),
executorServiceSupplier)
.create();

Expand All @@ -95,7 +92,22 @@ public void locatorStarts() {

@Test
public void memberCanConnectToSelfHostedLocator() throws MemberStartupException {
Membership<MemberIdentifierImpl> membership = startMember("member", membershipLocator);
assertThat(membership.getView().getMembers()).hasSize(1);
}


@Test
public void twoMembersCanConnect() throws MemberStartupException {
Membership<MemberIdentifierImpl> member1 = startMember("member1", membershipLocator);
Membership<MemberIdentifierImpl> member2 = startMember("member2", null);
await().untilAsserted(() -> assertThat(member1.getView().getMembers()).hasSize(2));
await().untilAsserted(() -> assertThat(member2.getView().getMembers()).hasSize(2));
}

private Membership<MemberIdentifierImpl> startMember(String name,
final MembershipLocator embeddedLocator)
throws MemberStartupException {
MembershipConfig config = new MembershipConfig() {
public String getLocators() {
return localHost.getHostName() + '[' + membershipLocator.getPort() + ']';
Expand All @@ -106,14 +118,18 @@ public String getLocators() {
// being associated with a locator
@Override
public int getVmKind() {
return MemberIdentifier.LOCATOR_DM_TYPE;
return embeddedLocator != null ? MemberIdentifier.LOCATOR_DM_TYPE
: MemberIdentifier.NORMAL_DM_TYPE;
}

@Override
public String getName() {
return name;
}
};

// TODO - using geode-core InternalDistributedMember
MemberIdentifierFactoryImpl memberIdFactory = new MemberIdentifierFactoryImpl();


TcpClient locatorClient = new TcpClient(socketCreator, dsfidSerializer.getObjectSerializer(),
dsfidSerializer.getObjectDeserializer());

Expand All @@ -126,16 +142,17 @@ public int getVmKind() {
.setLifecycleListener(lifeCycleListener)
.create();


// TODO - the membership *must* be installed in the locator at this special
// point during membership startup for the start to succeed
doAnswer(invocation -> {
membershipLocator.setMembership(membership);
return null;
}).when(lifeCycleListener).started();

if (embeddedLocator != null) {
doAnswer(invocation -> {
embeddedLocator.setMembership(membership);
return null;
}).when(lifeCycleListener).started();
}

membership.start();
assertThat(membership.getView().getMembers()).hasSize(1);
membership.startEventProcessing();
return membership;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,7 @@ MAX_POOL_SIZE, new DelayedPoolStatHelper(),
membershipLocator =
MembershipLocatorBuilder.<InternalDistributedMember>newLocatorBuilder(
socketCreator,
InternalDataSerializer.getDSFIDSerializer().getObjectSerializer(),
InternalDataSerializer.getDSFIDSerializer().getObjectDeserializer(),
InternalDataSerializer.getDSFIDSerializer(),
workingDirectory,
executor)
.setConfig(config)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,6 @@ public class InternalDistributedMember extends MemberIdentifierImpl
// Used only by deserialization
public InternalDistributedMember() {}

@Override
public int getDSFID() {
return DISTRIBUTED_MEMBER;
}

/**
* Construct a InternalDistributedMember
* <p>
Expand Down Expand Up @@ -300,6 +295,11 @@ protected void defaultToCurrentHost() {
}
}

@Override
public int getDSFID() {
return DISTRIBUTED_MEMBER;
}

@Override
public void toDataPre_GFE_9_0_0_0(DataOutput out, SerializationContext context)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -608,7 +608,7 @@ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundExcept

@Override
public int getDSFID() {
return DEFAULT_MEMBER_ID;
return MEMBER_IDENTIFIER;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.ObjectDeserializer;
import org.apache.geode.internal.serialization.ObjectSerializer;
import org.apache.geode.internal.serialization.DSFIDSerializer;

public interface MembershipLocatorBuilder<ID extends MemberIdentifier> {
MembershipLocatorBuilder<ID> setPort(int port);
Expand All @@ -47,11 +46,10 @@ MembershipLocator<ID> create()

static <ID extends MemberIdentifier> MembershipLocatorBuilder<ID> newLocatorBuilder(
final TcpSocketCreator socketCreator,
final ObjectSerializer objectSerializer,
final ObjectDeserializer objectDeserializer,
final DSFIDSerializer serializer,
final Path workingDirectory,
final Supplier<ExecutorService> executorServiceSupplier) {
return new MembershipLocatorBuilderImpl<ID>(socketCreator, objectSerializer, objectDeserializer,
return new MembershipLocatorBuilderImpl<ID>(socketCreator, serializer,
workingDirectory, executorServiceSupplier);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,29 @@
import org.apache.geode.distributed.internal.tcpserver.ProtocolChecker;
import org.apache.geode.distributed.internal.tcpserver.TcpHandler;
import org.apache.geode.distributed.internal.tcpserver.TcpSocketCreator;
import org.apache.geode.internal.serialization.ObjectDeserializer;
import org.apache.geode.internal.serialization.ObjectSerializer;
import org.apache.geode.internal.serialization.DSFIDSerializer;

public final class MembershipLocatorBuilderImpl<ID extends MemberIdentifier> implements
MembershipLocatorBuilder<ID> {
private final DSFIDSerializer serializer;
private int port = 0;
private InetAddress bindAddress = null;
private ProtocolChecker protocolChecker = (socket, input, firstByte) -> false;
private TcpHandler fallbackHandler = new TcpHandlerNoOp();
private MembershipLocatorStatistics locatorStats = new MembershipLocatorStatisticsNoOp();
private boolean locatorsAreCoordinators = true;
private final TcpSocketCreator socketCreator;
private final ObjectSerializer objectSerializer;
private final ObjectDeserializer objectDeserializer;
private final Path workingDirectory;
private MembershipConfig config = new MembershipConfig() {};
private final Supplier<ExecutorService> executorServiceSupplier;

public MembershipLocatorBuilderImpl(
final TcpSocketCreator socketCreator,
final ObjectSerializer objectSerializer,
final ObjectDeserializer objectDeserializer,
final DSFIDSerializer serializer,
final Path workingDirectory,
final Supplier<ExecutorService> executorServiceSupplier) {
this.socketCreator = socketCreator;
this.objectSerializer = objectSerializer;
this.objectDeserializer = objectDeserializer;
this.serializer = serializer;
this.workingDirectory = workingDirectory;
this.executorServiceSupplier = executorServiceSupplier;
}
Expand Down Expand Up @@ -106,9 +102,11 @@ public MembershipLocatorBuilder<ID> setLocatorStats(MembershipLocatorStatistics
@Override
public MembershipLocator<ID> create()
throws UnknownHostException, MembershipConfigurationException {
Services.registerSerializables(serializer);
return new MembershipLocatorImpl<ID>(port, bindAddress, protocolChecker,
executorServiceSupplier,
socketCreator, objectSerializer, objectDeserializer, fallbackHandler,
socketCreator, serializer.getObjectSerializer(), serializer.getObjectDeserializer(),
fallbackHandler,
locatorsAreCoordinators, locatorStats, workingDirectory, config);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
*/
package org.apache.geode.distributed.internal.membership.gms;

import static org.apache.geode.internal.serialization.DataSerializableFixedID.DEFAULT_MEMBER_ID;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.FINAL_CHECK_PASSED_MESSAGE;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_REQ;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.FIND_COORDINATOR_RESP;
Expand All @@ -26,6 +25,7 @@
import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_REQUEST;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.JOIN_RESPONSE;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.LEAVE_REQUEST_MESSAGE;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.MEMBER_IDENTIFIER;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.NETVIEW;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.NETWORK_PARTITION_MESSAGE;
import static org.apache.geode.internal.serialization.DataSerializableFixedID.REMOVE_MEMBER_REQUEST;
Expand Down Expand Up @@ -175,7 +175,8 @@ public static void registerSerializables(DSFIDSerializer serializer) {
serializer.registerDSFID(FIND_COORDINATOR_RESP, FindCoordinatorResponse.class);
serializer.registerDSFID(JOIN_RESPONSE, JoinResponseMessage.class);
serializer.registerDSFID(JOIN_REQUEST, JoinRequestMessage.class);
serializer.registerDSFID(DEFAULT_MEMBER_ID, MemberIdentifierImpl.class);
serializer.registerDSFID(MEMBER_IDENTIFIER, MemberIdentifierImpl.class);

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer

short VIEW_ACK_MESSAGE = -151;
short INSTALL_VIEW_MESSAGE = -150;
short DEFAULT_MEMBER_ID = -149; // membership module's default identifier
short NETVIEW = -148;
short GET_VIEW_REQ = -147;
short GET_VIEW_RESP = -146;
Expand Down Expand Up @@ -673,6 +672,7 @@ public interface DataSerializableFixedID extends SerializationVersions, BasicSer
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE = 2181;
short GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_ENTRY = 2182;
short ABORT_BACKUP_REQUEST = 2183;
short MEMBER_IDENTIFIER = 2184;

// NOTE, codes > 65535 will take 4 bytes to serialize

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ public static Class<?> readClass(DataInput in) throws IOException, ClassNotFound
byte typeCode = in.readByte();
if (typeCode == DSCODE.CLASS.toByte()) {
String className = readString(in);
className = processIncomingClassName(className);
return Class.forName(className);
} else {
return StaticSerialization.decodePrimitiveClass(typeCode);
Expand Down

0 comments on commit 9de8d20

Please sign in to comment.