Skip to content

Commit

Permalink
Merge pull request apache#1051 from afs/jena2141-timeout
Browse files Browse the repository at this point in the history
JENA-2141: Additional timeout flag used for setup time
  • Loading branch information
afs authored Aug 21, 2021
2 parents 0208239 + 6d57b06 commit d1111d7
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 15 deletions.
19 changes: 13 additions & 6 deletions jena-arq/src/main/java/org/apache/jena/sparql/ARQConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,10 @@ public class ARQConstants
//public static final String allocGlobalVarMarker = allocVarMarker+globalVar ; // VarAlloc
public static final String allocPathVariables = allocVarAnonMarker+"P" ; // PathCompiler
public static final String allocQueryVariables = allocVarMarker ; // Query

/** Marker for RDF-star variables */
public static final String allocVarTripleTerm = "~"; // RX, SolverRX

public static final String allocParserAnonVars = allocVarAnonMarker ; // LabelToModeMap
// SSE
public static final String allocSSEUnamedVars = "_" ; // ParseHandlerPlain - SSE token "?" - legal SPARQL
Expand Down Expand Up @@ -220,7 +220,7 @@ public class ARQConstants
public static final Symbol sysCurrentDataset = Symbol.create(systemVarNS+"dataset") ;

public static final Symbol sysVarAllocRDFStar = Symbol.create(systemVarNS+"varAllocRDFStar") ;

/** Context key for the dataset description (if any).
* See the <a href="http://www.w3.org/TR/sparql11-protocol">SPARQL protocol</a>.
* <p>
Expand Down Expand Up @@ -258,13 +258,20 @@ public class ARQConstants
public static final Symbol sysVarAllocAnon = Symbol.create(systemVarNS+"namedVarAnon") ;

/** Graphs forming the default graph (List&lt;String&gt;) (Dynamic dataset) */
public static final Symbol symDatasetDefaultGraphs = SystemARQ.allocSymbol("datasetDefaultGraphs") ;
public static final Symbol symDatasetDefaultGraphs = SystemARQ.allocSymbol("datasetDefaultGraphs") ;

/** Graphs forming the named graphs (List&lt;String&gt;) (Dynamic dataset) */
public static final Symbol symDatasetNamedGraphs = SystemARQ.allocSymbol("datasetNamedGraphs") ;
public static final Symbol symDatasetNamedGraphs = SystemARQ.allocSymbol("datasetNamedGraphs") ;

/** Context symbol for a supplied {@link Prologue} (used for text out of result sets). */
public static final Symbol symPrologue = SystemARQ.allocSymbol("prologue");
public static final Symbol symPrologue = SystemARQ.allocSymbol("prologue");

/**
* Internal use context symbol for an AtomicBoolean to signal that a query has been cancelled.
* Used by {@code QueryExecutionMain} and {@code QueryIterProcessBinding}.
* JENA-2141.
*/
public static final Symbol symCancelQuery = SystemARQ.allocSymbol("cancelQuery");

/** Context key for making all SELECT queries have DISTINCT applied, whether stated or not */
public static final Symbol autoDistinct = SystemARQ.allocSymbol("autoDistinct") ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -451,9 +451,18 @@ private static boolean isTimeoutSet(long x) {
}

class TimeoutCallback implements Runnable {
private final AtomicBoolean cancelSignal;

public TimeoutCallback(AtomicBoolean cancelSignal) {
this.cancelSignal = cancelSignal;
}

@Override
public void run() {
synchronized (lockTimeout) {
if ( cancelSignal != null )
cancelSignal.set(true);

// Abort query if and only if we are the expected callback.
// If the first row has appeared, and we are removing timeout1
// callback,
Expand Down Expand Up @@ -484,7 +493,7 @@ protected Binding moveToNextBinding()
// So nearly not needed.
synchronized(lockTimeout)
{
TimeoutCallback callback = new TimeoutCallback() ;
TimeoutCallback callback = new TimeoutCallback(null) ;
expectedCallback.set(callback) ;
// Lock against calls of .abort() or of timeout1Callback.

Expand Down Expand Up @@ -548,10 +557,22 @@ private void startQueryIterator() {
return;
}

// JENA-2140 - the timeout can go off while building the query iterator structure.
// In this case, use a signal passed through the context.
// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
//
// This applies to the time to first result because to get the first result, the
// queryIterator must have been built. So it does not apply for the second
// stage of N,-1 or N,M.

if ( !isTimeoutSet(timeout1) && isTimeoutSet(timeout2) ) {
// Case -1,N
// Single overall timeout.
TimeoutCallback callback = new TimeoutCallback() ;
AtomicBoolean cancelSignal = new AtomicBoolean(false);
context.set(ARQConstants.symCancelQuery, cancelSignal);
TimeoutCallback callback = new TimeoutCallback(cancelSignal) ;
expectedCallback.set(callback) ;
timeout2Alarm = alarmClock.add(callback, timeout2) ;
// Start the query.
Expand All @@ -566,13 +587,14 @@ private void startQueryIterator() {
// Whether timeout2 is set is determined by QueryIteratorTimer2
// Subcase 2: ! isTimeoutSet(timeout2)
// Add timeout to first row.
TimeoutCallback callback = new TimeoutCallback() ;

AtomicBoolean cancelSignal = new AtomicBoolean(false);
context.set(ARQConstants.symCancelQuery, cancelSignal);

TimeoutCallback callback = new TimeoutCallback(cancelSignal) ;
timeout1Alarm = alarmClock.add(callback, timeout1) ;
expectedCallback.set(callback) ;

// We don't know if getPlan().iterator() does a lot of work or not
// (ideally it shouldn't start executing the query but in some sub-systems
// it might be necessary)
queryIterator = getPlan().iterator() ;

// Add the timeout1->timeout2 resetter wrapper.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@
package org.apache.jena.sparql.engine.iterator ;

import java.util.NoSuchElementException ;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.query.QueryCancelledException;
import org.apache.jena.sparql.ARQConstants;
import org.apache.jena.sparql.ARQInternalErrorException ;
import org.apache.jena.sparql.engine.ExecutionContext ;
import org.apache.jena.sparql.engine.QueryIterator ;
Expand All @@ -34,20 +37,28 @@
public abstract class QueryIterProcessBinding extends QueryIter1 {
/** Process the binding - return null for "not accept".
* Subclasses may return a different Binding to the argument and
* the result is the returned Binding.
* the result is the returned Binding.
*/
abstract public Binding accept(Binding binding) ;

private Binding nextBinding ;
private final AtomicBoolean signalCancel ;

public QueryIterProcessBinding(QueryIterator qIter, ExecutionContext context) {
super(qIter, context) ;
nextBinding = null ;
AtomicBoolean signal;
try {
signal = context.getContext().get(ARQConstants.symCancelQuery);
} catch(Exception ex) {
signal = null;
}
signalCancel = signal;
}

/**
* Are there any more acceptable objects.
*
*
* @return true if there is another acceptable object.
*/
@Override
Expand All @@ -64,6 +75,7 @@ protected boolean hasNextBinding() {
throw new ARQInternalErrorException(Lib.className(this) + ": Null iterator") ;

while (getInput().hasNext()) {
checkCancelled();
// Skip forward until a binding to return is found.
Binding input = getInput().nextBinding() ;
Binding output = accept(input) ;
Expand All @@ -76,9 +88,16 @@ protected boolean hasNextBinding() {
return false ;
}

private final void checkCancelled() {
if ( signalCancel != null && signalCancel.get() ) {
this.cancel();
throw new QueryCancelledException();
}
}

/**
* The next acceptable object in the iterator.
*
*
* @return The next acceptable object.
*/
@Override
Expand Down

0 comments on commit d1111d7

Please sign in to comment.