From 4c43c53b6a9dda8e8af0eaf586ccc3b63fd408d0 Mon Sep 17 00:00:00 2001 From: Alberto Gomez Date: Fri, 5 Nov 2021 07:54:33 +0100 Subject: [PATCH] GEODE-9774: Clear networkHop variable at function execution exit (#7051) * GEODE-9774: Clear networkHop variable at function execution exit * GEODE-9774: Added unit test after review --- ...onFunctionExecutionSingleHopDUnitTest.java | 49 +++++++++++++++++ .../PartitionedRegionFunctionExecutor.java | 32 ++++++----- ...PartitionedRegionFunctionExecutorTest.java | 55 +++++++++++++++++++ .../cache/functions/TestFunction.java | 26 +++++++++ 4 files changed, 149 insertions(+), 13 deletions(-) create mode 100644 geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutorTest.java diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionSingleHopDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionSingleHopDUnitTest.java index 5f05ef081c0d..b195016c4451 100644 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionSingleHopDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRClientServerRegionFunctionExecutionSingleHopDUnitTest.java @@ -15,6 +15,7 @@ package org.apache.geode.internal.cache.execute; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -29,6 +30,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.IntStream; import org.apache.logging.log4j.Logger; import org.junit.Test; @@ -423,6 +426,41 @@ public void testBug40714() { PRClientServerRegionFunctionExecutionDUnitTest::FunctionExecution_Inline_Bug40714); } + @Test + public void testSingleHopFunctionExecutionDoingNetworkHopClearsNetworkHopVariableAtExit() { + ArrayList commonAttributes = + createCommonServerAttributes(PartitionedRegionName, null, 0, null); + createClientServerScenarioSingleHop(commonAttributes, 20, 20, 20); + Function forceNetworkHopFunction = + new TestFunction(true, TestFunction.TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP); + registerFunctionAtServer(forceNetworkHopFunction); + Function getNetworkHopFunction = + new TestFunction(true, TestFunction.TEST_FUNCTION_GET_NETWORK_HOP); + registerFunctionAtServer(getNetworkHopFunction); + + // Populate region and at the same time update client metadata + Set keys = IntStream.rangeClosed(0, totalNumBuckets * 10).boxed().map((x) -> "" + x) + .collect(Collectors.toSet()); + for (String key : keys) { + client.invoke(() -> { + cache.getRegion(PartitionedRegionName).put(key, key); + }); + } + + Set keyFilter = Collections.singleton("1"); + + // Execute the function that does one hop + client.invoke(() -> executeFunctionOnPartitionedRegion(PartitionedRegionName, + TestFunction.TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP, keyFilter, keys)); + + // Execute the function that gets the networkHop value previously set + Object result = client.invoke(() -> executeFunctionOnPartitionedRegion(PartitionedRegionName, + TestFunction.TEST_FUNCTION_GET_NETWORK_HOP, keyFilter, new HashSet())); + int networkHop = (int) ((List) result).get(0); + + assertThat(networkHop).isEqualTo(0); + } + public static void registerFunction() { FunctionService.registerFunction(new FunctionAdapter() { @Override @@ -526,6 +564,17 @@ private static Object executeFunctionHA() { return l; } + public Object executeFunctionOnPartitionedRegion(String regionName, String functionName, + Set filter, Object arguments) { + Region region = cache.getRegion(regionName); + Function function = new TestFunction(true, functionName); + FunctionService.registerFunction(function); + Execution execution = FunctionService.onRegion(region); + ResultCollector rc1 = + execution.withFilter(filter).setArguments(arguments).execute(function.getId()); + return rc1.getResult(); + } + private static void putOperation() { Region region = cache.getRegion(PartitionedRegionName); assertNotNull(region); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java index 370641800247..5b1186c5c69d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java @@ -215,21 +215,27 @@ public PartitionedRegionFunctionExecutor(PartitionedRegion region, Set filter2, @Override public ResultCollector executeFunction(final Function function, long timeout, TimeUnit unit) { - if (!function.hasResult()) /* NO RESULT:fire-n-forget */ { - this.pr.executeFunction(function, this, null, this.executeOnBucketSet); - return NO_RESULT; - } - ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc; - ResultCollector rcToReturn = - this.pr.executeFunction(function, this, inRc, this.executeOnBucketSet); - if (timeout > 0) { - try { - rcToReturn.getResult(timeout, unit); - } catch (Exception exception) { - throw new FunctionException(exception); + try { + if (!function.hasResult()) /* NO RESULT:fire-n-forget */ { + this.pr.executeFunction(function, this, null, this.executeOnBucketSet); + return NO_RESULT; + } + ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc; + ResultCollector rcToReturn = + this.pr.executeFunction(function, this, inRc, this.executeOnBucketSet); + if (timeout > 0) { + try { + rcToReturn.getResult(timeout, unit); + } catch (Exception exception) { + throw new FunctionException(exception); + } + } + return rcToReturn; + } finally { + if (pr.getNetworkHopType() != PartitionedRegion.NETWORK_HOP_NONE) { + pr.clearNetworkHopData(); } } - return rcToReturn; } @Override diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutorTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutorTest.java new file mode 100644 index 000000000000..98f4ba506c62 --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutorTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.geode.internal.cache.execute; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import org.junit.Test; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.internal.cache.PartitionedRegion; + +public class PartitionedRegionFunctionExecutorTest { + + @Test + public void executeThatSetsNetworkHopClearsItAtExit() { + PartitionedRegion region = mock(PartitionedRegion.class); + when(region.getNetworkHopType()).thenReturn((byte) 1); + PartitionedRegionFunctionExecutor executor = new PartitionedRegionFunctionExecutor(region); + executor.execute(new TestFunction()); + verify(region, times(1)).clearNetworkHopData(); + } + + @Test + public void executeThatDoesNotSetNetworkHopDoesNotClearItAtExit() { + PartitionedRegion region = mock(PartitionedRegion.class); + when(region.getNetworkHopType()).thenReturn((byte) 0); + PartitionedRegionFunctionExecutor executor = new PartitionedRegionFunctionExecutor(region); + executor.execute(new TestFunction()); + verify(region, never()).clearNetworkHopData(); + } + + private static class TestFunction implements Function { + @Override + public void execute(FunctionContext context) {} + } +} diff --git a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java index b488763eb71b..97cb119a5658 100755 --- a/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java +++ b/geode-dunit/src/main/java/org/apache/geode/internal/cache/functions/TestFunction.java @@ -96,6 +96,9 @@ public class TestFunction implements Function, Declarable2, DataSerializab "executeFunctionRunningForLongTime"; public static final String TEST_FUNCTION_BUCKET_FILTER = "TestFunctionBucketFilter"; public static final String TEST_FUNCTION_NONHA_NOP = "executeFunctionNoHANop"; + public static final String TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP = + "executeFunctionSingleHopForceNetworkHop"; + public static final String TEST_FUNCTION_GET_NETWORK_HOP = "executeFunctionGetNetworkHop"; private static final String ID = "id"; private static final String HAVE_RESULTS = "haveResults"; private final Properties props; @@ -190,6 +193,10 @@ public void execute(FunctionContext context) { executeFunctionBucketFilter(context); } else if (id.equals(TEST_FUNCTION_NONHA_NOP)) { execute1(context); + } else if (id.equals(TEST_FUNCTION_SINGLE_HOP_FORCE_NETWORK_HOP)) { + executeSingleHopForceNetworkHop(context); + } else if (id.equals(TEST_FUNCTION_GET_NETWORK_HOP)) { + executeGetNetworkHop(context); } else if (noAckTest.equals("true")) { execute1(context); } @@ -1015,6 +1022,25 @@ private void executeWithLastResult(FunctionContext context) { context.getResultSender().lastResult(context.getArguments()); } + private void executeSingleHopForceNetworkHop(FunctionContext context) { + RegionFunctionContext rfContext = (RegionFunctionContext) context; + final PartitionedRegion pr = (PartitionedRegion) rfContext.getDataSet(); + Set keySet = (Set) rfContext.getArguments(); + // Read entries from the region to provoke hops to another server + for (Object key : keySet) { + pr.get(key); + } + int networkHopType = pr.getNetworkHopType(); + context.getResultSender().lastResult(networkHopType); + } + + private void executeGetNetworkHop(FunctionContext context) { + RegionFunctionContext rfContext = (RegionFunctionContext) context; + final PartitionedRegion pr = (PartitionedRegion) rfContext.getDataSet(); + int networkHopType = pr.getNetworkHopType(); + context.getResultSender().lastResult(networkHopType); + } + /** * Get the function identifier, used by clients to invoke this function *