Skip to content

Commit

Permalink
GEODE-6293: Fix fire & forget functions in gfsh (apache#3138)
Browse files Browse the repository at this point in the history
* GEODE-6293: Fix fire & forget functions in gfsh

- Fixed minor warnings.
- Refactored class `UserFunctionExecution`.
- Added unit tests for class `UserFunctionExecution`.
- Class `UserFunctionExecution` now supports the execution of functions
  that don't return any results.
  • Loading branch information
jujoramos authored Jan 31, 2019
1 parent 9c35fc7 commit 9fa4dc2
Show file tree
Hide file tree
Showing 7 changed files with 502 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.geode.management.internal.cli.commands;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -32,37 +31,39 @@
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionService;
import org.apache.geode.cache.execute.RegionFunctionContext;
import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.test.dunit.rules.ClusterStartupRule;
import org.apache.geode.test.dunit.rules.MemberVM;
import org.apache.geode.test.junit.assertions.CommandResultAssert;
import org.apache.geode.test.junit.assertions.TabularResultModelAssert;
import org.apache.geode.test.junit.categories.GfshTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;

@Category({GfshTest.class})
@Category(GfshTest.class)
public class ExecuteFunctionCommandDUnitTest {
@ClassRule
public static ClusterStartupRule cluster = new ClusterStartupRule();

@ClassRule
public static GfshCommandRule gfsh = new GfshCommandRule();

private static MemberVM locator, server1, server2, server3;
private static final String functionId = "genericFunctionId";

private static String command = "execute function --id=" + functionId + " ";

@BeforeClass
public static void setUpClass() throws Exception {
locator = cluster.startLocatorVM(0);
MemberVM locator = cluster.startLocatorVM(0);
gfsh.connectAndVerify(locator);

server1 = cluster.startServerVM(1, "group1", locator.getPort());
server2 = cluster.startServerVM(2, "group1", locator.getPort());
server3 = cluster.startServerVM(3, "group2", locator.getPort());
MemberVM.invokeInEveryMember(() -> {
FunctionService.registerFunction(new GenericFunctionOp(functionId));
}, server1, server2, server3);
MemberVM server1 = cluster.startServerVM(1, "group1", locator.getPort());
MemberVM server2 = cluster.startServerVM(2, "group1", locator.getPort());
MemberVM server3 = cluster.startServerVM(3, "group2", locator.getPort());
MemberVM.invokeInEveryMember(
() -> FunctionService.registerFunction(new GenericFunctionOp(functionId)), server1, server2,
server3);
MemberVM.invokeInEveryMember(
() -> FunctionService.registerFunction(new FireAndForgetFunction()), server1, server2,
server3);

// create a partitioned region on only group1
gfsh.executeAndAssertThat(
Expand All @@ -73,7 +74,9 @@ public static void setUpClass() throws Exception {
locator.waitUntilRegionIsReadyOnExactlyThisManyServers("/regionA", 2);

server1.invoke(() -> {
Region region = ClusterStartupRule.getCache().getRegion("/regionA");
InternalCache cache = ClusterStartupRule.getCache();
assertThat(cache).isNotNull();
Region<String, String> region = cache.getRegion("/regionA");
region.put("a", "a");
region.put("b", "b");
});
Expand Down Expand Up @@ -267,7 +270,18 @@ public void withArgumentAndResultCollector() {
"[GENERICFUNCTIONID-ARGUMENTS]", "[GENERICFUNCTIONID-ARGUMENTS]");
}

@Test
public void functionWithNoResults() {
TabularResultModelAssert tableAssert =
gfsh.executeAndAssertThat("execute function --id=FireAndForget").statusIsSuccess()
.hasTableSection().hasRowSize(3).hasColumnSize(3);

tableAssert.hasColumn("Member").containsExactlyInAnyOrder("server-1", "server-2", "server-3");
tableAssert.hasColumn("Status").containsExactlyInAnyOrder("OK", "OK", "OK");
tableAssert.hasColumn("Message").containsExactlyInAnyOrder("[]", "[]", "[]");
}

@SuppressWarnings("unused")
public static class MyPartitionResolver implements FixedPartitionResolver {
@Override
public String getPartitionName(final EntryOperation opDetails,
Expand All @@ -291,7 +305,6 @@ public void close() {
}
}


public static class GenericFunctionOp implements Function {
private String functionId;

Expand All @@ -300,6 +313,7 @@ public static class GenericFunctionOp implements Function {
}

@Override
@SuppressWarnings("unchecked")
public void execute(FunctionContext context) {
String filter = null;
if (context instanceof RegionFunctionContext) {
Expand Down Expand Up @@ -330,4 +344,30 @@ public String getId() {
return functionId;
}
}


public static class FireAndForgetFunction implements Function {

FireAndForgetFunction() {}

@Override
public String getId() {
return "FireAndForget";
}

@Override
public boolean isHA() {
return false;
}

@Override
public boolean hasResult() {
return false;
}

@Override
public void execute(FunctionContext context) {
// Do Nothing.
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.apache.geode.test.junit.categories.SecurityTest;
import org.apache.geode.test.junit.rules.GfshCommandRule;

@Category({SecurityTest.class})
@Category(SecurityTest.class)
public class ExecuteFunctionCommandSecurityTest implements Serializable {

@ClassRule
Expand All @@ -54,28 +54,29 @@ public class ExecuteFunctionCommandSecurityTest implements Serializable {
@Rule
public GfshCommandRule gfsh = new GfshCommandRule();

private static MemberVM locator, server1, server2;
private static MemberVM locator;

private static String REPLICATED_REGION = "replicatedRegion";
private static String PARTITIONED_REGION = "partitionedRegion";

@BeforeClass
public static void beforeClass() throws Exception {
public static void beforeClass() {
Properties locatorProps = new Properties();
locatorProps.setProperty(SECURITY_MANAGER, SimpleTestSecurityManager.class.getName());
locator = lsRule.startLocatorVM(0, locatorProps);

Properties serverProps = new Properties();
serverProps.setProperty(ResourceConstants.USER_NAME, "clusterManage");
serverProps.setProperty(ResourceConstants.PASSWORD, "clusterManage");
server1 = lsRule.startServerVM(1, serverProps, locator.getPort());
server2 = lsRule.startServerVM(2, serverProps, locator.getPort());
MemberVM server1 = lsRule.startServerVM(1, serverProps, locator.getPort());
MemberVM server2 = lsRule.startServerVM(2, serverProps, locator.getPort());

Stream.of(server1, server2).forEach(server -> server.invoke(() -> {
FunctionService.registerFunction(new ReadFunction());
FunctionService.registerFunction(new WriteFunction());

InternalCache cache = ClusterStartupRule.getCache();
assertThat(cache).isNotNull();
cache.createRegionFactory(RegionShortcut.REPLICATE).create(REPLICATED_REGION);
cache.createRegionFactory(RegionShortcut.PARTITION).create(PARTITIONED_REGION);
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,8 @@ public static void setupClass() {

@Test
@ConnectionConfiguration(user = "user", password = "user")
public void functionRequireExpectedPermission() throws Exception {
functionStringMap.entrySet().stream().forEach(entry -> {
Function function = entry.getKey();
String permission = entry.getValue();
public void functionRequireExpectedPermission() {
functionStringMap.forEach((function, permission) -> {
System.out.println("function: " + function.getId() + ", permission: " + permission);
gfsh.executeAndAssertThat("execute function --id=" + function.getId())
.tableHasRowCount(RESULT_HEADER, 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public ResultModel executeFunction(
}

if (dsMembers.size() == 0) {
return new ResultModel().createError("No members found.");
return ResultModel.createError("No members found.");
}

// Build up our argument list
Expand Down Expand Up @@ -113,6 +113,7 @@ public ResultModel executeFunction(
return ResultModel.createMemberStatusResult(results, false, false);
}

@SuppressWarnings("unused")
public static class ExecuteFunctionCommandInterceptor implements CliAroundInterceptor {
@Override
public ResultModel preExecution(GfshParseResult parseResult) {
Expand All @@ -126,11 +127,11 @@ public ResultModel preExecution(GfshParseResult parseResult) {

ResultModel result = new ResultModel();
if (moreThanOne) {
return result.createError(CliStrings.EXECUTE_FUNCTION__MSG__OPTIONS);
return ResultModel.createError(CliStrings.EXECUTE_FUNCTION__MSG__OPTIONS);
}

if (onRegion == null && filter != null) {
return result.createError(
return ResultModel.createError(
CliStrings.EXECUTE_FUNCTION__MSG__MEMBER_SHOULD_NOT_HAVE_FILTER_FOR_EXECUTION);
}

Expand Down
Loading

0 comments on commit 9fa4dc2

Please sign in to comment.