Skip to content

Commit

Permalink
Stream result of TargetPattern#eval to a callback instead of returning it directly
Browse files Browse the repository at this point in the history
Stream result of TargetPattern#eval to a callback instead of returning it directly, and pass a Query callback in when resolving target patterns. This means that the targets a pattern resolves to can be processed incrementally.

This is the fifth step in a series to allow processing large sets of targets in query target patterns via streaming batches rather than all at once. This should improve performance for SkyQueryEnvironment for certain classes of large queries.

--
MOS_MIGRATED_REVID=111696713
  • Loading branch information
janakdr authored and damienmg committed Jan 11, 2016
1 parent 930bf69 commit ae3a20a
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.devtools.build.lib.cmdline.LabelValidator.BadLabelException;
import com.google.devtools.build.lib.cmdline.LabelValidator.PackageAndTarget;
import com.google.devtools.build.lib.cmdline.PackageIdentifier.RepositoryName;
import com.google.devtools.build.lib.collect.CompactHashSet;
import com.google.devtools.build.lib.util.BatchCallback;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.util.StringUtilities;
Expand All @@ -34,7 +32,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import javax.annotation.concurrent.Immutable;

Expand Down Expand Up @@ -146,18 +143,10 @@ public String getOffset() {
/**
* Evaluates the current target pattern and returns the result.
*/
public <T> ResolvedTargets<T> eval(TargetPatternResolver<T> resolver)
throws TargetParsingException, InterruptedException {
final Set<T> results = CompactHashSet.create();
BatchCallback<T, RuntimeException> callback =
new BatchCallback<T, RuntimeException>() {
@Override
public void process(Iterable<T> partialResult) {
Iterables.addAll(results, partialResult);
}
};
public <T, E extends Exception> void eval(
TargetPatternResolver<T> resolver, BatchCallback<T, E> callback)
throws TargetParsingException, E, InterruptedException {
eval(resolver, ImmutableSet.<String>of(), callback);
return ResolvedTargets.<T>builder().addAll(results).build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import com.google.devtools.build.lib.skyframe.TargetPatternValue;
import com.google.devtools.build.lib.skyframe.TargetPatternValue.TargetPatternKey;
import com.google.devtools.build.lib.skyframe.TransitiveTraversalValue;
import com.google.devtools.build.lib.util.BatchCallback;
import com.google.devtools.build.lib.util.Preconditions;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.vfs.RootedPath;
Expand All @@ -77,7 +78,6 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -94,6 +94,9 @@
* even if the full closure isn't needed.
*/
public class SkyQueryEnvironment extends AbstractBlazeQueryEnvironment<Target> {
// 10k is likely a good balance between using batch efficiently and not blowing up memory.
// TODO(janakr): Unify with RecursivePackageProviderBackedTargetPatternResolver's constant.
private static final int BATCH_CALLBACK_SIZE = 10000;

private WalkableGraph graph;

Expand Down Expand Up @@ -307,9 +310,8 @@ public Set<Target> getNodesOnPath(Target from, Target to) {
@Override
public void eval(QueryExpression expr, Callback<Target> callback)
    throws QueryException, InterruptedException {
  // Batch and deduplicate streamed results before handing them to the caller's callback;
  // see BATCH_CALLBACK_SIZE for the batching rationale.
  BatchStreamedCallback aggregator =
      new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
  expr.eval(this, aggregator);
  // Flush any targets still buffered in the aggregator.
  aggregator.processLastPending();
}
Expand All @@ -324,14 +326,50 @@ protected Label extractKey(Target target) {
};
}

/**
 * A {@link BatchCallback} that layers three filters in front of a {@link Callback<Target>}:
 * it drops targets that fail the scope validation check, drops {@link Target}s that are not
 * present in the graph (outside the universe scope), and finally routes survivors through a
 * {@link BatchStreamedCallback}, which batches results into groups of
 * {@link #BATCH_CALLBACK_SIZE} and removes duplicates before invoking the delegate.
 */
private class FilteringBatchingUniquifyingCallback
    implements BatchCallback<Target, QueryException> {
  private final BatchStreamedCallback batchStreamedCallback;

  private FilteringBatchingUniquifyingCallback(Callback<Target> callback) {
    this.batchStreamedCallback =
        new BatchStreamedCallback(callback, BATCH_CALLBACK_SIZE, createUniquifier());
  }

  @Override
  public void process(Iterable<Target> partialResult)
      throws QueryException, InterruptedException {
    // Keep only the targets whose labels pass the scope check.
    Set<Target> inScope = CompactHashSet.create();
    for (Target target : partialResult) {
      if (validateScope(target.getLabel(), strictScope)) {
        inScope.add(target);
      }
    }
    // Drop targets absent from the graph, then batch/deduplicate via the wrapped callback.
    batchStreamedCallback.process(filterTargetsNotInGraph(inScope));
  }

  private void processLastPending() throws QueryException, InterruptedException {
    // Flush whatever the batching layer is still holding.
    batchStreamedCallback.processLastPending();
  }
}

@Override
public void getTargetsMatchingPattern(
QueryExpression owner, String pattern, Callback<Target> callback) throws QueryException {
Set<Target> targets = ImmutableSet.of();
if (precomputedPatterns.containsKey(pattern)) {
Set<Label> labels = precomputedPatterns.get(pattern);
if (labels != null) {
targets = ImmutableSet.copyOf(makeTargetsFromLabels(labels));
try {
callback.process(ImmutableSet.copyOf(makeTargetsFromLabels(labels)));
} catch (InterruptedException e) {
throw new QueryException(owner, e.getMessage());
}
} else {
TargetParsingException exception;
try {
Expand Down Expand Up @@ -368,27 +406,16 @@ public void getTargetsMatchingPattern(
new RecursivePackageProviderBackedTargetPatternResolver(
provider, eventHandler, targetPatternKey.getPolicy());
TargetPattern parsedPattern = targetPatternKey.getParsedPattern();
targets = parsedPattern.eval(resolver).getTargets();
FilteringBatchingUniquifyingCallback wrapper =
new FilteringBatchingUniquifyingCallback(callback);
parsedPattern.eval(resolver, wrapper);
wrapper.processLastPending();
} catch (TargetParsingException e) {
reportBuildFileError(owner, e.getMessage());
} catch (InterruptedException e) {
throw new QueryException(owner, e.getMessage());
}
}

// Sets.filter would be more convenient here, but can't deal with exceptions.
Iterator<Target> targetIterator = targets.iterator();
while (targetIterator.hasNext()) {
Target target = targetIterator.next();
if (!validateScope(target.getLabel(), strictScope)) {
targetIterator.remove();
}
}
try {
callback.process(filterTargetsNotInGraph(targets));
} catch (InterruptedException e) {
throw new QueryException(owner, e.getMessage());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,19 @@
// limitations under the License.
package com.google.devtools.build.lib.query2.engine;

import com.google.devtools.build.lib.util.BatchCallback;

/**
* Query callback to be called by a {@link QueryExpression} when it has part of the computation
* result. Assuming the {@code QueryEnvironment} supports it, it would allow the caller
* to stream the results.
*/
// TODO(janakr): have this inherit from com.google.devtools.build.lib.util.BatchCallback.
public interface Callback<T> {
public interface Callback<T> extends BatchCallback<T, QueryException> {

/**
* Called by {@code QueryExpression} when it has been able to compute part of the result.
*
* <p>Note that this method can be called several times for a QueryExpression. Callers
* implementing this method should assume that multiple calls can happen.
*
* @param partialResult Part of the result. Note that from the caller's perspective, it is
* guaranteed that no repeated elements will be returned. However {@code QueryExpression}s calling
* the callback do not need to maintain this property, as the {@code QueryEnvironment} should
* handle the uniqueness.
* According to the {@link BatchCallback} interface, repeated elements may be passed in here.
* However, {@code QueryExpression}s calling the callback do not need to maintain this property,
* as the {@code QueryEnvironment} should filter out duplicates.
*/
void process(Iterable<T> partialResult) throws QueryException, InterruptedException;
}

0 comments on commit ae3a20a

Please sign in to comment.