Skip to content

Commit

Permalink
update IncrementalIndex to support unsorted facts map that can be used in groupBy merging to improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshug committed Mar 10, 2016
1 parent 1e49092 commit 02dfd5c
Show file tree
Hide file tree
Showing 8 changed files with 65 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ private IncrementalIndex makeIncIndex()
aggs,
false,
false,
true,
maxRows
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
true,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults()),
bufferPool
);
Expand All @@ -102,6 +103,7 @@ public String apply(DimensionSpec input)
aggs.toArray(new AggregatorFactory[aggs.size()]),
false,
true,
true,
Math.min(query.getContextValue(CTX_KEY_MAX_RESULTS, config.getMaxResults()), config.getMaxResults())
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -340,6 +341,7 @@ public int lookupId(String name)
private final AggregatorType[] aggs;
private final boolean deserializeComplexMetrics;
private final boolean reportParseExceptions;
private final boolean sortFacts;
private final Metadata metadata;

private final Map<String, MetricDesc> metricDescs;
Expand Down Expand Up @@ -374,7 +376,8 @@ public InputRow get()
public IncrementalIndex(
final IncrementalIndexSchema incrementalIndexSchema,
final boolean deserializeComplexMetrics,
final boolean reportParseExceptions
final boolean reportParseExceptions,
final boolean sortFacts
)
{
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
Expand All @@ -383,6 +386,7 @@ public IncrementalIndex(
this.rowTransformers = new CopyOnWriteArrayList<>();
this.deserializeComplexMetrics = deserializeComplexMetrics;
this.reportParseExceptions = reportParseExceptions;
this.sortFacts = sortFacts;

this.metadata = new Metadata().setAggregators(getCombiningAggregators(metrics));

Expand Down Expand Up @@ -441,7 +445,7 @@ private DimDim newDimDim(String dimension, ValueType type) {
// use newDimDim() to create a DimDim, makeDimDim() provides the subclass-specific implementation
protected abstract DimDim makeDimDim(String dimension, Object lock);

/**
 * Returns the map from each {@code TimeAndDims} key to its {@code Integer} value.
 *
 * NOTE(review): this span is a rendered diff with the +/- markers stripped. The two declarations
 * below appear to be the pre-commit (ConcurrentNavigableMap) and post-commit (ConcurrentMap)
 * versions of the same method, not two overloads — confirm against the commit.
 */
public abstract ConcurrentNavigableMap<TimeAndDims, Integer> getFacts();
public abstract ConcurrentMap<TimeAndDims, Integer> getFacts();

public abstract boolean canAppendRow();

Expand Down Expand Up @@ -673,12 +677,20 @@ public int size()

/**
 * Returns the timestamp of the first key in the facts map.
 *
 * NOTE(review): this span is a rendered diff hunk with the +/- markers stripped. The bare
 * {@code return getFacts().firstKey()...} line appears to be the removed pre-commit statement and
 * the {@code if (sortFacts)} block its replacement — confirm against the commit.
 *
 * @return the timestamp of the first facts key, when {@code sortFacts} is true
 * @throws UnsupportedOperationException when {@code sortFacts} is false
 */
private long getMinTimeMillis()
{
return getFacts().firstKey().getTimestamp();
if (sortFacts) {
// The cast assumes getFacts() returns a navigable map whenever sortFacts is true —
// presumably guaranteed by the subclasses; verify. firstKey() throws
// NoSuchElementException on an empty map (SortedMap contract).
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).firstKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get minTime from unsorted facts data.");
}
}

/**
 * Returns the timestamp of the last key in the facts map.
 *
 * NOTE(review): this span is a rendered diff hunk with the +/- markers stripped. The bare
 * {@code return getFacts().lastKey()...} line appears to be the removed pre-commit statement and
 * the {@code if (sortFacts)} block its replacement — confirm against the commit.
 *
 * @return the timestamp of the last facts key, when {@code sortFacts} is true
 * @throws UnsupportedOperationException when {@code sortFacts} is false
 */
private long getMaxTimeMillis()
{
return getFacts().lastKey().getTimestamp();
if (sortFacts) {
// The cast assumes getFacts() returns a navigable map whenever sortFacts is true —
// presumably guaranteed by the subclasses; verify. lastKey() throws
// NoSuchElementException on an empty map (SortedMap contract).
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).lastKey().getTimestamp();
} else {
throw new UnsupportedOperationException("can't get maxTime from unsorted facts data.");
}
}

private int[] getDimVals(final DimDim dimLookup, final List<Comparable> dimValues)
Expand Down Expand Up @@ -831,7 +843,11 @@ public ColumnCapabilities getCapabilities(String column)

/**
 * Returns the portion of the facts map between two keys, as produced by
 * {@code ConcurrentNavigableMap.subMap(start, end)} (start inclusive, end exclusive per the JDK
 * contract).
 *
 * NOTE(review): this span is a rendered diff hunk with the +/- markers stripped. The bare
 * {@code return getFacts().subMap(start, end);} line appears to be the removed pre-commit
 * statement and the {@code if (sortFacts)} block its replacement — confirm against the commit.
 *
 * @param start lower bound key passed to {@code subMap}
 * @param end   upper bound key passed to {@code subMap}
 * @return the sub-map of the facts map, when {@code sortFacts} is true
 * @throws UnsupportedOperationException when {@code sortFacts} is false
 */
public ConcurrentNavigableMap<TimeAndDims, Integer> getSubMap(TimeAndDims start, TimeAndDims end)
{
return getFacts().subMap(start, end);
if (sortFacts) {
// The cast assumes getFacts() returns a navigable map whenever sortFacts is true —
// presumably guaranteed by the subclasses; verify.
return ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).subMap(start, end);
} else {
throw new UnsupportedOperationException("can't get subMap from unsorted facts data.");
}
}

public Metadata getMetadata()
Expand Down Expand Up @@ -862,7 +878,14 @@ public Iterable<Row> iterableWithPostAggregations(final List<PostAggregator> pos
public Iterator<Row> iterator()
{
final List<DimensionDesc> dimensions = getDimensions();
final ConcurrentNavigableMap<TimeAndDims, Integer> facts = descending ? getFacts().descendingMap() : getFacts();

Map<TimeAndDims, Integer> facts = null;
if (descending && sortFacts) {
facts = ((ConcurrentNavigableMap<TimeAndDims, Integer>) getFacts()).descendingMap();
} else {
facts = getFacts();
}

return Iterators.transform(
facts.entrySet().iterator(),
new Function<Map.Entry<TimeAndDims, Integer>, Row>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -51,7 +52,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
private final List<ResourceHolder<ByteBuffer>> aggBuffers = new ArrayList<>();
private final List<int[]> indexAndOffsets = new ArrayList<>();

private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;

private final AtomicInteger indexIncrement = new AtomicInteger(0);

Expand All @@ -71,14 +72,20 @@ public OffheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.bufferPool = bufferPool;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}

//check that stupid pool gives buffers that can hold at least one row's aggregators
ResourceHolder<ByteBuffer> bb = bufferPool.take();
Expand All @@ -100,6 +107,7 @@ public OffheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount,
StupidPool<ByteBuffer> bufferPool
)
Expand All @@ -111,6 +119,7 @@ public OffheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount,
bufferPool
);
Expand All @@ -131,13 +140,14 @@ public OffheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount,
bufferPool
);
}

/**
 * Returns this index's {@code facts} field as-is (no copy is made).
 *
 * NOTE(review): this span is a rendered diff hunk with the +/- markers stripped. The two
 * signature lines appear to be the pre-commit (ConcurrentNavigableMap) and post-commit
 * (ConcurrentMap) return types of the same method — confirm against the commit.
 */
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@
import io.druid.segment.FloatColumnSelector;
import io.druid.segment.LongColumnSelector;
import io.druid.segment.ObjectColumnSelector;
import io.druid.segment.column.ValueType;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -48,7 +46,7 @@
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final ConcurrentNavigableMap<TimeAndDims, Integer> facts;
private final ConcurrentMap<TimeAndDims, Integer> facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
protected final int maxRowCount;
private volatile Map<String, ColumnSelectorFactory> selectors;
Expand All @@ -59,12 +57,18 @@ public OnheapIncrementalIndex(
IncrementalIndexSchema incrementalIndexSchema,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions);
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, sortFacts);
this.maxRowCount = maxRowCount;
this.facts = new ConcurrentSkipListMap<>(dimsComparator());

if (sortFacts) {
this.facts = new ConcurrentSkipListMap<>(dimsComparator());
} else {
this.facts = new ConcurrentHashMap<>();
}
}

public OnheapIncrementalIndex(
Expand All @@ -73,6 +77,7 @@ public OnheapIncrementalIndex(
final AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
boolean sortFacts,
int maxRowCount
)
{
Expand All @@ -83,6 +88,7 @@ public OnheapIncrementalIndex(
.build(),
deserializeComplexMetrics,
reportParseExceptions,
sortFacts,
maxRowCount
);
}
Expand All @@ -101,6 +107,7 @@ public OnheapIncrementalIndex(
.build(),
true,
true,
true,
maxRowCount
);
}
Expand All @@ -111,11 +118,11 @@ public OnheapIncrementalIndex(
int maxRowCount
)
{
this(incrementalIndexSchema, true, reportParseExceptions, maxRowCount);
this(incrementalIndexSchema, true, reportParseExceptions, true, maxRowCount);
}

/**
 * Returns this index's {@code facts} field as-is (no copy is made).
 *
 * NOTE(review): this span is a rendered diff hunk with the +/- markers stripped. The two
 * signature lines appear to be the pre-commit (ConcurrentNavigableMap) and post-commit
 * (ConcurrentMap) return types of the same method — confirm against the commit.
 */
@Override
public ConcurrentNavigableMap<TimeAndDims, Integer> getFacts()
public ConcurrentMap<TimeAndDims, Integer> getFacts()
{
return facts;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ public static void setupClass() throws Exception
},
true,
true,
true,
5000
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@
import io.druid.segment.Segment;
import io.druid.segment.TestHelper;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
Expand Down Expand Up @@ -311,15 +310,15 @@ public void createIndex(
List<File> toMerge = new ArrayList<>();

try {
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
while (rows.hasNext()) {
Object row = rows.next();
if (!index.canAppendRow()) {
File tmp = tempFolder.newFolder();
toMerge.add(tmp);
indexMerger.persist(index, tmp, new IndexSpec());
index.close();
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, maxRowCount);
index = new OnheapIncrementalIndex(minTimestamp, gran, metrics, deserializeComplexMetrics, true, true, maxRowCount);
}
if (row instanceof String && parser instanceof StringInputRowParser) {
//Note: this is required because StringInputRowParser is InputRowParser<ByteBuffer> as opposed to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ private void createTestIndex(File segmentDir) throws Exception

IncrementalIndex index = null;
try {
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, 5000);
index = new OnheapIncrementalIndex(0, QueryGranularity.NONE, aggregators, true, true, true, 5000);
for (String line : rows) {
index.add(parser.parse(line));
}
Expand Down

0 comments on commit 02dfd5c

Please sign in to comment.