Skip to content

Commit

Permalink
GEODE-4092 New protocol does not have an API to get the best server t…
Browse files Browse the repository at this point in the history
…o connect to

GetAllAvailableServers has been replaced by GetServer.  The handler uses
the ServerLocator's processRequest API to ensure we have the same
functionality as existing non-protobuf locator request handling.

If a server isn't found we are returning Success with a null server
location.  Should this be changed to return an error code?
  • Loading branch information
bschuchardt committed Jan 2, 2018
1 parent d0a6394 commit cadecc2
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

Expand Down Expand Up @@ -60,9 +58,8 @@ public class ProtobufDriver implements Driver {
*/
ProtobufDriver(Set<InetSocketAddress> locators) throws IOException {
this.locators = locators;
Collection<InetSocketAddress> servers = getAvailableServers();
InetSocketAddress anyServer = servers.iterator().next();
socket = new Socket(anyServer.getAddress(), anyServer.getPort());
InetSocketAddress server = findAServer();
socket = new Socket(server.getAddress(), server.getPort());

final OutputStream outputStream = socket.getOutputStream();
ProtocolVersion.NewConnectionClientVersion.newBuilder()
Expand Down Expand Up @@ -103,13 +100,12 @@ public <K, V> Region<K, V> getRegion(String regionName) {
}

/**
* Queries a locator for the GemFire servers that have Protobuf enabled.
* Queries locators for a Geode server that has Protobuf enabled.
*
* @return Set of Internet-address-or-host-name/port pairs of the GemFire servers that have
* Protobuf enabled.
* @return The server chosen by the Locator service for this client
* @throws IOException
*/
private Collection<InetSocketAddress> getAvailableServers() throws IOException {
private InetSocketAddress findAServer() throws IOException {
IOException lastException = null;

for (InetSocketAddress locator : locators) {
Expand All @@ -131,22 +127,17 @@ private Collection<InetSocketAddress> getAvailableServers() throws IOException {

ClientProtocol.Message.newBuilder()
.setRequest(ClientProtocol.Request.newBuilder()
.setGetAvailableServersRequest(LocatorAPI.GetAvailableServersRequest.newBuilder()))
.setGetServerRequest(LocatorAPI.GetServerRequest.newBuilder()))
.build().writeDelimitedTo(outputStream);

LocatorAPI.GetAvailableServersResponse getAvailableServersResponse = ClientProtocol.Message
.parseDelimitedFrom(inputStream).getResponse().getGetAvailableServersResponse();
if (getAvailableServersResponse.getServersCount() < 1) {
LocatorAPI.GetServerResponse getServerResponse = ClientProtocol.Message
.parseDelimitedFrom(inputStream).getResponse().getGetServerResponse();
if (!getServerResponse.hasServer()) {
continue;
}

ArrayList<InetSocketAddress> availableServers =
new ArrayList<>(getAvailableServersResponse.getServersCount());
for (int i = 0; i < getAvailableServersResponse.getServersCount(); ++i) {
final BasicTypes.Server server = getAvailableServersResponse.getServers(i);
availableServers.add(new InetSocketAddress(server.getHostname(), server.getPort()));
}
return availableServers;
BasicTypes.Server server = getServerResponse.getServer();
return new InetSocketAddress(server.getHostname(), server.getPort());
} catch (IOException e) {
lastException = e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ message Request {
GetAllRequest getAllRequest = 13;
RemoveRequest removeRequest = 14;

GetAvailableServersRequest getAvailableServersRequest = 40;
GetServerRequest getServerRequest = 40;

GetRegionNamesRequest getRegionNamesRequest = 41;
GetRegionRequest getRegionRequest = 42;

Expand All @@ -57,7 +58,8 @@ message Response {
GetAllResponse getAllResponse = 13;
RemoveResponse removeResponse = 14;

GetAvailableServersResponse getAvailableServersResponse = 40;
GetServerResponse getServerResponse = 40;

GetRegionNamesResponse getRegionNamesResponse = 41;
GetRegionResponse getRegionResponse = 42;

Expand Down
9 changes: 5 additions & 4 deletions geode-protobuf-messages/src/main/proto/v1/locator_API.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,11 @@ package org.apache.geode.internal.protocol.protobuf.v1;

import "v1/basicTypes.proto";

message GetAvailableServersRequest {

message GetServerRequest {
repeated Server excludedServers = 1;
string serverGroup = 2;
}

message GetAvailableServersResponse {
repeated Server servers = 1;
message GetServerResponse {
Server server = 1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,22 @@
*/
package org.apache.geode.internal.protocol.protobuf.v1.operations;

import static org.apache.geode.internal.protocol.ProtocolErrorCode.INVALID_REQUEST;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.client.internal.locator.ClientConnectionRequest;
import org.apache.geode.cache.client.internal.locator.ClientConnectionResponse;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.distributed.internal.ServerLocation;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.Failure;
import org.apache.geode.internal.protocol.MessageExecutionContext;
import org.apache.geode.internal.protocol.Result;
import org.apache.geode.internal.protocol.Success;
Expand All @@ -31,38 +39,54 @@
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.LocatorAPI;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufResponseUtilities;
import org.apache.geode.internal.protocol.serialization.SerializationService;
import org.apache.geode.internal.protocol.state.ConnectionTerminatingStateProcessor;

@Experimental
public class GetAvailableServersOperationHandler implements
ProtobufOperationHandler<LocatorAPI.GetAvailableServersRequest, LocatorAPI.GetAvailableServersResponse> {
public class GetServerOperationHandler
implements ProtobufOperationHandler<LocatorAPI.GetServerRequest, LocatorAPI.GetServerResponse> {

@Override
public Result<LocatorAPI.GetAvailableServersResponse, ClientProtocol.ErrorResponse> process(
ProtobufSerializationService serializationService,
LocatorAPI.GetAvailableServersRequest request,
public Result<LocatorAPI.GetServerResponse, ClientProtocol.ErrorResponse> process(
ProtobufSerializationService serializationService, LocatorAPI.GetServerRequest request,
MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException {

// A client may send a set of servers to exclude and/or a server-group.
Set<ServerLocation> excludedServers = new HashSet<>();
List<BasicTypes.Server> excludedServersList = request.getExcludedServersList();
for (BasicTypes.Server server : excludedServersList) {
excludedServers.add(new ServerLocation(server.getHostname(), server.getPort()));
}

// note: an empty string is okay - the ServerLocator code checks for this
String serverGroup = request.getServerGroup();

messageExecutionContext.setConnectionStateProcessor(new ConnectionTerminatingStateProcessor());
InternalLocator internalLocator = (InternalLocator) messageExecutionContext.getLocator();
ArrayList serversFromSnapshot =
internalLocator.getServerLocatorAdvisee().getLoadSnapshot().getServers(null);
if (serversFromSnapshot == null) {
serversFromSnapshot = new ArrayList();

// In order to ensure that proper checks are performed on the request we will use
// the locator's processRequest() API. We assume that all servers have Protobuf
// enabled.
ClientConnectionRequest clientConnectionRequest =
new ClientConnectionRequest(excludedServers, serverGroup);
ClientConnectionResponse connectionResponse = (ClientConnectionResponse) internalLocator
.getServerLocatorAdvisee().processRequest(clientConnectionRequest);

ServerLocation serverLocation = null;
if (connectionResponse != null) {
serverLocation = connectionResponse.getServer();
}

Collection<BasicTypes.Server> servers = (Collection<BasicTypes.Server>) serversFromSnapshot
.stream().map(serverLocation -> getServerProtobufMessage((ServerLocation) serverLocation))
.collect(Collectors.toList());
LocatorAPI.GetAvailableServersResponse.Builder builder =
LocatorAPI.GetAvailableServersResponse.newBuilder().addAllServers(servers);
return Success.of(builder.build());
}
LocatorAPI.GetServerResponse.Builder builder = LocatorAPI.GetServerResponse.newBuilder();

private BasicTypes.Server getServerProtobufMessage(ServerLocation serverLocation) {
BasicTypes.Server.Builder serverBuilder = BasicTypes.Server.newBuilder();
serverBuilder.setHostname(serverLocation.getHostName()).setPort(serverLocation.getPort());
return serverBuilder.build();
if (serverLocation != null) {
BasicTypes.Server.Builder serverBuilder = BasicTypes.Server.newBuilder();
serverBuilder.setHostname(serverLocation.getHostName()).setPort(serverLocation.getPort());
BasicTypes.Server server = serverBuilder.build();
builder.setServer(server);
}

return Success.of(builder.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Request.RequestAPICase;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAllRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAvailableServersOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetServerOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.PutAllRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.PutRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.RemoveRequestOperationHandler;
Expand Down Expand Up @@ -103,10 +103,10 @@ private void addContexts() {
new ResourcePermission(ResourcePermission.Resource.DATA,
ResourcePermission.Operation.READ)));

operationContexts.put(RequestAPICase.GETAVAILABLESERVERSREQUEST,
new ProtobufOperationContext<>(ClientProtocol.Request::getGetAvailableServersRequest,
new GetAvailableServersOperationHandler(),
opsResp -> ClientProtocol.Response.newBuilder().setGetAvailableServersResponse(opsResp),
operationContexts.put(RequestAPICase.GETSERVERREQUEST,
new ProtobufOperationContext<>(ClientProtocol.Request::getGetServerRequest,
new GetServerOperationHandler(),
opsResp -> ClientProtocol.Response.newBuilder().setGetServerResponse(opsResp),
new ResourcePermission(ResourcePermission.Resource.CLUSTER,
ResourcePermission.Operation.READ)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,8 @@ public static ClientProtocol.Request createPutAllRequest(String regionName,
return ClientProtocol.Request.newBuilder().setPutAllRequest(putAllRequestBuilder).build();
}

public static LocatorAPI.GetAvailableServersRequest createGetAvailableServersRequest() {
LocatorAPI.GetAvailableServersRequest.Builder builder =
LocatorAPI.GetAvailableServersRequest.newBuilder();
public static LocatorAPI.GetServerRequest createGetServerRequest() {
LocatorAPI.GetServerRequest.Builder builder = LocatorAPI.GetServerRequest.newBuilder();
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,33 +17,28 @@

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.net.Socket;
import java.util.List;
import java.util.Properties;

import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;

import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.protocol.exception.InvalidProtocolMessageException;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.Server;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.ConnectionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.MessageUtil;
import org.apache.geode.internal.protocol.protobuf.v1.serializer.ProtobufProtocolSerializer;
import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufRequestUtilities;
import org.apache.geode.internal.protocol.protobuf.v1.utilities.ProtobufUtilities;
import org.apache.geode.security.SimpleTestSecurityManager;
import org.apache.geode.test.dunit.DistributedTestUtils;
import org.apache.geode.test.dunit.Host;
import org.apache.geode.test.dunit.cache.internal.JUnit4CacheTestCase;
import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
Expand Down Expand Up @@ -83,8 +78,8 @@ private Socket createSocket() throws IOException {
}

/**
* Test that if the locator has a security manager, an authorized client is allowed to get the
* available servers
* Test that if the locator has a security manager, an authorized client is allowed to get an
* available server
*/
@Test
public void authorizedClientCanGetServersIfSecurityIsEnabled() throws Throwable {
Expand All @@ -97,9 +92,9 @@ public void authorizedClientCanGetServersIfSecurityIsEnabled() throws Throwable
.putCredentials("security-username", "cluster").putCredentials("security-password",
"cluster"))
.build());
ClientProtocol.Message getAvailableServersRequestMessage = ProtobufUtilities
.createProtobufMessage(protobufRequestBuilder.setGetAvailableServersRequest(
ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
ClientProtocol.Message GetServerRequestMessage =
ProtobufUtilities.createProtobufMessage(protobufRequestBuilder
.setGetServerRequest(ProtobufRequestUtilities.createGetServerRequest()).build());

ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();

Expand All @@ -110,14 +105,12 @@ public void authorizedClientCanGetServersIfSecurityIsEnabled() throws Throwable
protobufProtocolSerializer.deserialize(socket.getInputStream());
assertEquals(true,
authorizationResponse.getResponse().getAuthenticationResponse().getAuthenticated());
protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
socket.getOutputStream());
protobufProtocolSerializer.serialize(GetServerRequestMessage, socket.getOutputStream());

ClientProtocol.Message getAvailableServersResponseMessage =
ClientProtocol.Message GetServerResponseMessage =
protobufProtocolSerializer.deserialize(socket.getInputStream());
assertEquals("Got response: " + getAvailableServersResponseMessage, 1,
getAvailableServersResponseMessage.getResponse().getGetAvailableServersResponse()
.getServersCount());
assertTrue("Got response: " + GetServerResponseMessage,
GetServerResponseMessage.getResponse().getGetServerResponse().hasServer());
}
}

Expand All @@ -129,20 +122,19 @@ public void authorizedClientCanGetServersIfSecurityIsEnabled() throws Throwable
public void unauthorizedClientCannotGetServersIfSecurityIsEnabled() throws Throwable {
ClientProtocol.Request.Builder protobufRequestBuilder =
ProtobufUtilities.createProtobufRequestBuilder();
ClientProtocol.Message getAvailableServersRequestMessage = ProtobufUtilities
.createProtobufMessage(protobufRequestBuilder.setGetAvailableServersRequest(
ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
ClientProtocol.Message getServerRequestMessage =
ProtobufUtilities.createProtobufMessage(protobufRequestBuilder
.setGetServerRequest(ProtobufRequestUtilities.createGetServerRequest()).build());

ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();

try (Socket socket = createSocket()) {
protobufProtocolSerializer.serialize(getAvailableServersRequestMessage,
socket.getOutputStream());
protobufProtocolSerializer.serialize(getServerRequestMessage, socket.getOutputStream());

ClientProtocol.Message getAvailableServersResponseMessage =
ClientProtocol.Message getServerResponseMessage =
protobufProtocolSerializer.deserialize(socket.getInputStream());
assertNotNull("Got response: " + getAvailableServersResponseMessage,
getAvailableServersRequestMessage.getResponse().getErrorResponse());
assertNotNull("Got response: " + getServerResponseMessage,
getServerRequestMessage.getResponse().getErrorResponse());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.internal.protocol.protobuf.v1.acceptance;

import static org.apache.geode.internal.Assert.assertTrue;
import static org.apache.geode.internal.cache.tier.CommunicationMode.ProtobufClientServerProtocol;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -84,9 +85,9 @@ private Socket createSocket() throws IOException {
public void testGetAvailableServersWithStats() throws Throwable {
ClientProtocol.Request.Builder protobufRequestBuilder =
ProtobufUtilities.createProtobufRequestBuilder();
ClientProtocol.Message getAvailableServersRequestMessage = ProtobufUtilities
.createProtobufMessage(protobufRequestBuilder.setGetAvailableServersRequest(
ProtobufRequestUtilities.createGetAvailableServersRequest()).build());
ClientProtocol.Message getAvailableServersRequestMessage =
ProtobufUtilities.createProtobufMessage(protobufRequestBuilder
.setGetServerRequest(ProtobufRequestUtilities.createGetServerRequest()).build());

ProtobufProtocolSerializer protobufProtocolSerializer = new ProtobufProtocolSerializer();

Expand Down Expand Up @@ -220,11 +221,10 @@ private void validateGetAvailableServersResponse(
assertEquals(ClientProtocol.Message.MessageTypeCase.RESPONSE,
getAvailableServersResponseMessage.getMessageTypeCase());
ClientProtocol.Response messageResponse = getAvailableServersResponseMessage.getResponse();
assertEquals(ClientProtocol.Response.ResponseAPICase.GETAVAILABLESERVERSRESPONSE,
assertEquals(ClientProtocol.Response.ResponseAPICase.GETSERVERRESPONSE,
messageResponse.getResponseAPICase());
LocatorAPI.GetAvailableServersResponse getAvailableServersResponse =
messageResponse.getGetAvailableServersResponse();
assertEquals(1, getAvailableServersResponse.getServersCount());
LocatorAPI.GetServerResponse getServerResponse = messageResponse.getGetServerResponse();
assertTrue(getServerResponse.hasServer());
}

private void validateStats(long messagesReceived, long messagesSent, long bytesReceived,
Expand Down
Loading

0 comments on commit cadecc2

Please sign in to comment.