Skip to content

Commit

Permalink
Allow MV Field Support For Raw Columns in Text Indices (apache#7638)
Browse files Browse the repository at this point in the history
Using the new MV raw byte forward index, allow multi value fields to be supported in text indices.
  • Loading branch information
atris authored Oct 30, 2021
1 parent e814203 commit 5becf5b
Show file tree
Hide file tree
Showing 10 changed files with 280 additions and 262 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -166,9 +166,8 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio

Map<String, H3IndexConfig> h3IndexConfigs = _config.getH3IndexConfigs();
for (String columnName : h3IndexConfigs.keySet()) {
Preconditions
.checkState(schema.hasColumn(columnName), "Cannot create H3 index for column: %s because it is not in schema",
columnName);
Preconditions.checkState(schema.hasColumn(columnName),
"Cannot create H3 index for column: %s because it is not in schema", columnName);
}

// Initialize creators for dictionary, forward index and inverted index
Expand Down Expand Up @@ -206,8 +205,8 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
int cardinality = indexCreationInfo.getDistinctValueCount();
if (fieldSpec.isSingleValueField()) {
if (indexCreationInfo.isSorted()) {
_forwardIndexCreatorMap
.put(columnName, new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
_forwardIndexCreatorMap.put(columnName,
new SingleValueSortedForwardIndexCreator(_indexDir, columnName, cardinality));
} else {
_forwardIndexCreatorMap.put(columnName,
new SingleValueUnsortedForwardIndexCreator(_indexDir, columnName, cardinality, _totalDocs));
Expand All @@ -221,8 +220,8 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
// Initialize inverted index creator; skip creating inverted index if sorted
if (invertedIndexColumns.contains(columnName) && !indexCreationInfo.isSorted()) {
if (segmentCreationSpec.isOnHeap()) {
_invertedIndexCreatorMap
.put(columnName, new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
_invertedIndexCreatorMap.put(columnName,
new OnHeapBitmapInvertedIndexCreator(_indexDir, columnName, cardinality));
} else {
_invertedIndexCreatorMap.put(columnName,
new OffHeapBitmapInvertedIndexCreator(_indexDir, fieldSpec, cardinality, _totalDocs,
Expand Down Expand Up @@ -254,30 +253,28 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio

if (textIndexColumns.contains(columnName)) {
// Initialize text index creator
Preconditions.checkState(fieldSpec.isSingleValueField(),
"Text index is currently only supported on single-value columns");
Preconditions
.checkState(storedType == DataType.STRING, "Text index is currently only supported on STRING type columns");
_textIndexCreatorMap
.put(columnName, new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
Preconditions.checkState(storedType == DataType.STRING,
"Text index is currently only supported on STRING type columns");
_textIndexCreatorMap.put(columnName,
new LuceneTextIndexCreator(columnName, _indexDir, true /* commitOnClose */));
}

if (fstIndexColumns.contains(columnName)) {
Preconditions.checkState(fieldSpec.isSingleValueField(),
"FST index is currently only supported on single-value columns");
Preconditions
.checkState(storedType == DataType.STRING, "FST index is currently only supported on STRING type columns");
Preconditions
.checkState(dictEnabledColumn, "FST index is currently only supported on dictionary-encoded columns");
Preconditions.checkState(storedType == DataType.STRING,
"FST index is currently only supported on STRING type columns");
Preconditions.checkState(dictEnabledColumn,
"FST index is currently only supported on dictionary-encoded columns");
_fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
(String[]) indexCreationInfo.getSortedUniqueElementsArray()));
}

if (jsonIndexColumns.contains(columnName)) {
Preconditions.checkState(fieldSpec.isSingleValueField(),
"Json index is currently only supported on single-value columns");
Preconditions
.checkState(storedType == DataType.STRING, "Json index is currently only supported on STRING columns");
Preconditions.checkState(storedType == DataType.STRING,
"Json index is currently only supported on STRING columns");
JsonIndexCreator jsonIndexCreator =
segmentCreationSpec.isOnHeap() ? new OnHeapJsonIndexCreator(_indexDir, columnName)
: new OffHeapJsonIndexCreator(_indexDir, columnName);
Expand All @@ -286,8 +283,8 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio

H3IndexConfig h3IndexConfig = h3IndexConfigs.get(columnName);
if (h3IndexConfig != null) {
Preconditions
.checkState(fieldSpec.isSingleValueField(), "H3 index is currently only supported on single-value columns");
Preconditions.checkState(fieldSpec.isSingleValueField(),
"H3 index is currently only supported on single-value columns");
Preconditions.checkState(storedType == DataType.BYTES, "H3 index is currently only supported on BYTES columns");
H3IndexResolution resolution = h3IndexConfig.getResolution();
GeoSpatialIndexCreator h3IndexCreator =
Expand All @@ -308,8 +305,8 @@ public static boolean shouldDeriveNumDocsPerChunk(String columnName,
Map<String, Map<String, String>> columnProperties) {
if (columnProperties != null) {
Map<String, String> properties = columnProperties.get(columnName);
return properties != null && Boolean
.parseBoolean(properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
return properties != null && Boolean.parseBoolean(
properties.get(FieldConfig.DERIVE_NUM_DOCS_PER_CHUNK_RAW_INDEX_KEY));
}
return false;
}
Expand Down Expand Up @@ -397,7 +394,22 @@ public void indexRow(GenericRow row)
// text-index
TextIndexCreator textIndexCreator = _textIndexCreatorMap.get(columnName);
if (textIndexCreator != null) {
textIndexCreator.add((String) columnValueToIndex);
if (fieldSpec.isSingleValueField()) {
textIndexCreator.add((String) columnValueToIndex);
} else {
Object[] values = (Object[]) columnValueToIndex;
int length = values.length;
if (values instanceof String[]) {
textIndexCreator.add((String[]) values, length);
} else {
String[] strings = new String[length];
for (int i = 0; i < length; i++) {
strings[i] = (String) values[i];
}
textIndexCreator.add(strings, length);
columnValueToIndex = strings;
}
}
}

if (fieldSpec.isSingleValueField()) {
Expand Down Expand Up @@ -461,85 +473,77 @@ public void indexRow(GenericRow row)
//dictionary encoded
int[] dictIds = dictionaryCreator.indexOfMV(columnValueToIndex);
forwardIndexCreator.putDictIdMV(dictIds);
DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap
.get(columnName);
DictionaryBasedInvertedIndexCreator invertedIndexCreator = _invertedIndexCreatorMap.get(columnName);
if (invertedIndexCreator != null) {
invertedIndexCreator.add(dictIds, dictIds.length);
}
} else {
// for text index on raw columns, check the config to determine if actual raw value should
// be stored or not
if (textIndexCreator != null && !shouldStoreRawValueForTextIndex(columnName)) {
Object value = _columnProperties.get(columnName)
.get(FieldConfig.TEXT_INDEX_RAW_VALUE);
Object value = _columnProperties.get(columnName).get(FieldConfig.TEXT_INDEX_RAW_VALUE);
if (value == null) {
value = FieldConfig.TEXT_INDEX_DEFAULT_RAW_VALUE;
}
if (forwardIndexCreator.getValueType().getStoredType() == DataType.STRING) {
columnValueToIndex = new String[] {String.valueOf(value)};
columnValueToIndex = new String[]{String.valueOf(value)};
} else if (forwardIndexCreator.getValueType().getStoredType() == DataType.BYTES) {
columnValueToIndex = new byte[][] {String.valueOf(value).getBytes(UTF_8)};
columnValueToIndex = new byte[][]{String.valueOf(value).getBytes(UTF_8)};
} else {
throw new RuntimeException("Text Index is only supported for STRING and BYTES stored type");
}
}
Object[] values = (Object[]) columnValueToIndex;
int length = values.length;
switch (forwardIndexCreator.getValueType()) {
case INT:
if (columnValueToIndex instanceof Object[]) {
int[] array = new int[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Integer) ((Object[]) columnValueToIndex)[i];
}
forwardIndexCreator.putIntMV(array);
int[] ints = new int[length];
for (int i = 0; i < length; i++) {
ints[i] = (Integer) values[i];
}
forwardIndexCreator.putIntMV(ints);
break;
case LONG:
if (columnValueToIndex instanceof Object[]) {
long[] array = new long[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Long) ((Object[]) columnValueToIndex)[i];
}
forwardIndexCreator.putLongMV(array);
long[] longs = new long[length];
for (int i = 0; i < length; i++) {
longs[i] = (Long) values[i];
}
forwardIndexCreator.putLongMV(longs);
break;
case FLOAT:
if (columnValueToIndex instanceof Object[]) {
float[] array = new float[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Float) ((Object[]) columnValueToIndex)[i];
}
forwardIndexCreator.putFloatMV(array);
float[] floats = new float[length];
for (int i = 0; i < length; i++) {
floats[i] = (Float) values[i];
}
forwardIndexCreator.putFloatMV(floats);
break;
case DOUBLE:
if (columnValueToIndex instanceof Object[]) {
double[] array = new double[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (Double) ((Object[]) columnValueToIndex)[i];
}
forwardIndexCreator.putDoubleMV(array);
double[] doubles = new double[length];
for (int i = 0; i < length; i++) {
doubles[i] = (Double) values[i];
}
forwardIndexCreator.putDoubleMV(doubles);
break;
case STRING:
if (columnValueToIndex instanceof String[]) {
forwardIndexCreator.putStringMV((String[]) columnValueToIndex);
} else if (columnValueToIndex instanceof Object[]) {
String[] array = new String[((Object[]) columnValueToIndex).length];
for (int i = 0; i < array.length; i++) {
array[i] = (String) ((Object[]) columnValueToIndex)[i];
if (values instanceof String[]) {
forwardIndexCreator.putStringMV((String[]) values);
} else {
String[] strings = new String[length];
for (int i = 0; i < length; i++) {
strings[i] = (String) values[i];
}
forwardIndexCreator.putStringMV(array);
forwardIndexCreator.putStringMV(strings);
}
break;
case BYTES:
if (columnValueToIndex instanceof byte[][]) {
forwardIndexCreator.putBytesMV((byte[][]) columnValueToIndex);
} else if (columnValueToIndex instanceof Object[]) {
byte[][] array = new byte[((Object[]) columnValueToIndex).length][];
for (int i = 0; i < array.length; i++) {
array[i] = (byte[]) ((Object[]) columnValueToIndex)[i];
if (values instanceof byte[][]) {
forwardIndexCreator.putBytesMV((byte[][]) values);
} else {
byte[][] bytesArray = new byte[length][];
for (int i = 0; i < length; i++) {
bytesArray[i] = (byte[]) values[i];
}
forwardIndexCreator.putBytesMV(array);
forwardIndexCreator.putBytesMV(bytesArray);
}
break;
default:
Expand Down Expand Up @@ -740,8 +744,8 @@ public static void addColumnMetadataInfo(PropertiesConfiguration properties, Str
String.valueOf(columnIndexCreationInfo.getMaxNumberOfMultiValueElements()));
properties.setProperty(getKeyFor(column, TOTAL_NUMBER_OF_ENTRIES),
String.valueOf(columnIndexCreationInfo.getTotalNumberOfEntries()));
properties
.setProperty(getKeyFor(column, IS_AUTO_GENERATED), String.valueOf(columnIndexCreationInfo.isAutoGenerated()));
properties.setProperty(getKeyFor(column, IS_AUTO_GENERATED),
String.valueOf(columnIndexCreationInfo.isAutoGenerated()));

PartitionFunction partitionFunction = columnIndexCreationInfo.getPartitionFunction();
if (partitionFunction != null) {
Expand Down Expand Up @@ -871,17 +875,15 @@ public static ForwardIndexCreator getRawIndexCreatorForMVColumn(File file, Chunk
return new MultiValueVarByteRawIndexCreator(file, compressionType, column, totalDocs, dataType, writerVersion,
maxRowLengthInBytes, maxNumberOfMultiValueElements);
default:
throw new UnsupportedOperationException(
"Data type not supported for raw indexing: " + dataType);
throw new UnsupportedOperationException("Data type not supported for raw indexing: " + dataType);
}
}

@Override
public void close()
throws IOException {
FileUtils.close(Iterables
.concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(), _invertedIndexCreatorMap.values(),
_textIndexCreatorMap.values(), _fstIndexCreatorMap.values(), _jsonIndexCreatorMap.values(),
_h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
FileUtils.close(Iterables.concat(_dictionaryCreatorMap.values(), _forwardIndexCreatorMap.values(),
_invertedIndexCreatorMap.values(), _textIndexCreatorMap.values(), _fstIndexCreatorMap.values(),
_jsonIndexCreatorMap.values(), _h3IndexCreatorMap.values(), _nullValueVectorCreatorMap.values()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ public void add(String document) {
}
}

@Override
public void add(String[] documents, int length) {
throw new UnsupportedOperationException("Multiple values not supported");
}

@Override
public void seal()
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,26 @@ public void add(String document) {
}
}

@Override
public void add(String[] documents, int length) {
Document docToIndex = new Document();

// Whenever multiple fields with the same name appear in one document, both the
// inverted index and term vectors will logically append the tokens of the
// field to one another, in the order the fields were added.
for (int i = 0; i < length; i++) {
docToIndex.add(new TextField(_textColumn, documents[i], Field.Store.NO));
}
docToIndex.add(new StoredField(LUCENE_INDEX_DOC_ID_COLUMN_NAME, _nextDocId++));

try {
_indexWriter.addDocument(docToIndex);
} catch (Exception e) {
throw new RuntimeException(
"Caught exception while adding a new document to the Lucene index for column: " + _textColumn, e);
}
}

@Override
public void seal() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import org.apache.pinot.segment.local.segment.index.readers.BaseImmutableDictionary;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedBitSVForwardIndexReaderV2;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.FixedByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkMVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.segment.spi.ColumnMetadata;
Expand Down Expand Up @@ -70,9 +72,14 @@ public static ForwardIndexReader<?> getForwardIndexReader(SegmentDirectory.Reade
columnMetadata.getTotalNumberOfEntries(), columnMetadata.getBitsPerElement());
}
} else {
DataType dataType = columnMetadata.getDataType();
return dataType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, dataType)
: new VarByteChunkSVForwardIndexReader(dataBuffer, dataType);
DataType storedType = columnMetadata.getDataType().getStoredType();
if (columnMetadata.isSingleValue()) {
return storedType.isFixedWidth() ? new FixedByteChunkSVForwardIndexReader(dataBuffer, storedType)
: new VarByteChunkSVForwardIndexReader(dataBuffer, storedType);
} else {
return storedType.isFixedWidth() ? new FixedByteChunkMVForwardIndexReader(dataBuffer, storedType)
: new VarByteChunkMVForwardIndexReader(dataBuffer, storedType);
}
}
}

Expand Down
Loading

0 comments on commit 5becf5b

Please sign in to comment.