Skip to content

Commit

Permalink
Support Native FST As An Index Subtype for FST Indices (apache#7729)
Browse files Browse the repository at this point in the history
This PR introduces the notion of subtypes to FST index -- allowing users to set a segment level flag indicating whether the index should be built using native FST or Lucene FST.
  • Loading branch information
atris authored Nov 12, 2021
1 parent 068549c commit 781f5cd
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.response.broker.AggregationResult;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
Expand All @@ -44,6 +45,7 @@
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
Expand All @@ -69,8 +71,6 @@ public class FSTBasedRegexpLikeQueriesTest extends BaseQueriesTest {
private static final Integer INT_BASE_VALUE = 1000;
private static final Integer NUM_ROWS = 1024;

private final List<GenericRow> _rows = new ArrayList<>();

private IndexSegment _indexSegment;
private List<IndexSegment> _indexSegments;

Expand All @@ -94,19 +94,25 @@ public void setUp()
throws Exception {
FileUtils.deleteQuietly(INDEX_DIR);

buildSegment();
IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
Set<String> fstIndexCols = new HashSet<>();
fstIndexCols.add(DOMAIN_NAMES_COL);
indexLoadingConfig.setFSTIndexColumns(fstIndexCols);

Set<String> invertedIndexCols = new HashSet<>();
invertedIndexCols.add(DOMAIN_NAMES_COL);
indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
ImmutableSegment immutableSegment =
ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
_indexSegment = immutableSegment;
_indexSegments = Arrays.asList(immutableSegment, immutableSegment);
List<IndexSegment> segments = new ArrayList<>();
for (FSTType fstType : Arrays.asList(FSTType.LUCENE, FSTType.NATIVE)) {
buildSegment(fstType);

IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
Set<String> fstIndexCols = new HashSet<>();
fstIndexCols.add(DOMAIN_NAMES_COL);
indexLoadingConfig.setFSTIndexColumns(fstIndexCols);
indexLoadingConfig.setFSTIndexType(fstType);
Set<String> invertedIndexCols = new HashSet<>();
invertedIndexCols.add(DOMAIN_NAMES_COL);
indexLoadingConfig.setInvertedIndexColumns(invertedIndexCols);
ImmutableSegment segment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);

segments.add(segment);
}

_indexSegment = segments.get(ThreadLocalRandom.current().nextInt(2));
_indexSegments = segments;
}

@AfterClass
Expand Down Expand Up @@ -151,7 +157,7 @@ private List<GenericRow> createTestData(int numRows)
return rows;
}

private void buildSegment()
private void buildSegment(FSTType fstType)
throws Exception {
List<GenericRow> rows = createTestData(NUM_ROWS);
List<FieldConfig> fieldConfigs = new ArrayList<>();
Expand All @@ -171,6 +177,7 @@ private void buildSegment()
config.setOutDir(INDEX_DIR.getPath());
config.setTableName(TABLE_NAME);
config.setSegmentName(SEGMENT_NAME);
config.setFSTIndexType(fstType);

SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
try (RecordReader recordReader = new GenericRowRecordReader(rows)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.pinot.segment.local.segment.creator.impl.nullvalue.NullValueVectorCreator;
import org.apache.pinot.segment.local.segment.creator.impl.text.LuceneTextIndexCreator;
import org.apache.pinot.segment.local.utils.GeometrySerializer;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.V1Constants;
import org.apache.pinot.segment.spi.compression.ChunkCompressionType;
import org.apache.pinot.segment.spi.creator.ColumnIndexCreationInfo;
Expand All @@ -66,6 +67,7 @@
import org.apache.pinot.segment.spi.index.creator.TextIndexType;
import org.apache.pinot.segment.spi.index.reader.H3IndexResolution;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.DateTimeFormatSpec;
Expand Down Expand Up @@ -266,8 +268,15 @@ public void init(SegmentGeneratorConfig segmentCreationSpec, SegmentIndexCreatio
"FST index is currently only supported on STRING type columns");
Preconditions.checkState(dictEnabledColumn,
"FST index is currently only supported on dictionary-encoded columns");
_fstIndexCreatorMap.put(columnName, new LuceneFSTIndexCreator(_indexDir, columnName,
(String[]) indexCreationInfo.getSortedUniqueElementsArray()));
String[] sortedValues = (String[]) indexCreationInfo.getSortedUniqueElementsArray();
TextIndexCreator textIndexCreator;
if (_config.getFSTIndexType() == FSTType.NATIVE) {
textIndexCreator = new NativeFSTIndexCreator(_indexDir, columnName, sortedValues);
} else {
textIndexCreator = new LuceneFSTIndexCreator(_indexDir, columnName, sortedValues);
}

_fstIndexCreatorMap.put(columnName, textIndexCreator);
}

if (jsonIndexColumns.contains(columnName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@
import org.apache.pinot.segment.local.segment.index.readers.json.ImmutableJsonIndexReader;
import org.apache.pinot.segment.local.segment.index.readers.sorted.SortedIndexReaderImpl;
import org.apache.pinot.segment.local.segment.index.readers.text.LuceneTextIndexReader;
import org.apache.pinot.segment.local.utils.nativefst.FSTHeader;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.index.column.ColumnIndexContainer;
import org.apache.pinot.segment.spi.index.reader.BloomFilterReader;
Expand Down Expand Up @@ -176,7 +178,13 @@ public PhysicalColumnIndexContainer(SegmentDirectory.Reader segmentReader, Colum
}

if (loadFSTIndex) {
_fstIndex = new LuceneFSTIndexReader(segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX));
PinotDataBuffer buffer = segmentReader.getIndexFor(columnName, ColumnIndexType.FST_INDEX);
int magicHeader = buffer.getInt(0);
if (magicHeader == FSTHeader.FST_MAGIC) {
_fstIndex = new NativeFSTIndexReader(buffer);
} else {
_fstIndex = new LuceneFSTIndexReader(buffer);
}
} else {
_fstIndex = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import java.io.File;
import org.apache.pinot.segment.local.segment.index.loader.bloomfilter.BloomFilterHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.FSTIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.H3IndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.InvertedIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.JsonIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.LuceneFSTIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.RangeIndexHandler;
import org.apache.pinot.segment.local.segment.index.loader.invertedindex.TextIndexHandler;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
Expand All @@ -48,7 +48,8 @@ public static IndexHandler getIndexHandler(ColumnIndexType type, File indexDir,
case TEXT_INDEX:
return new TextIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
case FST_INDEX:
return new LuceneFSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
return new FSTIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter,
indexLoadingConfig.getFSTIndexType());
case JSON_INDEX:
return new JsonIndexHandler(indexDir, segmentMetadata, indexLoadingConfig, segmentWriter);
case H3_INDEX:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
import org.apache.pinot.spi.config.table.BloomFilterConfig;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
Expand All @@ -57,6 +58,7 @@ public class IndexLoadingConfig {
private int _rangeIndexVersion = IndexingConfig.DEFAULT_RANGE_INDEX_VERSION;
private Set<String> _textIndexColumns = new HashSet<>();
private Set<String> _fstIndexColumns = new HashSet<>();
private FSTType _fstIndexType = FSTType.LUCENE;
private Set<String> _jsonIndexColumns = new HashSet<>();
private Map<String, H3IndexConfig> _h3IndexConfigs = new HashMap<>();
private Set<String> _noDictionaryColumns = new HashSet<>(); // TODO: replace this by _noDictionaryConfig.
Expand Down Expand Up @@ -107,7 +109,6 @@ private void extractFromTableConfig(TableConfig tableConfig) {
if (invertedIndexColumns != null) {
_invertedIndexColumns.addAll(invertedIndexColumns);
}
_rangeIndexVersion = indexingConfig.getRangeIndexVersion();

List<String> jsonIndexColumns = indexingConfig.getJsonIndexColumns();
if (jsonIndexColumns != null) {
Expand All @@ -119,6 +120,10 @@ private void extractFromTableConfig(TableConfig tableConfig) {
_rangeIndexColumns.addAll(rangeIndexColumns);
}

_rangeIndexVersion = indexingConfig.getRangeIndexVersion();

_fstIndexType = indexingConfig.getFSTIndexType();

List<String> bloomFilterColumns = indexingConfig.getBloomFilterColumns();
if (bloomFilterColumns != null) {
for (String bloomFilterColumn : bloomFilterColumns) {
Expand Down Expand Up @@ -285,6 +290,10 @@ public int getRangeIndexVersion() {
return _rangeIndexVersion;
}

public FSTType getFSTIndexType() {
return _fstIndexType;
}

/**
* Used in two places:
* (1) In {@link PhysicalColumnIndexContainer} to create the index loading info for immutable segments
Expand Down Expand Up @@ -349,6 +358,11 @@ public void setFSTIndexColumns(Set<String> fstIndexColumns) {
_fstIndexColumns = fstIndexColumns;
}

@VisibleForTesting
public void setFSTIndexType(FSTType fstType) {
_fstIndexType = fstType;
}

@VisibleForTesting
public void setJsonIndexColumns(Set<String> jsonIndexColumns) {
_jsonIndexColumns = jsonIndexColumns;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,15 @@
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.segment.index.loader.LoaderUtils;
import org.apache.pinot.segment.local.segment.index.loader.SegmentPreProcessor;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.creator.TextIndexCreator;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
import org.apache.pinot.segment.spi.store.ColumnIndexType;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.FSTType;
import org.apache.pinot.spi.data.FieldSpec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -60,20 +63,22 @@
* added column. In this case, the default column handler would have taken care of adding
* dictionary for the new column. Read the dictionary to create FST index.
*/
public class LuceneFSTIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(LuceneFSTIndexHandler.class);
public class FSTIndexHandler implements IndexHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(FSTIndexHandler.class);

private final File _indexDir;
private final SegmentMetadata _segmentMetadata;
private final SegmentDirectory.Writer _segmentWriter;
private final Set<String> _columnsToAddIdx;
private final FSTType _fstType;

public LuceneFSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
SegmentDirectory.Writer segmentWriter) {
public FSTIndexHandler(File indexDir, SegmentMetadata segmentMetadata, IndexLoadingConfig indexLoadingConfig,
SegmentDirectory.Writer segmentWriter, FSTType fstType) {
_indexDir = indexDir;
_segmentMetadata = segmentMetadata;
_segmentWriter = segmentWriter;
_columnsToAddIdx = new HashSet<>(indexLoadingConfig.getFSTIndexColumns());
_fstType = fstType;
}

@Override
Expand Down Expand Up @@ -130,13 +135,20 @@ private void createFSTIndexForColumn(ColumnMetadata columnMetadata)

LOGGER.info("Creating new FST index for column: {} in segment: {}, cardinality: {}", column, segmentName,
columnMetadata.getCardinality());
LuceneFSTIndexCreator luceneFSTIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);

TextIndexCreator fstIndexCreator;
if (_fstType == FSTType.LUCENE) {
fstIndexCreator = new LuceneFSTIndexCreator(_indexDir, column, null);
} else {
fstIndexCreator = new NativeFSTIndexCreator(_indexDir, column, null);
}

try (Dictionary dictionary = LoaderUtils.getDictionary(_segmentWriter, columnMetadata)) {
for (int dictId = 0; dictId < dictionary.length(); dictId++) {
luceneFSTIndexCreator.add(dictionary.getStringValue(dictId));
fstIndexCreator.add(dictionary.getStringValue(dictId));
}
}
luceneFSTIndexCreator.seal();
fstIndexCreator.seal();

// For v3, write the generated range index file into the single file and remove it.
if (_segmentMetadata.getVersion() == SegmentVersion.v3) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public final class FSTHeader {
/**
* FST magic (4 bytes).
*/
final static int FST_MAGIC = ('\\' << 24) | ('f' << 16) | ('s' << 8) | ('a');
public static final int FST_MAGIC = ('\\' << 24) | ('f' << 16) | ('s' << 8) | 'a';

/** FST version number. */
final byte _version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class NativeFSTIndexCreator implements TextIndexCreator {
* @throws IOException
*/
public NativeFSTIndexCreator(File indexDir, String columnName, String[] sortedEntries) {
_fstIndexFile = new File(indexDir, columnName + V1Constants.Indexes.NATIVE_FST_INDEX_FILE_EXTENSION);
_fstIndexFile = new File(indexDir, columnName + V1Constants.Indexes.FST_INDEX_FILE_EXTENSION);

_fstBuilder = new FSTBuilder();
_dictId = 0;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.segment.local.segment.index.creator;

import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexCreator;
import org.apache.pinot.segment.local.utils.nativefst.NativeFSTIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import static org.apache.pinot.segment.spi.V1Constants.Indexes.FST_INDEX_FILE_EXTENSION;


public class NativeFSTIndexCreatorTest {
private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "NativeFSTIndexCreatorTest");

@BeforeClass
public void setUp()
throws IOException {
FileUtils.forceMkdir(INDEX_DIR);
}

@AfterClass
public void tearDown()
throws IOException {
FileUtils.deleteDirectory(INDEX_DIR);
}

@Test
public void testIndexWriterReader()
throws IOException {
String[] uniqueValues = new String[3];
uniqueValues[0] = "hello-world";
uniqueValues[1] = "hello-world123";
uniqueValues[2] = "still";

try (NativeFSTIndexCreator creator = new NativeFSTIndexCreator(INDEX_DIR, "testFSTColumn", uniqueValues)) {
creator.seal();
}

File fstFile = new File(INDEX_DIR, "testFSTColumn" + FST_INDEX_FILE_EXTENSION);
try (PinotDataBuffer dataBuffer = PinotDataBuffer.mapReadOnlyBigEndianFile(fstFile);
NativeFSTIndexReader reader = new NativeFSTIndexReader(dataBuffer)) {

int[] matchedDictIds = reader.getDictIds("hello.*").toArray();
Assert.assertEquals(2, matchedDictIds.length);
Assert.assertEquals(0, matchedDictIds[0]);
Assert.assertEquals(1, matchedDictIds[1]);

matchedDictIds = reader.getDictIds(".*llo").toArray();
Assert.assertEquals(0, matchedDictIds.length);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ public static class Indexes {
public static final String BITMAP_INVERTED_INDEX_FILE_EXTENSION = ".bitmap.inv";
public static final String BITMAP_RANGE_INDEX_FILE_EXTENSION = ".bitmap.range";
public static final String FST_INDEX_FILE_EXTENSION = ".lucene.fst";
public static final String NATIVE_FST_INDEX_FILE_EXTENSION = ".native.fst";
public static final String JSON_INDEX_FILE_EXTENSION = ".json.idx";
public static final String H3_INDEX_FILE_EXTENSION = ".h3.idx";
public static final String BLOOM_FILTER_FILE_EXTENSION = ".bloom";
Expand Down
Loading

0 comments on commit 781f5cd

Please sign in to comment.