introduced support for "shard_size" for terms & terms_stats facets. The "shard_size" is the number of term entries each shard will send back to the coordinating node. "shard_size" > "size" will increase the accuracy (both in terms of the counts associated with each term and the terms that will actually be returned to the user) - of course, the higher "shard_size" is, the more expensive the processing becomes, as bigger queues are maintained on a shard level and larger lists are streamed back from the shards.

closes elastic#3821
uboness committed Oct 2, 2013
1 parent 6b000d8 commit f3c6108
Showing 25 changed files with 1,252 additions and 53 deletions.
30 changes: 30 additions & 0 deletions docs/reference/search/facets/terms-facet.asciidoc
@@ -24,6 +24,36 @@ example:
It is preferred to have the terms facet executed on a non-analyzed
field, or a field without a large number of terms it breaks into.

==== Accuracy Control

added[0.90.6]

The `size` parameter defines how many top terms should be returned out
of the overall terms list. By default, the node coordinating the
search process will ask each shard to provide its own top `size` terms
and, once all shards respond, it will reduce the results to the final list
that will then be sent back to the client. This means that if the number
of unique terms is greater than `size`, the returned list is slightly off
and not accurate (it could be that the term counts are slightly off and it
could even be that a term that should have been in the top `size` entries
was not returned).

The higher the requested `size` is, the more accurate the results will be,
but also, the more expensive it will be to compute the final results (both
due to bigger priority queues that are managed on a shard level and due to
bigger data transfers between the nodes and the client). To minimize the
extra work that comes with a bigger requested `size`, a `shard_size`
parameter was introduced. Once defined, it determines how many terms the
coordinating node will request from each shard. Once all the shards have
responded, the coordinating node will reduce them to a final result based
on the `size` parameter - this way, one can increase the accuracy of the
returned terms and avoid the overhead of streaming a big list of terms
back to the client.

Note that `shard_size` cannot be smaller than `size`; if that's the case,
Elasticsearch will override it and reset it to be equal to `size`.
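To see why a larger `shard_size` helps, here is a small plain-Java simulation of the coordinate-and-reduce step described above. This is not Elasticsearch code; the class, the shard contents, and the term names are all illustrative:

```java
import java.util.*;
import java.util.stream.*;

// Standalone sketch of the shard/coordinator reduce: each shard reports only
// its local top `shardSize` terms; the coordinating node merges those partial
// lists and keeps the global top `size`.
public class ShardSizeSketch {

    // Top-n entries of a term->count map, ordered by descending count.
    static Map<String, Integer> topN(Map<String, Integer> counts, int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
                .limit(n)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (a, b) -> a, LinkedHashMap::new));
    }

    static Map<String, Integer> reduce(List<Map<String, Integer>> shards,
                                       int size, int shardSize) {
        Map<String, Integer> merged = new HashMap<>();
        for (Map<String, Integer> shard : shards) {
            topN(shard, shardSize).forEach((term, count) -> merged.merge(term, count, Integer::sum));
        }
        return topN(merged, size);
    }

    public static void main(String[] args) {
        // True totals: a = 6 + 1 = 7, b = 5 + 5 = 10, so "b" is the real top term.
        Map<String, Integer> shard1 = Map.of("a", 6, "b", 5);
        Map<String, Integer> shard2 = Map.of("b", 5, "a", 1);
        List<Map<String, Integer>> shards = List.of(shard1, shard2);

        // shard_size == size == 1: shard1 reports only a(6) and shard2 only b(5),
        // so the coordinator wrongly returns "a" with a count of 6.
        System.out.println(reduce(shards, 1, 1));
        // shard_size == 2: both shards report everything, and b(10) wins.
        System.out.println(reduce(shards, 1, 2));
    }
}
```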


==== Ordering

Allow to control the ordering of the terms facets, to be ordered by
8 changes: 8 additions & 0 deletions docs/reference/search/facets/terms-stats-facet.asciidoc
@@ -28,6 +28,14 @@ The `size` parameter controls how many facet entries will be returned.
It defaults to `10`. Setting it to 0 will return all terms matching the
hits (be careful not to return too many results).

One can also set `shard_size` (in addition to `size`), which determines
how many term entries will be requested from each shard. When dealing
with a field of high cardinality (at least higher than the requested `size`),
the greater `shard_size` is, the more accurate the result will be (and the
more expensive the overall facet computation will be). `shard_size` is there
to enable you to increase accuracy yet still avoid returning too many
terms_stats entries back to the client.
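For terms_stats, the same tradeoff applies, except each entry carries aggregate stats rather than a bare count, so the coordinating node must sum both the counts and the totals of matching terms. A standalone sketch of that merge (the record and method names are illustrative, not the Elasticsearch classes):

```java
import java.util.*;
import java.util.stream.*;

// Sketch: each shard reports its top `shardSize` (term, count, total) entries;
// the coordinator sums matching terms and keeps the global top `size` by total.
public class TermsStatsSketch {
    record Entry(String term, long count, double total) {}

    static List<Entry> topN(Collection<Entry> entries, int n) {
        return entries.stream()
                .sorted(Comparator.comparingDouble(Entry::total).reversed())
                .limit(n)
                .collect(Collectors.toList());
    }

    static List<Entry> reduce(List<List<Entry>> shards, int size, int shardSize) {
        Map<String, Entry> merged = new HashMap<>();
        for (List<Entry> shard : shards) {
            for (Entry e : topN(shard, shardSize)) {
                // Sum count and total for entries of the same term.
                merged.merge(e.term(), e, (a, b) ->
                        new Entry(a.term(), a.count() + b.count(), a.total() + b.total()));
            }
        }
        return topN(merged.values(), size);
    }
}
```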

Ordering is done by setting `order`, with possible values of `term`,
`reverse_term`, `count`, `reverse_count`, `total`, `reverse_total`,
`min`, `reverse_min`, `max`, `reverse_max`, `mean`, `reverse_mean`.
@@ -37,6 +37,7 @@ public class TermsFacetBuilder extends FacetBuilder {
private String fieldName;
private String[] fieldsNames;
private int size = 10;
private int shardSize = -1;
private Boolean allTerms;
private Object[] exclude;
private String regex;
@@ -124,6 +125,11 @@ public TermsFacetBuilder size(int size) {
return this;
}

/**
 * The number of term entries each shard will send back to the coordinating node.
 * Cannot be smaller than {@code size}; when unset or smaller, it falls back to {@code size}.
 */
public TermsFacetBuilder shardSize(int shardSize) {
this.shardSize = shardSize;
return this;
}

/**
* A regular expression to use in order to further filter terms.
*/
@@ -213,6 +219,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("field", fieldName);
}
builder.field("size", size);

// no point in sending shard size if it's not greater than size
if (shardSize > size) {
builder.field("shard_size", shardSize);
}

if (exclude != null) {
builder.startArray("exclude");
for (Object ex : exclude) {
@@ -82,6 +82,7 @@ public FacetExecutor.Mode defaultGlobalMode() {
public FacetExecutor parse(String facetName, XContentParser parser, SearchContext context) throws IOException {
String field = null;
int size = 10;
int shardSize = -1;

String[] fieldsNames = null;
ImmutableSet<BytesRef> excluded = ImmutableSet.of();
@@ -124,6 +125,8 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
script = parser.text();
} else if ("size".equals(currentFieldName)) {
size = parser.intValue();
} else if ("shard_size".equals(currentFieldName)) {
shardSize = parser.intValue();
} else if ("all_terms".equals(currentFieldName) || "allTerms".equals(currentFieldName)) {
allTerms = parser.booleanValue();
} else if ("regex".equals(currentFieldName)) {
@@ -143,7 +146,7 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
}

if ("_index".equals(field)) {
return new IndexNameFacetExecutor(context.shardTarget().index(), comparatorType, size);
return new IndexNameFacetExecutor(context.shardTarget().index(), comparatorType, size, shardSize);
}

if (fieldsNames != null && fieldsNames.length == 1) {
@@ -161,6 +164,11 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
searchScript = context.scriptService().search(context.lookup(), scriptLang, script, params);
}

// shard_size cannot be smaller than size, as we need to fetch at least <size> entries from every shard in order to return <size>
if (shardSize < size) {
shardSize = size;
}
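The clamp above is simple enough to read in isolation; a minimal standalone sketch of the same rule (the class and helper names are hypothetical, not part of this commit):

```java
public class ShardSizeClamp {
    // shard_size may never be smaller than size: each shard must hand back at
    // least `size` entries for the final reduce to produce `size` entries.
    // A shardSize of -1 means "not set" and therefore also falls back to size.
    static int effectiveShardSize(int size, int shardSize) {
        return Math.max(size, shardSize);
    }
}
```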

if (fieldsNames != null) {

// in case of multi files, we only collect the fields that are mapped and facet on them.
@@ -175,10 +183,10 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
// none of the fields is mapped
return new UnmappedFieldExecutor(size, comparatorType);
}
return new FieldsTermsStringFacetExecutor(facetName, mappers.toArray(new FieldMapper[mappers.size()]), size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new FieldsTermsStringFacetExecutor(mappers.toArray(new FieldMapper[mappers.size()]), size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
}
if (field == null && fieldsNames == null && script != null) {
return new ScriptTermsStringFieldFacetExecutor(size, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler());
return new ScriptTermsStringFieldFacetExecutor(size, shardSize, comparatorType, context, excluded, pattern, scriptLang, script, params, context.cacheRecycler());
}

FieldMapper fieldMapper = context.smartNameFieldMapper(field);
@@ -190,17 +198,17 @@ public FacetExecutor parse(String facetName, XContentParser parser, SearchContex
if (indexFieldData instanceof IndexNumericFieldData) {
IndexNumericFieldData indexNumericFieldData = (IndexNumericFieldData) indexFieldData;
if (indexNumericFieldData.getNumericType().isFloatingPoint()) {
return new TermsDoubleFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
return new TermsDoubleFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
} else {
return new TermsLongFacetExecutor(indexNumericFieldData, size, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
return new TermsLongFacetExecutor(indexNumericFieldData, size, shardSize, comparatorType, allTerms, context, excluded, searchScript, context.cacheRecycler());
}
} else {
if (script != null || "map".equals(executionHint)) {
return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
} else if (indexFieldData instanceof IndexFieldData.WithOrdinals) {
return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove);
return new TermsStringOrdinalsFacetExecutor((IndexFieldData.WithOrdinals) indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, ordinalsCacheAbove);
} else {
return new TermsStringFacetExecutor(indexFieldData, size, comparatorType, allTerms, context, excluded, pattern, searchScript);
return new TermsStringFacetExecutor(indexFieldData, size, shardSize, comparatorType, allTerms, context, excluded, pattern, searchScript);
}
}
}
@@ -162,7 +162,13 @@ public long getOtherCount() {
public Facet reduce(ReduceContext context) {
List<Facet> facets = context.facets();
if (facets.size() == 1) {
return facets.get(0);
Facet facet = facets.get(0);

// can be of type InternalStringTermsFacet representing unmapped fields
if (facet instanceof InternalDoubleTermsFacet) {
((InternalDoubleTermsFacet) facet).trimExcessEntries();
}
return facet;
}

InternalDoubleTermsFacet first = null;
@@ -197,6 +203,25 @@ public Facet reduce(ReduceContext context) {
return first;
}

private void trimExcessEntries() {
if (requiredSize >= entries.size()) {
return;
}

if (entries instanceof List) {
entries = ((List) entries).subList(0, requiredSize);
return;
}

int i = 0;
for (Iterator<DoubleEntry> iter = entries.iterator(); iter.hasNext();) {
iter.next();
if (i++ >= requiredSize) {
iter.remove();
}
}
}
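The single-shard reduce path above trims the oversized per-shard list back down to the requested size. A generic standalone sketch of that trimming logic (hypothetical class and method names, type-parameterized for illustration):

```java
import java.util.*;

public class TrimSketch {
    // Mirrors trimExcessEntries: keep only the first requiredSize entries,
    // taking a cheap subList view when the collection is a List, otherwise
    // removing the excess through the iterator.
    static <T> Collection<T> trim(Collection<T> entries, int requiredSize) {
        if (requiredSize >= entries.size()) {
            return entries;
        }
        if (entries instanceof List) {
            return ((List<T>) entries).subList(0, requiredSize);
        }
        int i = 0;
        for (Iterator<T> iter = entries.iterator(); iter.hasNext(); ) {
            iter.next();
            if (i++ >= requiredSize) {
                iter.remove(); // drop everything past the first requiredSize entries
            }
        }
        return entries;
    }
}
```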

static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
@@ -53,17 +53,19 @@ public class TermsDoubleFacetExecutor extends FacetExecutor {
private final IndexNumericFieldData indexFieldData;
private final TermsFacet.ComparatorType comparatorType;
private final int size;
private final int shardSize;
private final SearchScript script;
private final ImmutableSet<BytesRef> excluded;

final Recycler.V<TDoubleIntHashMap> facets;
long missing;
long total;

public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
public TermsDoubleFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, SearchScript script, CacheRecycler cacheRecycler) {
this.indexFieldData = indexFieldData;
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.excluded = excluded;
@@ -120,7 +122,7 @@ public InternalFacet buildFacet(String facetName) {
return new InternalDoubleTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalDoubleTermsFacet.DoubleEntry>of(), missing, total);
} else {
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator());
for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.insertWithOverflow(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
@@ -132,7 +134,7 @@ public InternalFacet buildFacet(String facetName) {
facets.release();
return new InternalDoubleTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total);
} else {
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), size);
BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry> ordered = new BoundedTreeSet<InternalDoubleTermsFacet.DoubleEntry>(comparatorType.comparator(), shardSize);
for (TDoubleIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.add(new InternalDoubleTermsFacet.DoubleEntry(it.key(), it.value()));
@@ -36,13 +36,15 @@ public class IndexNameFacetExecutor extends FacetExecutor {
private final String indexName;
private final InternalStringTermsFacet.ComparatorType comparatorType;
private final int size;
private final int shardSize;

private int count = 0;

public IndexNameFacetExecutor(String indexName, TermsFacet.ComparatorType comparatorType, int size) {
public IndexNameFacetExecutor(String indexName, TermsFacet.ComparatorType comparatorType, int size, int shardSize) {
this.indexName = indexName;
this.comparatorType = comparatorType;
this.size = size;
this.shardSize = shardSize;
}

@Override
@@ -163,7 +163,13 @@ public long getOtherCount() {
public Facet reduce(ReduceContext context) {
List<Facet> facets = context.facets();
if (facets.size() == 1) {
return facets.get(0);
Facet facet = facets.get(0);

// facet could be InternalStringTermsFacet representing unmapped fields
if (facet instanceof InternalLongTermsFacet) {
((InternalLongTermsFacet) facet).trimExcessEntries();
}
return facet;
}

InternalLongTermsFacet first = null;
@@ -198,6 +204,25 @@ public Facet reduce(ReduceContext context) {
return first;
}

private void trimExcessEntries() {
if (requiredSize >= entries.size()) {
return;
}

if (entries instanceof List) {
entries = ((List) entries).subList(0, requiredSize);
return;
}

int i = 0;
for (Iterator<LongEntry> iter = entries.iterator(); iter.hasNext();) {
iter.next();
if (i++ >= requiredSize) {
iter.remove();
}
}
}

static final class Fields {
static final XContentBuilderString _TYPE = new XContentBuilderString("_type");
static final XContentBuilderString MISSING = new XContentBuilderString("missing");
@@ -52,6 +52,7 @@ public class TermsLongFacetExecutor extends FacetExecutor {

private final IndexNumericFieldData indexFieldData;
private final TermsFacet.ComparatorType comparatorType;
private final int shardSize;
private final int size;
private final SearchScript script;
private final ImmutableSet<BytesRef> excluded;
@@ -60,10 +61,11 @@ public class TermsLongFacetExecutor extends FacetExecutor {
long missing;
long total;

public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
public TermsLongFacetExecutor(IndexNumericFieldData indexFieldData, int size, int shardSize, TermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, SearchScript script, CacheRecycler cacheRecycler) {
this.indexFieldData = indexFieldData;
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.excluded = excluded;
@@ -119,7 +121,7 @@ public InternalFacet buildFacet(String facetName) {
return new InternalLongTermsFacet(facetName, comparatorType, size, ImmutableList.<InternalLongTermsFacet.LongEntry>of(), missing, total);
} else {
if (size < EntryPriorityQueue.LIMIT) {
EntryPriorityQueue ordered = new EntryPriorityQueue(size, comparatorType.comparator());
EntryPriorityQueue ordered = new EntryPriorityQueue(shardSize, comparatorType.comparator());
for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.insertWithOverflow(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
@@ -131,7 +133,7 @@ public InternalFacet buildFacet(String facetName) {
facets.release();
return new InternalLongTermsFacet(facetName, comparatorType, size, Arrays.asList(list), missing, total);
} else {
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), size);
BoundedTreeSet<InternalLongTermsFacet.LongEntry> ordered = new BoundedTreeSet<InternalLongTermsFacet.LongEntry>(comparatorType.comparator(), shardSize);
for (TLongIntIterator it = facets.v().iterator(); it.hasNext(); ) {
it.advance();
ordered.add(new InternalLongTermsFacet.LongEntry(it.key(), it.value()));
@@ -41,15 +41,17 @@ public class FieldsTermsStringFacetExecutor extends FacetExecutor {

private final InternalStringTermsFacet.ComparatorType comparatorType;
private final int size;
private final int shardSize;
private final IndexFieldData[] indexFieldDatas;
private final SearchScript script;
private final HashedAggregator aggregator;
long missing;
long total;

public FieldsTermsStringFacetExecutor(String facetName, FieldMapper[] fieldMappers, int size, InternalStringTermsFacet.ComparatorType comparatorType, boolean allTerms, SearchContext context,
ImmutableSet<BytesRef> excluded, Pattern pattern, SearchScript script) {
public FieldsTermsStringFacetExecutor(FieldMapper[] fieldMappers, int size, int shardSize, InternalStringTermsFacet.ComparatorType comparatorType,
boolean allTerms, SearchContext context, ImmutableSet<BytesRef> excluded, Pattern pattern, SearchScript script) {
this.size = size;
this.shardSize = shardSize;
this.comparatorType = comparatorType;
this.script = script;
this.indexFieldDatas = new IndexFieldData[fieldMappers.length];
@@ -78,7 +80,7 @@ public Collector collector() {
@Override
public InternalFacet buildFacet(String facetName) {
try {
return HashedAggregator.buildFacet(facetName, size, missing, total, comparatorType, aggregator);
return HashedAggregator.buildFacet(facetName, size, shardSize, missing, total, comparatorType, aggregator);
} finally {
aggregator.release();
}