Skip to content

Commit

Permalink
GEODE-3127: add function execution to new client protocol. (apache#1357)
Browse files Browse the repository at this point in the history
* GEODE-3127: add function execution to new client protocol.

This only supports function execution on a region so far. It assumes the
default result collector, which returns a list of results, so the response message returns a list of results.

* MessageExecutionContext uses InternalCache instead of Cache
* Remove super.setUp calls from operation handlers because the
  @before annotation already ensures these are called before the
  @before methods of child classes.
  • Loading branch information
galen-pivotal authored Feb 2, 2018
1 parent b22db64 commit 6011e09
Show file tree
Hide file tree
Showing 22 changed files with 730 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
package org.apache.geode.internal.cache.client.protocol;

import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.security.SecurityService;

/**
Expand All @@ -32,7 +32,8 @@ public interface ClientProtocolService {
* handshake has happened.
*
*/
ClientProtocolProcessor createProcessorForCache(Cache cache, SecurityService securityService);
ClientProtocolProcessor createProcessorForCache(InternalCache cache,
SecurityService securityService);

/**
* Create a locator processor. The locator does not currently provide any authentication.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import "v1/region_API.proto";
import "v1/locator_API.proto";
import "v1/basicTypes.proto";
import "v1/connection_API.proto";
import "v1/function_API.proto";

message Message {
oneof messageType {
Expand All @@ -46,6 +47,8 @@ message Request {
GetRegionNamesRequest getRegionNamesRequest = 41;
GetRegionRequest getRegionRequest = 42;

ExecuteFunctionOnRegionRequest executeFunctionOnRegionRequest = 43;

AuthenticationRequest authenticationRequest = 100;
}
}
Expand All @@ -63,6 +66,8 @@ message Response {
GetRegionNamesResponse getRegionNamesResponse = 41;
GetRegionResponse getRegionResponse = 42;

ExecuteFunctionOnRegionResponse executeFunctionOnRegionResponse= 43;

AuthenticationResponse authenticationResponse = 100;

ErrorResponse errorResponse = 1000;
Expand Down
29 changes: 29 additions & 0 deletions geode-protobuf-messages/src/main/proto/v1/function_API.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
syntax = "proto3";
package org.apache.geode.internal.protocol.protobuf.v1;

import "v1/basicTypes.proto";

message ExecuteFunctionOnRegionRequest {
string functionID = 1;
string region = 2;
EncodedValue arguments = 3;
repeated EncodedValue keyFilter = 4;
}

message ExecuteFunctionOnRegionResponse {
repeated EncodedValue results = 1; // some functions don't return arguments.
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@


import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;
Expand All @@ -41,7 +41,7 @@ public LocatorMessageExecutionContext(Locator locator, ClientStatistics statisti
* @throws InvalidExecutionContextException if there is no cache available
*/
@Override
public Cache getCache() throws InvalidExecutionContextException {
public InternalCache getCache() throws InvalidExecutionContextException {
setConnectionStateProcessor(new ProtobufConnectionTerminatingStateProcessor());
throw new InvalidExecutionContextException(
"Operations on the locator should not to try to operate on a server");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package org.apache.geode.internal.protocol.protobuf.v1;

import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;
Expand All @@ -36,7 +36,7 @@ public ProtobufConnectionStateProcessor getConnectionStateProcessor() {
return protobufConnectionStateProcessor;
}

public abstract Cache getCache() throws InvalidExecutionContextException;
public abstract InternalCache getCache() throws InvalidExecutionContextException;

public abstract Locator getLocator() throws InvalidExecutionContextException;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
package org.apache.geode.internal.protocol.protobuf.v1;

import org.apache.geode.StatisticsFactory;
import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolProcessor;
import org.apache.geode.internal.cache.client.protocol.ClientProtocolService;
import org.apache.geode.internal.protocol.protobuf.ProtocolVersion;
Expand All @@ -38,7 +38,7 @@ public synchronized void initializeStatistics(String statisticsName, StatisticsF
}

@Override
public ClientProtocolProcessor createProcessorForCache(Cache cache,
public ClientProtocolProcessor createProcessorForCache(InternalCache cache,
SecurityService securityService) {
assert (statistics != null);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ public ProtobufSerializationService() {}
* @param value the value to be encoded
*
* @return EncodedValue message with the serialized value
* @throws EncodingException
*/
@Override
public BasicTypes.EncodedValue encode(Object value) throws EncodingException {
if (value == null) {
return BasicTypes.EncodedValue.getDefaultInstance();
}

BasicTypes.EncodedValue.Builder builder = BasicTypes.EncodedValue.newBuilder();

try {
ProtobufEncodingTypes protobufEncodingTypes = ProtobufEncodingTypes.valueOf(value.getClass());
switch (protobufEncodingTypes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@


import org.apache.geode.annotations.Experimental;
import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.Locator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.protocol.protobuf.statistics.ClientStatistics;
import org.apache.geode.internal.protocol.protobuf.v1.state.ProtobufConnectionStateProcessor;

@Experimental
public class ServerMessageExecutionContext extends MessageExecutionContext {
private final Cache cache;
private final InternalCache cache;

public ServerMessageExecutionContext(Cache cache, ClientStatistics statistics,
ProtobufConnectionStateProcessor initialProtobufConnectionStateProcessor) {
super(statistics, initialProtobufConnectionStateProcessor);
public ServerMessageExecutionContext(InternalCache cache, ClientStatistics statistics,
ProtobufConnectionStateProcessor initialConnectionStateProcessor) {
super(statistics, initialConnectionStateProcessor);
this.cache = cache;
}

Expand All @@ -40,7 +40,7 @@ public ServerMessageExecutionContext(Cache cache, ClientStatistics statistics,
* @throws InvalidExecutionContextException if there is no cache available
*/
@Override
public Cache getCache() throws InvalidExecutionContextException {
public InternalCache getCache() throws InvalidExecutionContextException {
return cache;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.protocol.protobuf.v1.operations;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.logging.log4j.Logger;

import org.apache.geode.cache.Region;
import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.exception.InvalidExecutionContextException;
import org.apache.geode.internal.logging.LogService;
import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.Failure;
import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
import org.apache.geode.internal.protocol.protobuf.v1.Result;
import org.apache.geode.internal.protocol.protobuf.v1.Success;
import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
import org.apache.geode.internal.security.SecurityService;
import org.apache.geode.security.NotAuthorizedException;

public class ExecuteFunctionOnRegionRequestOperationHandler implements
ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, FunctionAPI.ExecuteFunctionOnRegionResponse> {
@Override
public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse> process(
ProtobufSerializationService serializationService,
FunctionAPI.ExecuteFunctionOnRegionRequest request,
MessageExecutionContext messageExecutionContext)
throws InvalidExecutionContextException, ConnectionStateException {

final String functionID = request.getFunctionID();
final String regionName = request.getRegion();

final Function<?> function = FunctionService.getFunction(functionID);
if (function == null) {
return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
.setMessage("Function with ID \"" + functionID + "\" not found").build())
.build());
}

final Region<Object, Object> region = messageExecutionContext.getCache().getRegion(regionName);
if (region == null) {
return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
.setMessage("Region \"" + regionName + "\" not found"))
.build());
}

final SecurityService securityService = messageExecutionContext.getCache().getSecurityService();

try {
// check security for function.
function.getRequiredPermissions(regionName).forEach(securityService::authorize);
} catch (NotAuthorizedException ex) {
return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
.setError(BasicTypes.Error.newBuilder()
.setMessage("Authorization failed for function \"" + functionID + "\"")
.setErrorCode(BasicTypes.ErrorCode.AUTHORIZATION_FAILED))
.build());
}

try {
Execution execution = FunctionService.onRegion(region);

final Object arguments = serializationService.decode(request.getArguments());

if (arguments != null) {
execution = execution.setArguments(arguments);
}

execution = execution.withFilter(parseFilter(serializationService, request));

final ResultCollector<Object, List<Object>> resultCollector = execution.execute(functionID);

if (function.hasResult()) {
List<Object> results = resultCollector.getResult();

final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder responseMessage =
FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder();
for (Object result : results) {
responseMessage.addResults(serializationService.encode(result));
}
return Success.of(responseMessage.build());
} else {
// This is fire and forget.
return Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build());
}
} catch (FunctionException ex) {
return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
.setMessage("Function execution failed: " + ex.toString()))
.build());
} catch (EncodingException ex) {
return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
.setMessage("Encoding failed: " + ex.toString()))
.build());
}
}

private Set<Object> parseFilter(ProtobufSerializationService serializationService,
FunctionAPI.ExecuteFunctionOnRegionRequest request) throws EncodingException {
List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList();
Set<Object> filter = new HashSet<>();

for (BasicTypes.EncodedValue filterKey : encodedFilter) {
filter.add(serializationService.decode(filterKey));
}
return filter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Request.RequestAPICase;
import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAllRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionNamesRequestOperationHandler;
import org.apache.geode.internal.protocol.protobuf.v1.operations.GetRegionRequestOperationHandler;
Expand Down Expand Up @@ -51,8 +52,7 @@ private void addContexts() {
new ProtobufOperationContext<>(ClientProtocol.Request::getAuthenticationRequest,
new AuthenticationRequestOperationHandler(),
opsResp -> ClientProtocol.Response.newBuilder().setAuthenticationResponse(opsResp),
new ResourcePermission(ResourcePermission.Resource.DATA,
ResourcePermission.Operation.READ)));
new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL)));

operationContexts.put(RequestAPICase.GETREQUEST,
new ProtobufOperationContext<>(ClientProtocol.Request::getGetRequest,
Expand Down Expand Up @@ -109,5 +109,14 @@ private void addContexts() {
opsResp -> ClientProtocol.Response.newBuilder().setGetServerResponse(opsResp),
new ResourcePermission(ResourcePermission.Resource.CLUSTER,
ResourcePermission.Operation.READ)));

operationContexts.put(RequestAPICase.EXECUTEFUNCTIONONREGIONREQUEST,
new ProtobufOperationContext<>(ClientProtocol.Request::getExecuteFunctionOnRegionRequest,
new ExecuteFunctionOnRegionRequestOperationHandler(),
opsResp -> ClientProtocol.Response.newBuilder()
.setExecuteFunctionOnRegionResponse(opsResp),
// Resource permissions get handled per-function, since they have varying permission
// requirements.
new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,16 @@
*/
package org.apache.geode.internal.protocol;

import org.apache.geode.cache.Cache;
import org.apache.geode.distributed.internal.InternalLocator;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.protocol.protobuf.statistics.NoOpStatistics;
import org.apache.geode.internal.protocol.protobuf.v1.LocatorMessageExecutionContext;
import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
import org.apache.geode.internal.protocol.protobuf.v1.state.NoSecurityProtobufConnectionStateProcessor;

public class TestExecutionContext {
public static MessageExecutionContext getNoAuthCacheExecutionContext(Cache cache) {
public static MessageExecutionContext getNoAuthCacheExecutionContext(InternalCache cache) {
return new ServerMessageExecutionContext(cache, new NoOpStatistics(),
new NoSecurityProtobufConnectionStateProcessor());
}
Expand Down
Loading

0 comments on commit 6011e09

Please sign in to comment.