Skip to content

Commit

Permalink
apacheGH-1279: Improve the implementation of in-memory, general-purpo…
Browse files Browse the repository at this point in the history
…se,non-transactional graphs.

Summary:
- Improved performance of GraphMem for:
  - Graph#find
    - Slightly by tuning the filter predicates
    - Significantly when results are processed via Iterator#forEachRemaining
  - Graph#stream
    - Slightly by tuning the filter predicates
    - Significantly for most stream operations
    - java.util.stream.BaseStream#parallel is now fully supported.
      (Before these changes, parallel execution was even slower than single-threaded execution)
  - Graph#contains
    - Only for non-concrete (fluent) patterns
- GraphMem used a variety of different Iterator implementations, helper classes, and wrappers:
  -> None of them supported #forEachRemaining
  -> It seemed appropriate to almost universally implement #forEachRemaining to avoid a wrapper or helper
     from breaking the newly gained performance advantages of #forEachRemaining

Details:

Implemented Iterator#forEachRemaining in an optimized way for many iterators throughout the Jena project:
- Replaced hasNext();next(); calls by #forEachRemaining in some promising places (not all)

Optimized NiceIterator:
- NiceIterator#hasNext now avoids redundant calls to current.hasNext()
- NiceIterator#andThen has optimized code to handle expensive hasNext calls of wrapped iterators.

Tuned GraphMem:
- Removed unused classes in the 'mem' namespace.
- Iterators:
  - Tuned Iterator implementations, mainly by adding forEachRemaining implementations
  - Optimized code in BasicKeyIterator. The new iterator works in reverse order, so I had to adapt HashCommon#removeFrom
  - Optimized code in HashedBunchMap#iterator
  - TrackingTripleIterator#forEachRemaining now simply calls super.forEachRemaining() and sets current to null
    -> This had a significant impact on performance
- Spliterators:
  - Created specialized spliterators SparseArraySpliterator and SparseArraySubSpliterator (+unit tests)
  - Replaced Spliterator implementation within HashCommon and HashedBunchMap with SparseArraySpliterator
  - Implemented Spliterators to support fast stream operations, where the SparseSpliterator*
    implementations were not suitable
- Replaced usage of org.apache.jena.graph.impl.GraphBase#containsByFind with optimized implementations:
  - Introduced org.apache.jena.graph.impl.TripleStore#containsMatch
  - Implemented org.apache.jena.mem.NodeToTriplesMapBase#containsMatch using the spliterator

- Filter operations:
  - Introduced org.apache.jena.graph.Triple.Field#filterOnConcrete to avoid double-checking of Node#isConcrete (+unit tests)
  - Created org.apache.jena.mem.FieldFilter to efficiently build filter predicates only when needed, and only with
    the required conditions (+unit tests)
  - Used org.apache.jena.mem.FieldFilter#filterOn in NodeToTriplesMap#iterator, NodeToTriplesMapBase#stream,
    and NodeToTriplesMapBase#containsMatch to only filter when a filter is needed.
    For example: For find(sub, ANY, ANY), there is no need for a filter in the underlying TripleBunch
  • Loading branch information
bern-soptim committed Jun 4, 2023
1 parent ba2cdb6 commit 8ef274a
Show file tree
Hide file tree
Showing 74 changed files with 2,618 additions and 701 deletions.
4 changes: 4 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Iterator ;
import java.util.List ;
import java.util.function.Consumer;

import org.apache.jena.rdf.model.Model ;
import org.apache.jena.sparql.engine.binding.Binding ;
Expand Down Expand Up @@ -53,6 +54,9 @@ public static ResultSet adapt(RowSet rowSet) {
@Override
public QuerySolution next() ;

@Override
public void forEachRemaining(Consumer<? super QuerySolution> action);

/** Moves onto the next result (legacy - use .next()). */
public QuerySolution nextSolution() ;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.jena.riot.system.StreamRDF;
import org.apache.jena.riot.thrift.ThriftRDF;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.exec.RowSet;
import org.apache.jena.sparql.exec.RowSetStream;

Expand Down Expand Up @@ -155,10 +154,7 @@ public static void writeRowSet(OutputStream out, RowSet rowSet, boolean withValu
try {
List<Var> vars = rowSet.getResultVars();
try ( Binding2Protobuf b2p = new Binding2Protobuf(out, vars, false) ) {
for ( ; rowSet.hasNext() ; ) {
Binding b = rowSet.next();
b2p.output(b);
}
rowSet.forEachRemaining(b2p::output);
}
} finally { IO.flush(out); }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.function.Consumer;
import java.util.regex.Pattern;

import org.apache.jena.atlas.io.IO;
Expand Down Expand Up @@ -199,6 +200,22 @@ public Binding next() {
return row;
}

@Override
public void forEachRemaining(Consumer<? super Binding> action) {
if ( finished )
return;
if ( null != currentBinding ) {
action.accept(currentBinding);
currentBinding = null;
}
Binding row;
while (null != (row = parseNextBinding())) {
action.accept(row);
}
IO.close(reader);
finished = true;
}

private Binding parseNextBinding() {
String line;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,7 @@ public static long countTriples(DatasetGraph dsg, Node s, Node p, Node o) {
* Collect all the matching triples
*/
public static void accTriples(Collection<Triple> acc, Graph graph, Node s, Node p, Node o) {
ExtendedIterator<Triple> iter = graph.find(s, p, o);
while (iter.hasNext())
acc.add(iter.next());
iter.close();
graph.find(s, p, o).forEach(acc::add);
}

public static void writeBase(IndentedWriter out, String base, boolean newStyle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,7 @@ public static void sendTriplesToStream(Graph graph, StreamRDF stream) {
/** Set triples to a StreamRDF - does not call .start/.finish */
public static void sendTriplesToStream(Iterator<Triple> iter, StreamRDF dest)
{
for ( ; iter.hasNext() ; )
{
Triple t = iter.next() ;
dest.triple(t) ;
}
iter.forEachRemaining(dest::triple);
}

/** Send quads of a dataset (including default graph as quads) to a StreamRDF, without prefixes */
Expand All @@ -132,10 +128,6 @@ public static void sendQuadsToStream(DatasetGraph datasetGraph, StreamRDF stream
/** Set quads to a StreamRDF - does not call .start/.finish */
public static void sendQuadsToStream(Iterator<Quad> iter, StreamRDF dest)
{
for ( ; iter.hasNext() ; )
{
Quad q = iter.next() ;
dest.quad(q) ;
}
iter.forEachRemaining(dest::quad);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;

import org.apache.jena.atlas.json.JsonObject;
import org.apache.jena.graph.Node;
Expand All @@ -44,4 +45,9 @@ public JsonIterator(QueryIterator queryIterator, Map<String, Node> template) {

@Override
public JsonObject next() { return results.next(); }

@Override
public void forEachRemaining(Consumer<? super JsonObject> action) {
results.forEachRemaining(action);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.jena.sparql.engine;

import java.util.List ;
import java.util.function.Consumer;

import org.apache.jena.query.QueryExecution ;
import org.apache.jena.query.QuerySolution ;
Expand Down Expand Up @@ -79,6 +80,16 @@ public void close() {
other.close();
}

/**
* Attention: The check is only done once before the first consumer accept call.
* @param action The action to be performed for each element
*/
@Override
public void forEachRemaining(Consumer<? super QuerySolution> action) {
check() ;
other.forEachRemaining(action);
}

@Override
public int getRowNumber() {
check() ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.query.QuerySolution;
Expand Down Expand Up @@ -125,6 +126,17 @@ public QuerySolution nextSolution() {
@Override
public QuerySolution next() { return nextSolution(); }

@Override
public void forEachRemaining(Consumer<? super QuerySolution> action) {
if ( queryExecutionIter == null )
return;
queryExecutionIter.forEachRemaining(binding -> {
rowNumber++;
action.accept(new ResultBinding(model, binding));
});
close();
}

/** Return the "row number" - a count of the number of possibilities returned so far.
* Remains valid (as the total number of possibilities) after the iterator ends.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Collections ;
import java.util.Iterator ;
import java.util.List ;
import java.util.function.Consumer;

import org.apache.jena.atlas.iterator.IteratorSlotted ;
import org.apache.jena.atlas.lib.Closeable ;
Expand Down Expand Up @@ -106,6 +107,11 @@ public Binding next()
return iter.next() ;
}

@Override
public void forEachRemaining(Consumer<? super Binding> action) {
iter.forEachRemaining(action);
}

@Override
public void remove()
{ iter.remove() ; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

import org.apache.jena.atlas.json.JsonArray;
import org.apache.jena.atlas.json.JsonObject;
Expand Down Expand Up @@ -83,6 +84,15 @@ public JsonObject next() {
return jsonObject;
}

@Override
public void forEachRemaining(Consumer<? super JsonObject> action) {
if ( queryIterator == null )
return;
queryIterator.forEachRemaining(binding
-> action.accept( JsonResults.generateJsonObject(binding, template) ));
close();
}

/** Close the query iterator */
private void close() {
queryIterator.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.ArrayList ;
import java.util.List ;
import java.util.function.Consumer;

import org.apache.jena.atlas.iterator.PeekIterator ;
import org.apache.jena.query.QuerySolution ;
Expand Down Expand Up @@ -146,6 +147,15 @@ public ResultSetMem() {
@Override
public QuerySolution next() { return nextSolution() ; }

@Override
public void forEachRemaining(Consumer<? super QuerySolution> action) {
iterator.forEachRemaining(binding -> {
rowNumber++;
action.accept(new ResultBinding(model, binding));
});

}

@Override
public void close() {}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;

import org.apache.jena.atlas.lib.Closeable;
import org.apache.jena.query.QuerySolution ;
Expand Down Expand Up @@ -75,6 +76,13 @@ public QuerySolution next() {
return new ResultBinding(this.model, this.nextBinding());
}

@Override
public void forEachRemaining(Consumer<? super QuerySolution> action) {
while (this.hasNext()) {
action.accept(this.next());
}
}

@Override
public QuerySolution nextSolution() {
return this.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.jena.sparql.resultset;

import java.util.List ;
import java.util.function.Consumer;

import org.apache.jena.query.QuerySolution ;
import org.apache.jena.query.ResultSet ;
Expand All @@ -45,6 +46,11 @@ public QuerySolution next() {
return get().next();
}

@Override
public void forEachRemaining(Consumer<? super QuerySolution> action) {
get().forEachRemaining(action);
}

@Override
public QuerySolution nextSolution() {
return get().nextSolution();
Expand Down
Loading

0 comments on commit 8ef274a

Please sign in to comment.