Commit 3ffbc41
GEODE-6488: Migrating cancellation state to execution context (apache#3322)

This work solves two problems. First, the query cancellation task
reference in DefaultQuery could be overwritten and thus never removed
from monitoring when a query completed successfully. Second, once a
query execution timed out, the query object was left in an unusable
state.

The solution is to attach cancellation state to the execution context
rather than to the query object, so that cancellation is scoped to each
independent execution of a query instead of applying to the query
object as a whole.
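
To make the design concrete, here is a minimal sketch of the pattern (hypothetical names, not Geode's actual classes or monitor API): each execution creates its own context carrying the cancellation flag, and deregistration from monitoring happens in a finally block tied to that execution, so concurrent runs cannot clobber each other's task references and the query object stays reusable after a timeout.

import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative sketch only -- not Geode's implementation.
final class SketchQuery {
  private final String queryString;

  SketchQuery(String queryString) {
    this.queryString = queryString;
  }

  Object execute() {
    // A fresh context per execution: a timeout cancels only this run,
    // and the shared query object holds no cancellation state.
    SketchExecutionContext context = new SketchExecutionContext();
    QueryMonitorSketch.monitor(context);
    try {
      return run(context);
    } finally {
      // Always de-register this execution, even on success; keying the
      // task off the per-execution context (not the query) means a
      // concurrent run cannot overwrite it and leave it registered.
      QueryMonitorSketch.stopMonitoring(context);
    }
  }

  private Object run(SketchExecutionContext context) {
    if (context.isCanceled()) {
      throw new IllegalStateException("query canceled: " + queryString);
    }
    return "results of " + queryString;
  }
}

// Per-execution cancellation state, analogous to attaching it to
// the ExecutionContext in this commit.
final class SketchExecutionContext {
  private final AtomicBoolean canceled = new AtomicBoolean(false);

  void cancel() {
    canceled.set(true);
  }

  boolean isCanceled() {
    return canceled.get();
  }
}

final class QueryMonitorSketch {
  static void monitor(SketchExecutionContext context) {
    // would schedule a timeout task that calls context.cancel()
  }

  static void stopMonitoring(SketchExecutionContext context) {
    // would remove the scheduled task for exactly this execution
  }
}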
mcmellawatt authored Mar 20, 2019
1 parent 428b55b commit 3ffbc41
Showing 36 changed files with 467 additions and 268 deletions.
===================================================================
@@ -52,10 +52,13 @@
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.functional.StructSetOrResultsSet;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.IndexTrackingQueryObserver;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.execute.PRClientServerTestBase;
@@ -673,9 +676,11 @@ public void execute(FunctionContext context) {
 
         try {
           Query query = queryService.newQuery((String) args[0]);
+          final ExecutionContext executionContext =
+              new QueryExecutionContext(null, (InternalCache) cache, query);
           context.getResultSender()
               .lastResult((ArrayList) ((SelectResults) ((LocalDataSet) localDataSet)
-                  .executeQuery((DefaultQuery) query, null, buckets)).asList());
+                  .executeQuery((DefaultQuery) query, executionContext, null, buckets)).asList());
         } catch (Exception e) {
           throw new FunctionException(e);
         }
@@ -866,9 +871,13 @@ protected ArrayList runQueryOnServerLDS(String queryStr, Set filter) {
     QueryService qservice = CacheFactory.getAnyInstance().getQueryService();
 
     Query query = qservice.newQuery(queryStr);
+    final ExecutionContext executionContext =
+        new QueryExecutionContext(null, (InternalCache) cache, query);
+
     SelectResults results;
     try {
       results = (SelectResults) ((LocalDataSet) localDataSet).executeQuery((DefaultQuery) query,
+          executionContext,
           null, buckets);
 
       return (ArrayList) results.asList();
===================================================================
@@ -57,6 +57,7 @@
 import org.apache.geode.cache.query.TypeMismatchException;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.cache30.CacheSerializableRunnable;
 import org.apache.geode.cache30.ClientServerTestCase;
@@ -1296,7 +1297,8 @@ private static class PauseTestHook implements DefaultQuery.TestHook {
     public boolean rejectedObjects = false;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case BEFORE_QUERY_EXECUTION:
           try {
@@ -1327,7 +1329,8 @@ private class CancelDuringGatherHook implements DefaultQuery.TestHook {
     private int numObjectsBeforeCancel = 5;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case LOW_MEMORY_WHEN_DESERIALIZING_STREAMINGOPERATION:
           rejectedObjects = true;
@@ -1350,7 +1353,8 @@ private class CancelDuringAddResultsHook implements DefaultQuery.TestHook {
     public boolean rejectedObjects = false;
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       switch (spot) {
         case BEFORE_BUILD_CUMULATIVE_RESULT:
           if (triggeredOOME == false) {
===================================================================
@@ -42,6 +42,8 @@
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.internal.cache.PartitionedRegionQueryEvaluator;
 import org.apache.geode.test.dunit.VM;
@@ -113,6 +115,7 @@ public void testDataLossDuringQueryProcessor() throws Exception {
 
     DefaultQuery query = (DefaultQuery) getCache().getQueryService()
         .newQuery("select distinct * from " + region.getFullPath());
+    final ExecutionContext executionContext = new QueryExecutionContext(null, getCache(), query);
     SelectResults results =
         query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, getCache(), query);
 
@@ -124,6 +127,7 @@
     PartitionedRegion partitionedRegion = (PartitionedRegion) region;
     PartitionedRegionQueryEvaluator queryEvaluator =
         new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), partitionedRegion, query,
+            executionContext,
             EMPTY_PARAMETERS, results, buckets);
 
     DisconnectingTestHook testHook = new DisconnectingTestHook();
@@ -164,13 +168,15 @@ public void testQueryResultsFromMembers() throws Exception {
 
     for (int i = 0; i < queries.length; i++) {
       DefaultQuery query = (DefaultQuery) getCache().getQueryService().newQuery(queries[i]);
+      final ExecutionContext executionContext =
+          new QueryExecutionContext(null, getCache(), query);
       SelectResults results =
           query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, getCache(), query);
 
       PartitionedRegion partitionedRegion = (PartitionedRegion) region;
       PartitionedRegionQueryEvaluator queryEvaluator =
           new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), partitionedRegion,
-              query, EMPTY_PARAMETERS, results, bucketsToQuery);
+              query, executionContext, EMPTY_PARAMETERS, results, bucketsToQuery);
 
       CollatingTestHook testHook = new CollatingTestHook(queryEvaluator);
       queryEvaluator.queryBuckets(testHook);
@@ -212,13 +218,14 @@ public void testQueryResultsFromMembersWithAccessor() throws Exception {
 
     for (int i = 0; i < queries.length; i++) {
       DefaultQuery query = (DefaultQuery) getCache().getQueryService().newQuery(queries[i]);
+      final ExecutionContext executionContext = new QueryExecutionContext(null, getCache(), query);
       SelectResults results =
           query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, getCache(), query);
 
       PartitionedRegion partitionedRegion = (PartitionedRegion) region;
       PartitionedRegionQueryEvaluator queryEvaluator =
           new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), partitionedRegion,
-              query, EMPTY_PARAMETERS, results, buckets);
+              query, executionContext, EMPTY_PARAMETERS, results, buckets);
 
       CollatingTestHook testHook = new CollatingTestHook(queryEvaluator);
       queryEvaluator.queryBuckets(testHook);
@@ -258,6 +265,7 @@ public void testSimulatedDataLossBeforeQueryProcessor() throws Exception {
 
     DefaultQuery query = (DefaultQuery) getCache().getQueryService()
         .newQuery("select distinct * from /" + regionName);
+    final ExecutionContext executionContext = new QueryExecutionContext(null, getCache(), query);
     SelectResults results =
         query.getSimpleSelect().getEmptyResultSet(EMPTY_PARAMETERS, getCache(), query);
 
@@ -270,7 +278,7 @@
     PartitionedRegion partitionedRegion = (PartitionedRegion) region;
     PartitionedRegionQueryEvaluator queryEvaluator =
         new PartitionedRegionQueryEvaluator(partitionedRegion.getSystem(), partitionedRegion,
-            query, EMPTY_PARAMETERS, results, buckets);
+            query, executionContext, EMPTY_PARAMETERS, results, buckets);
 
     assertThatThrownBy(() -> queryEvaluator.queryBuckets(null))
         .isInstanceOf(QueryException.class);
===================================================================
@@ -42,6 +42,7 @@
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.DefaultQuery;
 import org.apache.geode.cache.query.internal.DefaultQuery.TestHook;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.index.IndexManager;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.CacheRule;
@@ -575,7 +576,8 @@ Map<SPOTS, Boolean> getHooks() {
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       hooks.put(spot, Boolean.TRUE);
     }
   }
===================================================================
@@ -39,6 +39,8 @@
 import org.apache.geode.cache.query.QueryService;
 import org.apache.geode.cache.query.SelectResults;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverAdapter;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache30.CacheSerializableRunnable;
@@ -149,6 +151,8 @@ public void execute(FunctionContext context) {
         QueryService qs = pr1.getCache().getQueryService();
         DefaultQuery query = (DefaultQuery) qs.newQuery(
             "select distinct e1.value from /pr1 e1, /pr2 e2 where e1.value=e2.value");
+        final ExecutionContext executionContext =
+            new QueryExecutionContext(null, cache, query);
 
         GemFireCacheImpl.getInstance().getLogger()
             .fine(" Num BUCKET SET: " + localCust.getBucketSet());
@@ -175,7 +179,8 @@
         }
 
         SelectResults r =
-            (SelectResults) localCust.executeQuery(query, null, localCust.getBucketSet());
+            (SelectResults) localCust.executeQuery(query, executionContext, null,
+                localCust.getBucketSet());
 
         GemFireCacheImpl.getInstance().getLogger().fine("Result :" + r.asList());
 
===================================================================
@@ -52,6 +52,7 @@
 import org.apache.geode.cache.query.data.Portfolio;
 import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.cache.query.internal.index.IndexProtocol;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
 
@@ -396,7 +397,8 @@ public ScopeThreadingTestHook(int numThreads) {
     }
 
     @Override
-    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored) {
+    public void doTestHook(final SPOTS spot, final DefaultQuery _ignored,
+        final ExecutionContext executionContext) {
       if (spot == SPOTS.BEFORE_QUERY_EXECUTION) {
         try {
           barrier.await(8, TimeUnit.SECONDS);
===================================================================
@@ -42,9 +42,12 @@
 import org.apache.geode.cache.query.data.Position;
 import org.apache.geode.cache.query.data.TestData.MyValue;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
+import org.apache.geode.cache.query.internal.QueryExecutionContext;
 import org.apache.geode.cache.query.internal.QueryObserverAdapter;
 import org.apache.geode.cache.query.internal.QueryObserverHolder;
 import org.apache.geode.cache.query.internal.QueryUtils;
+import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
@@ -442,7 +445,9 @@ public void close() {
     String query =
         "select distinct e1.value from /pr1 e1, " + "/pr2 e2" + " where e1.value=e2.value";
     DefaultQuery cury = (DefaultQuery) CacheUtils.getQueryService().newQuery(query);
-    SelectResults r = (SelectResults) lds.executeQuery(cury, null, set);
+    final ExecutionContext executionContext =
+        new QueryExecutionContext(null, (InternalCache) cache, cury);
+    SelectResults r = (SelectResults) lds.executeQuery(cury, executionContext, null, set);
 
     if (!observer.isIndexesUsed) {
       fail("Indexes should have been used");
String query =
"select distinct e1.key from /pr1.entries e1,/pr2.entries e2" + " where e1.value=e2.value";
DefaultQuery cury = (DefaultQuery) CacheUtils.getQueryService().newQuery(query);
SelectResults r = (SelectResults) lds.executeQuery(cury, null, set);
final ExecutionContext executionContext =
new QueryExecutionContext(null, (InternalCache) cache, cury);
SelectResults r = (SelectResults) lds.executeQuery(cury, executionContext, null, set);

if (!observer.isIndexesUsed) {
fail("Indexes should have been used");
Expand Down
===================================================================
@@ -36,6 +36,7 @@
 import org.apache.geode.cache.RegionFactory;
 import org.apache.geode.cache.RegionShortcut;
 import org.apache.geode.cache.query.internal.DefaultQuery;
+import org.apache.geode.cache.query.internal.ExecutionContext;
 import org.apache.geode.internal.cache.LocalDataSet;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import org.apache.geode.test.junit.categories.OQLQueryTest;
@@ -105,32 +106,39 @@ public void close() {}
 
   @Test
   public void testQueryExecuteWithEmptyBucketListExpectNoResults() throws Exception {
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, new HashSet<Integer>());
+    final ExecutionContext executionContext = new ExecutionContext(null, CacheUtils.getCache());
+    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, executionContext, null,
+        new HashSet<Integer>());
     assertTrue("Received: A non-empty result collection, expected : Empty result collection",
         r.isEmpty());
   }
 
   @Test
   public void testQueryExecuteWithNullBucketListExpectNonEmptyResultSet() throws Exception {
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, null);
+    final ExecutionContext executionContext = new ExecutionContext(null, CacheUtils.getCache());
+    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, executionContext, null, null);
     assertFalse("Received: An empty result collection, expected : Non-empty result collection",
         r.isEmpty());
   }
 
   @Test
   public void testQueryExecuteWithNonEmptyBucketListExpectNonEmptyResultSet() throws Exception {
+    final ExecutionContext executionContext = new ExecutionContext(null, CacheUtils.getCache());
     int nTestBucketNumber = 15;
     Set<Integer> nonEmptySet = createAndPopulateSet(nTestBucketNumber);
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, nonEmptySet);
+    SelectResults r =
+        (SelectResults) lds.executeQuery(queryExecutor, executionContext, null, nonEmptySet);
     assertFalse("Received: An empty result collection, expected : Non-empty result collection",
         r.isEmpty());
   }
 
   @Test(expected = QueryInvocationTargetException.class)
   public void testQueryExecuteWithLargerBucketListThanExistingExpectQueryInvocationTargetException()
       throws Exception {
+    final ExecutionContext executionContext = new ExecutionContext(null, CacheUtils.getCache());
    int nTestBucketNumber = 45;
    Set<Integer> overflowSet = createAndPopulateSet(nTestBucketNumber);
-    SelectResults r = (SelectResults) lds.executeQuery(queryExecutor, null, overflowSet);
+    SelectResults r =
+        (SelectResults) lds.executeQuery(queryExecutor, executionContext, null, overflowSet);
   }
 }