Skip to content

Commit

Permalink
GEODE-9774: Clear networkHop variable at function execution exit (apa…
Browse files Browse the repository at this point in the history
…che#7051)

* GEODE-9774: Clear networkHop variable at function execution exit

* GEODE-9774: Added unit test after review
  • Loading branch information
albertogpz authored Nov 5, 2021
1 parent 60c62d8 commit 4c43c53
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.apache.geode.internal.cache.execute;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
Expand All @@ -29,6 +30,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.apache.logging.log4j.Logger;
import org.junit.Test;
Expand Down Expand Up @@ -423,6 +426,41 @@ public void testBug40714() {
PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714);
}

@Test
public void testSingleHopFunctionExecutionDoingNetworkHopClearsNetworkHopVariableAtExit() {
ArrayList commonAttributes =
createCommonServerAttributes(PartitionedRegionName, null, 0, null);
createClientServerScenarioSingleHop(commonAttributes, 20, 20, 20);
Function forceNetworkHopFunction =
new TestFunction(true, TestFunction.TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP);
registerFunctionAtServer(forceNetworkHopFunction);
Function getNetworkHopFunction =
new TestFunction(true, TestFunction.TEST_FUNCTION_GET_NETWORK_HOP);
registerFunctionAtServer(getNetworkHopFunction);

// Populate region and at the same time update client metadata
Set<String> keys = IntStream.rangeClosed(0, totalNumBuckets * 10).boxed().map((x) -> "" + x)
.collect(Collectors.toSet());
for (String key : keys) {
client.invoke(() -> {
cache.getRegion(PartitionedRegionName).put(key, key);
});
}

Set<String> keyFilter = Collections.singleton("1");

// Execute the function that does one hop
client.invoke(() -> executeFunctionOnPartitionedRegion(PartitionedRegionName,
TestFunction.TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP, keyFilter, keys));

// Execute the function that gets the networkHop value previously set
Object result = client.invoke(() -> executeFunctionOnPartitionedRegion(PartitionedRegionName,
TestFunction.TEST_FUNCTION_GET_NETWORK_HOP, keyFilter, new HashSet()));
int networkHop = (int) ((List) result).get(0);

assertThat(networkHop).isEqualTo(0);
}

public static void registerFunction() {
FunctionService.registerFunction(new FunctionAdapter() {
@Override
Expand Down Expand Up @@ -526,6 +564,17 @@ private static Object executeFunctionHA() {
return l;
}

public Object executeFunctionOnPartitionedRegion(String regionName, String functionName,
Set filter, Object arguments) {
Region region = cache.getRegion(regionName);
Function function = new TestFunction(true, functionName);
FunctionService.registerFunction(function);
Execution execution = FunctionService.onRegion(region);
ResultCollector rc1 =
execution.withFilter(filter).setArguments(arguments).execute(function.getId());
return rc1.getResult();
}

private static void putOperation() {
Region<String, Integer> region = cache.getRegion(PartitionedRegionName);
assertNotNull(region);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,21 +215,27 @@ public PartitionedRegionFunctionExecutor(PartitionedRegion region, Set filter2,

@Override
public ResultCollector executeFunction(final Function function, long timeout, TimeUnit unit) {
if (!function.hasResult()) /* NO RESULT:fire-n-forget */ {
this.pr.executeFunction(function, this, null, this.executeOnBucketSet);
return NO_RESULT;
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn =
this.pr.executeFunction(function, this, inRc, this.executeOnBucketSet);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception exception) {
throw new FunctionException(exception);
try {
if (!function.hasResult()) /* NO RESULT:fire-n-forget */ {
this.pr.executeFunction(function, this, null, this.executeOnBucketSet);
return NO_RESULT;
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn =
this.pr.executeFunction(function, this, inRc, this.executeOnBucketSet);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception exception) {
throw new FunctionException(exception);
}
}
return rcToReturn;
} finally {
if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) {
pr.clearNetworkHopData();
}
}
return rcToReturn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geode.internal.cache.execute;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.junit.Test;

import org.apache.geode.cache.execute.Function;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.internal.cache.PartitionedRegion;

public class PartitionedRegionFunctionExecutorTest {

@Test
public void executeThatSetsNetworkHopClearsItAtExit() {
PartitionedRegion region = mock(PartitionedRegion.class);
when(region.getNetworkHopType()).thenReturn((byte) 1);
PartitionedRegionFunctionExecutor executor = new PartitionedRegionFunctionExecutor(region);
executor.execute(new TestFunction());
verify(region, times(1)).clearNetworkHopData();
}

@Test
public void executeThatDoesNotSetNetworkHopDoesNotClearItAtExit() {
PartitionedRegion region = mock(PartitionedRegion.class);
when(region.getNetworkHopType()).thenReturn((byte) 0);
PartitionedRegionFunctionExecutor executor = new PartitionedRegionFunctionExecutor(region);
executor.execute(new TestFunction());
verify(region, never()).clearNetworkHopData();
}

private static class TestFunction implements Function {
@Override
public void execute(FunctionContext context) {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ public class TestFunction<T> implements Function<T>, Declarable2, DataSerializab
"executeFunctionRunningForLongTime";
public static final String TEST_FUNCTION_BUCKET_FILTER = "TestFunctionBucketFilter";
public static final String TEST_FUNCTION_NONHA_NOP = "executeFunctionNoHANop";
public static final String TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP =
"executeFunctionSingleHopForceNetworkHop";
public static final String TEST_FUNCTION_GET_NETWORK_HOP = "executeFunctionGetNetworkHop";
private static final String ID = "id";
private static final String HAVE_RESULTS = "haveResults";
private final Properties props;
Expand Down Expand Up @@ -190,6 +193,10 @@ public void execute(FunctionContext context) {
executeFunctionBucketFilter(context);
} else if (id.equals(TEST_FUNCTION_NONHA_NOP)) {
execute1(context);
} else if (id.equals(TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP)) {
executeSingleHopForceNetworkHop(context);
} else if (id.equals(TEST_FUNCTION_GET_NETWORK_HOP)) {
executeGetNetworkHop(context);
} else if (noAckTest.equals("true")) {
execute1(context);
}
Expand Down Expand Up @@ -1015,6 +1022,25 @@ private void executeWithLastResult(FunctionContext context) {
context.getResultSender().lastResult(context.getArguments());
}

private void executeSingleHopForceNetworkHop(FunctionContext context) {
RegionFunctionContext rfContext = (RegionFunctionContext) context;
final PartitionedRegion pr = (PartitionedRegion) rfContext.getDataSet();
Set keySet = (Set) rfContext.getArguments();
// Read entries from the region to provoke hops to another server
for (Object key : keySet) {
pr.get(key);
}
int networkHopType = pr.getNetworkHopType();
context.getResultSender().lastResult(networkHopType);
}

private void executeGetNetworkHop(FunctionContext context) {
RegionFunctionContext rfContext = (RegionFunctionContext) context;
final PartitionedRegion pr = (PartitionedRegion) rfContext.getDataSet();
int networkHopType = pr.getNetworkHopType();
context.getResultSender().lastResult(networkHopType);
}

/**
* Get the function identifier, used by clients to invoke this function
*
Expand Down

0 comments on commit 4c43c53

Please sign in to comment.