Skip to content

Commit

Permalink
immutable map index integration (qdrant#2524)
Browse files Browse the repository at this point in the history
* immutable map index integration

* remove wipe

* fix unit tests

* get appendable flag from config

* minor refactoring

* fix chunked mmap appendable flag

---------

Co-authored-by: generall <[email protected]>
  • Loading branch information
IvanPleshkov and generall committed Sep 4, 2023
1 parent 68abd89 commit 8ef5152
Show file tree
Hide file tree
Showing 19 changed files with 54 additions and 86 deletions.
3 changes: 2 additions & 1 deletion lib/segment/benches/boolean_filtering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ pub fn keyword_index_boolean_query_points(c: &mut Criterion) {
));
let id_tracker = Arc::new(AtomicRefCell::new(FixtureIdTracker::new(NUM_POINTS)));

let mut index = StructPayloadIndex::open(payload_storage, id_tracker, dir.path()).unwrap();
let mut index =
StructPayloadIndex::open(payload_storage, id_tracker, dir.path(), true).unwrap();

index
.set_indexed(BOOL_KEY, PayloadSchemaType::Keyword.into())
Expand Down
4 changes: 0 additions & 4 deletions lib/segment/src/fixtures/index_fixtures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,6 @@ impl<TMetric: Metric> VectorStorage for TestRawScorerProducer<TMetric> {
fn deleted_vector_bitslice(&self) -> &BitSlice {
&self.deleted_vectors
}

fn is_appendable(&self) -> bool {
true
}
}

impl<TMetric> TestRawScorerProducer<TMetric>
Expand Down
2 changes: 1 addition & 1 deletion lib/segment/src/fixtures/payload_context_fixture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ pub fn create_struct_payload_index(
));
let id_tracker = Arc::new(AtomicRefCell::new(FixtureIdTracker::new(num_points)));

let mut index = StructPayloadIndex::open(payload_storage, id_tracker, path).unwrap();
let mut index = StructPayloadIndex::open(payload_storage, id_tracker, path, true).unwrap();

index
.set_indexed(STR_KEY, PayloadSchemaType::Keyword.into())
Expand Down
9 changes: 7 additions & 2 deletions lib/segment/src/index/field_index/index_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,19 @@ pub fn index_selector(
field: &str,
payload_schema: &PayloadFieldSchema,
db: Arc<RwLock<DB>>,
is_appendable: bool,
) -> Vec<FieldIndex> {
match payload_schema {
PayloadFieldSchema::FieldType(payload_type) => match payload_type {
PayloadSchemaType::Keyword => {
vec![FieldIndex::KeywordIndex(MapIndex::new(db, field))]
vec![FieldIndex::KeywordIndex(MapIndex::new(
db,
field,
is_appendable,
))]
}
PayloadSchemaType::Integer => vec![
FieldIndex::IntMapIndex(MapIndex::new(db.clone(), field)),
FieldIndex::IntMapIndex(MapIndex::new(db.clone(), field, is_appendable)),
FieldIndex::IntIndex(NumericIndex::<IntPayloadType>::new(db, field)),
],
PayloadSchemaType::Float => {
Expand Down
14 changes: 10 additions & 4 deletions lib/segment/src/index/field_index/map_index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,12 @@ pub enum MapIndex<N: Hash + Eq + Clone + Display + FromStr> {
}

impl<N: Hash + Eq + Clone + Display + FromStr + Default> MapIndex<N> {
pub fn new(db: Arc<RwLock<DB>>, field_name: &str) -> Self {
MapIndex::Mutable(MutableMapIndex::new(db, field_name))
/// Creates a map index for `field_name` backed by `db`.
///
/// Returns the `Mutable` variant when `is_appendable` is `true`, and the
/// `Immutable` variant otherwise.
pub fn new(db: Arc<RwLock<DB>>, field_name: &str, is_appendable: bool) -> Self {
if is_appendable {
MapIndex::Mutable(MutableMapIndex::new(db, field_name))
} else {
MapIndex::Immutable(ImmutableMapIndex::new(db, field_name))
}
}

fn get_db_wrapper(&self) -> &DatabaseColumnWrapper {
Expand Down Expand Up @@ -551,7 +555,8 @@ mod tests {
data: &[Vec<N>],
path: &Path,
) {
let mut index = MapIndex::<N>::new(open_db_with_existing_cf(path).unwrap(), FIELD_NAME);
let mut index =
MapIndex::<N>::new(open_db_with_existing_cf(path).unwrap(), FIELD_NAME, true);
index.recreate().unwrap();
for (idx, values) in data.iter().enumerate() {
match &mut index {
Expand All @@ -568,7 +573,8 @@ mod tests {
data: &[Vec<N>],
path: &Path,
) {
let mut index = MapIndex::<N>::new(open_db_with_existing_cf(path).unwrap(), FIELD_NAME);
let mut index =
MapIndex::<N>::new(open_db_with_existing_cf(path).unwrap(), FIELD_NAME, true);
index.load_from_db().unwrap();
for (idx, values) in data.iter().enumerate() {
let index_values: HashSet<N> = HashSet::from_iter(
Expand Down
4 changes: 0 additions & 4 deletions lib/segment/src/index/hnsw_index/hnsw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,8 +688,4 @@ impl<TGraphLinks: GraphLinks> VectorIndex for HNSWIndex<TGraphLinks> {
.or_else(|| self.graph.as_ref().map(|graph| graph.num_points()))
.unwrap_or(0)
}

fn is_appendable(&self) -> bool {
false
}
}
3 changes: 0 additions & 3 deletions lib/segment/src/index/payload_index_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,6 @@ pub trait PayloadIndex {
/// Drop all payload of the point
fn drop(&mut self, point_id: PointOffsetType) -> OperationResult<Option<Payload>>;

/// Completely drop payload. Pufff!
fn wipe(&mut self) -> OperationResult<()>;

/// Return function that forces persistence of current storage state.
fn flusher(&self) -> Flusher;

Expand Down
8 changes: 0 additions & 8 deletions lib/segment/src/index/plain_payload_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,10 +175,6 @@ impl PayloadIndex for PlainPayloadIndex {
unreachable!()
}

fn wipe(&mut self) -> OperationResult<()> {
unreachable!()
}

fn flusher(&self) -> Flusher {
unreachable!()
}
Expand Down Expand Up @@ -296,10 +292,6 @@ impl VectorIndex for PlainIndex {
fn indexed_vector_count(&self) -> usize {
0
}

fn is_appendable(&self) -> bool {
true
}
}

pub struct PlainFilterContext<'a> {
Expand Down
23 changes: 8 additions & 15 deletions lib/segment/src/index/struct_payload_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,11 +102,11 @@ impl StructPayloadIndex {
self.config.save(&config_path)
}

fn load_all_fields(&mut self) -> OperationResult<()> {
fn load_all_fields(&mut self, is_appendable: bool) -> OperationResult<()> {
let mut field_indexes: IndexesMap = Default::default();

for (field, payload_schema) in &self.config.indexed_fields {
let field_index = self.load_from_db(field, payload_schema.to_owned())?;
let field_index = self.load_from_db(field, payload_schema.to_owned(), is_appendable)?;
field_indexes.insert(field.clone(), field_index);
}
self.field_indexes = field_indexes;
Expand All @@ -117,8 +117,9 @@ impl StructPayloadIndex {
&self,
field: PayloadKeyTypeRef,
payload_schema: PayloadFieldSchema,
is_appendable: bool,
) -> OperationResult<Vec<FieldIndex>> {
let mut indexes = index_selector(field, &payload_schema, self.db.clone());
let mut indexes = index_selector(field, &payload_schema, self.db.clone(), is_appendable);

let mut is_loaded = true;
for ref mut index in indexes.iter_mut() {
Expand All @@ -129,6 +130,7 @@ impl StructPayloadIndex {
}
if !is_loaded {
debug!("Index for `{field}` was not loaded. Building...");
// todo(ivan): decide what to do with indexes that were not loaded
indexes = self.build_field_indexes(field, payload_schema)?;
}

Expand All @@ -139,6 +141,7 @@ impl StructPayloadIndex {
payload: Arc<AtomicRefCell<PayloadStorageEnum>>,
id_tracker: Arc<AtomicRefCell<IdTrackerSS>>,
path: &Path,
is_appendable: bool,
) -> OperationResult<Self> {
create_dir_all(path)?;
let config_path = PayloadConfig::get_config_path(path);
Expand Down Expand Up @@ -166,7 +169,7 @@ impl StructPayloadIndex {
index.save_config()?;
}

index.load_all_fields()?;
index.load_all_fields(is_appendable)?;

Ok(index)
}
Expand All @@ -177,7 +180,7 @@ impl StructPayloadIndex {
payload_schema: PayloadFieldSchema,
) -> OperationResult<Vec<FieldIndex>> {
let payload_storage = self.payload.borrow();
let mut field_indexes = index_selector(field, &payload_schema, self.db.clone());
let mut field_indexes = index_selector(field, &payload_schema, self.db.clone(), true);
for index in &field_indexes {
index.recreate()?;
}
Expand Down Expand Up @@ -513,16 +516,6 @@ impl PayloadIndex for StructPayloadIndex {
self.payload.borrow_mut().drop(point_id)
}

fn wipe(&mut self) -> OperationResult<()> {
self.payload.borrow_mut().wipe()?;
for (_, field_indexes) in self.field_indexes.iter_mut() {
for index in field_indexes.drain(..) {
index.clear()?;
}
}
self.load_all_fields()
}

fn flusher(&self) -> Flusher {
let mut flushers = Vec::new();
for field_indexes in self.field_indexes.values() {
Expand Down
11 changes: 0 additions & 11 deletions lib/segment/src/index/vector_index_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ pub trait VectorIndex {

/// The number of indexed vectors, currently accessible
fn indexed_vector_count(&self) -> usize;

/// Whether this vector index type supports appending.
fn is_appendable(&self) -> bool;
}

pub enum VectorIndexEnum {
Expand Down Expand Up @@ -103,12 +100,4 @@ impl VectorIndex for VectorIndexEnum {
Self::HnswMmap(index) => index.indexed_vector_count(),
}
}

fn is_appendable(&self) -> bool {
match self {
Self::Plain(index) => index.is_appendable(),
Self::HnswRam(index) => index.is_appendable(),
Self::HnswMmap(index) => index.is_appendable(),
}
}
}
7 changes: 0 additions & 7 deletions lib/segment/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,6 @@ pub struct VectorData {
}

impl VectorData {
/// Whether this vector data can be appended to
///
/// This requires an index and storage type that both support appending.
pub fn is_appendable(&self) -> bool {
self.vector_index.borrow().is_appendable() && self.vector_storage.borrow().is_appendable()
}

pub fn prefault_mmap_pages(&self) -> impl Iterator<Item = mmap_ops::PrefaultMmapPages> {
let index_task = match &*self.vector_index.borrow() {
VectorIndexEnum::HnswMmap(index) => index.prefault_mmap_pages(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,17 @@ fn create_segment(

let id_tracker = sp(SimpleIdTracker::open(database.clone())?);

let appendable_flag = config
.vector_data
.values()
.all(|vector_config| vector_config.is_appendable());

let payload_index_path = segment_path.join(PAYLOAD_INDEX_PATH);
let payload_index: Arc<AtomicRefCell<StructPayloadIndex>> = sp(StructPayloadIndex::open(
payload_storage,
id_tracker.clone(),
&payload_index_path,
appendable_flag,
)?);

let mut vector_data = HashMap::new();
Expand Down Expand Up @@ -177,7 +183,6 @@ fn create_segment(
} else {
SegmentType::Plain
};
let appendable_flag = vector_data.values().all(VectorData::is_appendable);

Ok(Segment {
version,
Expand Down
18 changes: 18 additions & 0 deletions lib/segment/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,24 @@ pub struct VectorDataConfig {
pub quantization_config: Option<QuantizationConfig>,
}

impl VectorDataConfig {
    /// Reports whether vector data described by this config accepts appends.
    ///
    /// Both the configured index and the configured storage type have to
    /// support appending; if either one does not, the result is `false`.
    pub fn is_appendable(&self) -> bool {
        // Index check first: a non-appendable index settles the answer.
        // The match stays exhaustive so a new `Indexes` variant must be
        // classified explicitly.
        match self.index {
            Indexes::Plain {} => {}
            Indexes::Hnsw(_) => return false,
        }

        // The index is appendable, so the storage type decides the result.
        match self.storage_type {
            VectorStorageType::Memory | VectorStorageType::ChunkedMmap => true,
            VectorStorageType::Mmap => false,
        }
    }
}

/// Default value based on <https://github.com/google-research/google-research/blob/master/scann/docs/algorithms.md>
pub const DEFAULT_FULL_SCAN_THRESHOLD: usize = 20_000;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,4 @@ impl VectorStorage for AppendableMmapVectorStorage {
fn deleted_vector_bitslice(&self) -> &BitSlice {
self.deleted.get_bitslice()
}

fn is_appendable(&self) -> bool {
true
}
}
4 changes: 0 additions & 4 deletions lib/segment/src/vector_storage/memmap_vector_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,6 @@ impl VectorStorage for MemmapVectorStorage {
fn deleted_vector_bitslice(&self) -> &BitSlice {
self.mmap_store.as_ref().unwrap().deleted_vector_bitslice()
}

fn is_appendable(&self) -> bool {
false
}
}

/// Open a file shortly for appending
Expand Down
4 changes: 0 additions & 4 deletions lib/segment/src/vector_storage/simple_vector_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,6 @@ impl VectorStorage for SimpleVectorStorage {
fn deleted_vector_bitslice(&self) -> &BitSlice {
self.deleted.as_bitslice()
}

fn is_appendable(&self) -> bool {
true
}
}

/// Set deleted state in given bitvec.
Expand Down
11 changes: 0 additions & 11 deletions lib/segment/src/vector_storage/vector_storage_base.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ pub trait VectorStorage {
/// The size of this slice is not guaranteed. It may be smaller/larger than the number of
/// vectors in this segment.
fn deleted_vector_bitslice(&self) -> &BitSlice;

/// Whether this vector storage type supports appending.
fn is_appendable(&self) -> bool;
}

pub enum VectorStorageEnum {
Expand Down Expand Up @@ -275,12 +272,4 @@ impl VectorStorage for VectorStorageEnum {
VectorStorageEnum::AppendableMemmap(v) => v.deleted_vector_bitslice(),
}
}

fn is_appendable(&self) -> bool {
match self {
VectorStorageEnum::Simple(v) => v.is_appendable(),
VectorStorageEnum::Memmap(v) => v.is_appendable(),
VectorStorageEnum::AppendableMemmap(v) => v.is_appendable(),
}
}
}
2 changes: 1 addition & 1 deletion lib/segment/tests/integration/nested_filtering_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ fn test_filtering_context_consistency() {
let id_tracker = Arc::new(AtomicRefCell::new(FixtureIdTracker::new(NUM_POINTS)));

let mut index =
StructPayloadIndex::open(wrapped_payload_storage, id_tracker, dir.path()).unwrap();
StructPayloadIndex::open(wrapped_payload_storage, id_tracker, dir.path(), true).unwrap();

index
.set_indexed("f", PayloadSchemaType::Integer.into())
Expand Down
2 changes: 1 addition & 1 deletion lib/segment/tests/integration/payload_index_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -764,7 +764,7 @@ fn test_update_payload_index_type() {
let id_tracker = Arc::new(AtomicRefCell::new(FixtureIdTracker::new(point_num)));

let mut index =
StructPayloadIndex::open(wrapped_payload_storage, id_tracker, dir.path()).unwrap();
StructPayloadIndex::open(wrapped_payload_storage, id_tracker, dir.path(), true).unwrap();

// set field to Integer type
index.set_indexed("field", Integer.into()).unwrap();
Expand Down

0 comments on commit 8ef5152

Please sign in to comment.