Skip to content

Commit

Permalink
Return index set wildcard instead of individual indices if index list…
Browse files Browse the repository at this point in the history
… is too long (Graylog2#4062)

* Return write index alias instead of individual indices in Searches#determineAffectedIndices()

If the number of indices determined in `Searches#determineAffectedIndices()` is larger than
`Searches.MAX_INDICES_PER_QUERY`, return the covering write index aliases of the respective
index sets instead of the individual index names.

Fixes Graylog2#4054

* Use index wildcard instead of write index alias (duh!)

* Use correct indices in Searches#search(SearchesConfig)

* Reduce number of indices in Searches#fieldStats() if necessary
  • Loading branch information
joschi authored and bernd committed Aug 11, 2017
1 parent 1aa1ba2 commit 3dc4ca1
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSortedSet;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestResult;
Expand Down Expand Up @@ -51,6 +52,7 @@
import org.graylog2.indexer.IndexHelper;
import org.graylog2.indexer.IndexMapping;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.cluster.jest.JestUtils;
import org.graylog2.indexer.indices.Indices;
import org.graylog2.indexer.ranges.IndexRange;
Expand Down Expand Up @@ -87,8 +89,8 @@
import java.util.stream.StreamSupport;

import static com.codahale.metrics.MetricRegistry.name;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.Objects.requireNonNull;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.queryStringQuery;

Expand All @@ -104,6 +106,9 @@ public class Searches {
public static final String AGG_VALUE_COUNT = "gl2_value_count";
private static final Pattern filterStreamIdPattern = Pattern.compile("^(.+[^\\p{Alnum}])?streams:([\\p{XDigit}]+)");

@VisibleForTesting
static final int MAX_INDICES_PER_QUERY = 100;

public enum TermsStatsOrder {
TERM,
REVERSE_TERM,
Expand Down Expand Up @@ -162,6 +167,7 @@ public org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter

private final Configuration configuration;
private final IndexRangeService indexRangeService;
private final IndexSetRegistry indexSetRegistry;
private final Timer esRequestTimer;
private final Histogram esTimeRangeHistogram;
private final Counter esTotalSearchesCounter;
Expand All @@ -173,21 +179,23 @@ public org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInter
@Inject
public Searches(Configuration configuration,
IndexRangeService indexRangeService,
IndexSetRegistry indexSetRegistry,
MetricRegistry metricRegistry,
StreamService streamService,
Indices indices,
JestClient jestClient,
ScrollResult.Factory scrollResultFactory) {
this.configuration = checkNotNull(configuration);
this.indexRangeService = checkNotNull(indexRangeService);
this.configuration = requireNonNull(configuration, "configuration");
this.indexRangeService = requireNonNull(indexRangeService, "indexRangeService");
this.indexSetRegistry = requireNonNull(indexSetRegistry, "indexSetRegistry");

this.esRequestTimer = metricRegistry.timer(name(Searches.class, "elasticsearch", "requests"));
this.esTimeRangeHistogram = metricRegistry.histogram(name(Searches.class, "elasticsearch", "ranges"));
this.esTotalSearchesCounter = metricRegistry.counter(name(Searches.class, "elasticsearch", "total-searches"));
this.streamService = streamService;
this.indices = indices;
this.jestClient = jestClient;
this.scrollResultFactory = scrollResultFactory;
this.streamService = requireNonNull(streamService, "streamService");
this.indices = requireNonNull(indices, "indices");
this.jestClient = requireNonNull(jestClient, "jestClient");
this.scrollResultFactory = requireNonNull(scrollResultFactory, "scrollResultFactory");
}

public CountResult count(String query, TimeRange range) {
Expand Down Expand Up @@ -257,20 +265,20 @@ public SearchResult search(String query, String filter, TimeRange range, int lim

public SearchResult search(SearchesConfig config) {
final Set<IndexRange> indexRanges = determineAffectedIndicesWithRanges(config.range(), config.filter());
final Set<String> indices = indexRanges.stream().map(IndexRange::indexName).collect(Collectors.toSet());

final SearchSourceBuilder requestBuilder = searchRequest(config);
if (indexRanges.isEmpty()) {
return SearchResult.empty(config.query(), requestBuilder.toString());
}

final Set<String> indices = extractIndexNamesFromIndexRanges(indexRanges);
final Search.Builder searchBuilder = new Search.Builder(requestBuilder.toString())
.addType(IndexMapping.TYPE_MESSAGE)
.addIndex(indices);

if (indices.isEmpty()) {
return SearchResult.empty(config.query(), requestBuilder.toString());
}
final io.searchbox.core.SearchResult searchResult = checkForFailedShards(JestUtils.execute(jestClient, searchBuilder.build(), () -> "Unable to perform search query."));
final List<ResultMessage> hits = searchResult.getHits(Map.class, false).stream()
.map(hit -> ResultMessage.parseFromSource(hit.id, hit.index, (Map<String, Object>)hit.source))
.map(hit -> ResultMessage.parseFromSource(hit.id, hit.index, (Map<String, Object>) hit.source))
.collect(Collectors.toList());
final long tookMs = tookMsFromSearchResult(searchResult);
recordEsMetrics(tookMs, config.range());
Expand Down Expand Up @@ -444,10 +452,15 @@ public FieldStatsResult fieldStats(String field,
boolean includeCardinality,
boolean includeStats,
boolean includeCount) {
final SearchSourceBuilder searchSourceBuilder = filteredSearchRequest(query, filter, range);

final Set<String> affectedIndices = indicesContainingField(determineAffectedIndices(range, filter), field);
final SearchSourceBuilder searchSourceBuilder;
if (filter == null) {
searchSourceBuilder = standardSearchRequest(query, range);
} else {
searchSourceBuilder = filteredSearchRequest(query, filter, range);
}

final FilterAggregationBuilder filterBuilder = AggregationBuilders.filter(AGG_FILTER)
.filter(standardAggregationFilters(range, filter));
if (includeCount) {
searchSourceBuilder.aggregation(AggregationBuilders.count(AGG_VALUE_COUNT).field(field));
}
Expand All @@ -458,14 +471,23 @@ public FieldStatsResult fieldStats(String field,
searchSourceBuilder.aggregation(AggregationBuilders.cardinality(AGG_CARDINALITY).field(field));
}

final Search.Builder searchBuilder = new Search.Builder(searchSourceBuilder.toString())
.addType(IndexMapping.TYPE_MESSAGE)
.addIndex(affectedIndices);
searchSourceBuilder.aggregation(filterBuilder);

final Set<String> affectedIndices = indicesContainingField(determineAffectedIndices(range, filter), field);
if (affectedIndices.isEmpty()) {
return FieldStatsResult.empty(query, searchSourceBuilder.toString());
}

final Set<String> indices;
if (affectedIndices.size() > MAX_INDICES_PER_QUERY) {
indices = reduceIndexNamesToIndexWildcards(affectedIndices);
} else {
indices = affectedIndices;
}
final Search.Builder searchBuilder = new Search.Builder(searchSourceBuilder.toString())
.addType(IndexMapping.TYPE_MESSAGE)
.addIndex(indices);

final io.searchbox.core.SearchResult searchResponse = checkForFailedShards(JestUtils.execute(jestClient, searchBuilder.build(), () -> "Unable to retrieve fields stats."));
final List<ResultMessage> hits = searchResponse.getHits(Map.class, false).stream()
.map(hit -> ResultMessage.parseFromSource(hit.id, hit.index, (Map<String, Object>)hit.source))
Expand Down Expand Up @@ -766,16 +788,32 @@ public static Optional<String> extractStreamId(String filter) {
return Optional.empty();
}

public Set<String> determineAffectedIndices(TimeRange range,
@Nullable String filter) {
final Set<IndexRange> indexRanges = determineAffectedIndicesWithRanges(range, filter);
return indexRanges.stream()
@VisibleForTesting
Set<String> determineAffectedIndices(TimeRange range, @Nullable String filter) {
return extractIndexNamesFromIndexRanges(determineAffectedIndicesWithRanges(range, filter));
}

private Set<String> extractIndexNamesFromIndexRanges(Set<IndexRange> indexRanges) {
final Set<String> indexNames = indexRanges.stream()
.map(IndexRange::indexName)
.collect(Collectors.toSet());
if (indexNames.size() > MAX_INDICES_PER_QUERY) {
return reduceIndexNamesToIndexWildcards(indexNames);
} else {
return indexNames;
}
}

private Set<String> reduceIndexNamesToIndexWildcards(Set<String> indexNames) {
return indexNames.stream()
.map(indexSetRegistry::getForIndex)
.flatMap(o -> o.map(java.util.stream.Stream::of).orElseGet(java.util.stream.Stream::empty))
.map(IndexSet::getIndexWildcard)
.collect(Collectors.toSet());
}

public Set<IndexRange> determineAffectedIndicesWithRanges(TimeRange range,
@Nullable String filter) {
@VisibleForTesting
Set<IndexRange> determineAffectedIndicesWithRanges(TimeRange range, @Nullable String filter) {
final Optional<String> streamId = extractStreamId(filter);
IndexSet indexSet = null;
// if we are searching in a stream, we are further restricting the indices using the currently
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.graylog2.buffers.processors.fakestreams.FakeStream;
import org.graylog2.indexer.IndexHelper;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.IndexSetRegistry;
import org.graylog2.indexer.TestIndexSet;
import org.graylog2.indexer.indexset.IndexSetConfig;
import org.graylog2.indexer.indices.Indices;
Expand Down Expand Up @@ -71,11 +72,13 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.SortedSet;

import static org.assertj.core.api.Assertions.assertThat;
import static org.joda.time.DateTimeZone.UTC;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.startsWith;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -130,6 +133,9 @@ public DateTime begin() {
@Mock
private IndexRangeService indexRangeService;

@Mock
private IndexSetRegistry indexSetRegistry;

@Mock
private StreamService streamService;

Expand Down Expand Up @@ -170,6 +176,7 @@ public void setUp() throws Exception {
searches = new Searches(
new Configuration(),
indexRangeService,
indexSetRegistry,
metricRegistry,
streamService,
indices,
Expand Down Expand Up @@ -593,6 +600,25 @@ public void determineAffectedIndicesWithRangesIncludesDeflectorTarget() throws E
.containsExactly(indexRangeLatest, indexRange0, indexRange1);
}

@Test
public void determineAffectedIndicesOnlyReturnsAliasesIfTooManyIndicesAreFound() throws Exception {
final int numberOfIndices = Searches.MAX_INDICES_PER_QUERY + 1;
final DateTime now = DateTime.now(DateTimeZone.UTC);
final ImmutableSortedSet.Builder<IndexRange> indices = ImmutableSortedSet.orderedBy(IndexRange.COMPARATOR);

for (int i = 0; i < numberOfIndices; i++) {
final MongoIndexRange indexRange = MongoIndexRange.create("graylog_" + i, now.plusDays(i), now.plusDays(i + 1), now, 0);
indices.add(indexRange);
}

when(indexRangeService.find(any(DateTime.class), any(DateTime.class))).thenReturn(indices.build());
when(indexSetRegistry.getForIndex(anyString())).thenReturn(Optional.of(indexSet));

final TimeRange absoluteRange = AbsoluteRange.create(now, now.plusDays(numberOfIndices + 1));

assertThat(searches.determineAffectedIndices(absoluteRange, null)).containsExactly(indexSet.getIndexWildcard());
}

@Test
public void determineAffectedIndicesWithRangesDoesNotIncludesDeflectorTargetIfMissing() throws Exception {
final DateTime now = DateTime.now(DateTimeZone.UTC);
Expand Down

0 comments on commit 3dc4ca1

Please sign in to comment.