Skip to content

Commit

Permalink
Merge branch 'GEODE-7049-new' into feature/GEODE-7049
Browse files Browse the repository at this point in the history
  • Loading branch information
albertogpz committed Sep 16, 2019
2 parents b8b6769 + 86477eb commit 1b7edf0
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ public interface Execution<IN, OUT, AGG> {
Execution<IN, OUT, AGG> withCollector(ResultCollector<OUT, AGG> rc);

/**
* Executes the function using its {@linkplain Function#getId() id}
* Executes the function using its {@linkplain Function#getId() id}.
* When executed from a client, it blocks until all results have been received
* or the global timeout (gemfire.CLIENT_FUNCTION_TIMEOUT Java property) has expired.
* <p>
* {@link Function#execute(FunctionContext)} is called on the instance retrieved using
* {@link FunctionService#getFunction(String)} on the executing member.
Expand All @@ -110,6 +112,7 @@ public interface Execution<IN, OUT, AGG> {

/**
* Executes the function using its {@linkplain Function#getId() id} with the specified timeout.
* It blocks until all results have been received or the timeout has expired.
* <p>
* {@link Function#execute(FunctionContext)} is called on the instance retrieved using
* {@link FunctionService#getFunction(String)} on the executing member.
Expand All @@ -128,6 +131,8 @@ ResultCollector<OUT, AGG> execute(String functionId, long timeout, TimeUnit unit

/**
* Executes the function instance provided.
* When executed from a client, it blocks until all results have been received
* or the global timeout (gemfire.CLIENT_FUNCTION_TIMEOUT Java property) has expired.
* <p>
* {@link Function#execute(FunctionContext)} is called on the de-serialized instance on the
* executing member.
Expand All @@ -145,6 +150,7 @@ ResultCollector<OUT, AGG> execute(String functionId, long timeout, TimeUnit unit

/**
* Executes the function instance provided.
* It blocks until all results have been received or the timeout has expired.
* <p>
* {@link Function#execute(FunctionContext)} is called on the de-serialized instance on the
* executing member.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public synchronized void addResult(DistributedMember distributedMember,
}

/**
* Waits if necessary for the computation to complete, and then retrieves its result.<br>
* Returns the result of the execution if available.<br>
* If {@link Function#hasResult()} is false, upon calling {@link ResultCollector#getResult()}
* throws {@link FunctionException}.
*
Expand All @@ -70,12 +70,11 @@ public Object getResult() throws FunctionException {
public void endResults() {}

/**
* Waits if necessary for at most the given time for the computation to complete, and then
* retrieves its result, if available. <br>
* Returns the result of the execution if available.<br>
* If {@link Function#hasResult()} is false, upon calling {@link ResultCollector#getResult()}
* throws {@link FunctionException}.
*
* @param timeout the maximum time to wait
* @param timeout the maximum time to wait (ignored)
* @param unit the time unit of the timeout argument
* @return Object computed result
* @throws FunctionException if something goes wrong while retrieving the result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,18 +206,21 @@ public ResultCollector execute(final Function function) {

@Override
protected ResultCollector executeFunction(Function function, long timeout, TimeUnit unit) {
if (function.hasResult()) { // have Results
if (this.rc == null) { // Default Result Collector
ResultCollector defaultCollector = new DefaultResultCollector();
return this.region.executeFunction(this, function, args, defaultCollector, this.filter,
this.sender);
} else { // Custome Result COllector
return this.region.executeFunction(this, function, args, rc, this.filter, this.sender);
}
} else { // No results
if (!function.hasResult()) {
this.region.executeFunction(this, function, args, null, this.filter, this.sender);
return new NoResult();
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn =
this.region.executeFunction(this, function, args, inRc, this.filter, this.sender);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception exception) {
throw new FunctionException(exception);
}
}
return rcToReturn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,16 +182,20 @@ public void validateExecution(final Function function,

@Override
protected ResultCollector executeFunction(Function function, long timeout, TimeUnit unit) {
if (function.hasResult()) {
ResultCollector rc = this.rc;
if (rc == null) {
rc = new DefaultResultCollector();
}
return executeFunction(function, rc);
} else {
if (!function.hasResult()) {
executeFunction(function, null);
return new NoResult();
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn = executeFunction(function, inRc);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception e) {
throw new FunctionException(e);
}
}
return rcToReturn;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,16 +178,20 @@ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {

@Override
protected ResultCollector executeFunction(Function function, long timeout, TimeUnit unit) {
if (function.hasResult()) {
ResultCollector rc = this.rc;
if (rc == null) {
rc = new DefaultResultCollector();
}
return executeFunction(function, rc);
} else {
if (!function.hasResult()) {
executeFunction(function, null);
return new NoResult();
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn = executeFunction(function, rc);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception e) {
throw new FunctionException(e);
}
}
return rcToReturn;
}

private ResultCollector executeFunction(final Function function,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,17 +211,21 @@ public PartitionedRegionFunctionExecutor(PartitionedRegion region, Set filter2,

@Override
public ResultCollector executeFunction(final Function function, long timeout, TimeUnit unit) {
if (function.hasResult()) {
if (this.rc == null) {
return this.pr.executeFunction(function, this, new DefaultResultCollector(),
this.executeOnBucketSet);
} else {
return this.pr.executeFunction(function, this, rc, this.executeOnBucketSet);
}
} else { /* NO RESULT:fire-n-forget */
if (!function.hasResult()) /* NO RESULT:fire-n-forget */ {
this.pr.executeFunction(function, this, null, this.executeOnBucketSet);
return new NoResult();
}
ResultCollector inRc = (rc == null) ? new DefaultResultCollector() : rc;
ResultCollector rcToReturn =
this.pr.executeFunction(function, this, inRc, this.executeOnBucketSet);
if (timeout > 0) {
try {
rcToReturn.getResult(timeout, unit);
} catch (Exception exception) {
throw new FunctionException(exception);
}
}
return rcToReturn;
}

@Override
Expand Down

0 comments on commit 1b7edf0

Please sign in to comment.