From eab0ac83baffa257d3ffa120253a2f6ec71362aa Mon Sep 17 00:00:00 2001 From: Andrey Vasnetsov Date: Tue, 8 Dec 2020 16:55:09 +0100 Subject: [PATCH] attempt to replace sled for storing vectors --- Cargo.lock | 2 + benches/service/collection_stress.js | 5 +- benches/service/run.sh | 2 +- lib/segment/Cargo.toml | 2 + lib/segment/src/id_mapper/simple_id_mapper.rs | 9 ++- .../payload_storage/simple_payload_storage.rs | 7 +- .../vector_storage/memmap_vector_storage.rs | 6 +- lib/segment/src/vector_storage/mod.rs | 1 + .../persisted_vector_storage.rs | 80 +++++++++++++++++++ .../vector_storage/simple_vector_storage.rs | 15 +++- lib/storage/src/content_manager/toc.rs | 38 +++++---- 11 files changed, 140 insertions(+), 27 deletions(-) create mode 100644 lib/segment/src/vector_storage/persisted_vector_storage.rs diff --git a/Cargo.lock b/Cargo.lock index 727640cd3fa..3044d4bc084 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2075,6 +2075,8 @@ dependencies = [ "atomic_refcell", "atomicwrites", "bincode", + "env_logger 0.7.1", + "log 0.4.11", "memmap 0.7.0", "ordered-float", "rmp-serde", diff --git a/benches/service/collection_stress.js b/benches/service/collection_stress.js index f9d5b711d48..1ed967924d4 100644 --- a/benches/service/collection_stress.js +++ b/benches/service/collection_stress.js @@ -5,7 +5,8 @@ let host = 'http://localhost:6333' let collection_name = 'stress_collection'; -let vector_length = 1000; +let vector_length = 128; +let vectors_per_batch = 32; var create_collection_payload = JSON.stringify({ "create_collection": { @@ -113,7 +114,7 @@ export default function () { var payload = JSON.stringify({ "upsert_points": { - "points": Array.from({ length: 64 }, () => generate_point()), + "points": Array.from({ length: vectors_per_batch }, () => generate_point()), } }); diff --git a/benches/service/run.sh b/benches/service/run.sh index 600685a0867..f4f53cb5b79 100644 --- a/benches/service/run.sh +++ b/benches/service/run.sh @@ -4,6 +4,6 @@ DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )" -docker run --network=host -i loadimpact/k6 run -u 10 -i 100 --rps 10 - <"$DIR/collection_stress.js" +docker run --network=host -i loadimpact/k6 run -u 10 -i 500000 - <"$DIR/collection_stress.js" diff --git a/lib/segment/Cargo.toml b/lib/segment/Cargo.toml index 903477c0a22..86dcdf40797 100644 --- a/lib/segment/Cargo.toml +++ b/lib/segment/Cargo.toml @@ -24,3 +24,5 @@ atomic_refcell = "0.1.6" atomicwrites = "0.2.5" memmap = "0.7.0" schemars = "0.8.0" +log = "0.4" +env_logger = "0.7.1" diff --git a/lib/segment/src/id_mapper/simple_id_mapper.rs b/lib/segment/src/id_mapper/simple_id_mapper.rs index 6969bb06d58..df5f299c358 100644 --- a/lib/segment/src/id_mapper/simple_id_mapper.rs +++ b/lib/segment/src/id_mapper/simple_id_mapper.rs @@ -2,10 +2,13 @@ use std::collections::HashMap; use crate::types::{PointOffsetType, PointIdType}; use crate::id_mapper::id_mapper::IdMapper; use crate::entry::entry_point::OperationResult; -use sled::Db; +use sled::{Db, Config}; use bincode; use std::path::Path; +/// Since sled is used for reading only during the initialization, large read cache is not required +const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb + pub struct SimpleIdMapper { internal_to_external: HashMap, external_to_internal: HashMap, @@ -15,7 +18,9 @@ pub struct SimpleIdMapper { impl SimpleIdMapper { pub fn open(path: &Path) -> Self { - let store = sled::open(path).unwrap(); + let store = Config::new() + .cache_capacity(SLED_CACHE_SIZE) + .path(path).open().unwrap(); let mut internal_to_external: HashMap = Default::default(); let mut external_to_internal: HashMap = Default::default(); diff --git a/lib/segment/src/payload_storage/simple_payload_storage.rs b/lib/segment/src/payload_storage/simple_payload_storage.rs index b680cb5f141..c951331433a 100644 --- a/lib/segment/src/payload_storage/simple_payload_storage.rs +++ b/lib/segment/src/payload_storage/simple_payload_storage.rs @@ -4,9 +4,12 @@ use crate::types::{PayloadKeyType, PayloadType, PointOffsetType, TheMap}; use std::collections::{HashMap}; use crate::entry::entry_point::OperationResult; -use sled::Db; +use sled::{Db, Config}; use std::path::Path; +/// Since sled is used for reading only during the initialization, large read cache is not required +const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb + pub struct SimplePayloadStorage { payload: HashMap>, store: Db, @@ -15,7 +18,7 @@ pub struct SimplePayloadStorage { impl SimplePayloadStorage { pub fn open(path: &Path) -> Self { - let store = sled::open(path).unwrap(); + let store = Config::new().cache_capacity(SLED_CACHE_SIZE).path(path).open().unwrap(); let mut payload_map: HashMap> = Default::default(); diff --git a/lib/segment/src/vector_storage/memmap_vector_storage.rs b/lib/segment/src/vector_storage/memmap_vector_storage.rs index 9797a5cc1ea..47424279aa7 100644 --- a/lib/segment/src/vector_storage/memmap_vector_storage.rs +++ b/lib/segment/src/vector_storage/memmap_vector_storage.rs @@ -209,8 +209,10 @@ impl VectorStorage for MemmapVectorStorage { let mmap = self.deleted_mmap.as_mut().unwrap(); let flag = mmap.get_mut(key + HEADER_SIZE).unwrap(); - *flag = 1; - self.deleted_count += 1; + if *flag == 0 { + *flag = 1; + self.deleted_count += 1; + } } Ok(()) } diff --git a/lib/segment/src/vector_storage/mod.rs b/lib/segment/src/vector_storage/mod.rs index c678b1d1b01..1d2ee0fad41 100644 --- a/lib/segment/src/vector_storage/mod.rs +++ b/lib/segment/src/vector_storage/mod.rs @@ -1,3 +1,4 @@ pub mod vector_storage; pub mod simple_vector_storage; pub mod memmap_vector_storage; +mod persisted_vector_storage; diff --git a/lib/segment/src/vector_storage/persisted_vector_storage.rs b/lib/segment/src/vector_storage/persisted_vector_storage.rs new file mode 100644 index 00000000000..5a42b32b116 --- /dev/null +++ b/lib/segment/src/vector_storage/persisted_vector_storage.rs @@ -0,0 +1,80 @@ +use std::fs::OpenOptions; +use std::path::{Path, PathBuf}; +use memmap::{Mmap, MmapMut}; + +use crate::entry::entry_point::OperationResult; +use std::intrinsics::size_of; +use std::ops; + + +const MAX_SEGMENT_SIZE: usize = 32 * 1024 * 1024; // 32 mb +const SEGMENT_MAGIC: &'static [u8; 4] = b"vect"; + +const SEGMENT_HEADER_SIZE: usize = size_of:: + SEGMENT_MAGIC.len(); + + + +/// Storage for fixed-length records +/// +struct Segment { + mmap: MmapMut, + path: PathBuf, + + record_size: usize, + /// Number of records which could be stored in total in this segment + capacity: usize, + /// Number of currently stored vectors + occupancy: usize, + unflashed: uszie +} + +impl Segment { + pub fn new(path: &Path, record_size: usize) -> OperationResult { + let max_records = MAX_SEGMENT_SIZE / record_size; + let storage_size = max_records * record_size; + + let file = OpenOptions::new().read(true).write(true).create(true).open(&path)?; + file.allocate(storage_size as u64)?; + + let mmap = unsafe { MmapMut::map_mut(&file)? }; + + Ok(Segment { + mmap, + path: path.to_owned(), + record_size, + capacity: max_records, + occupancy: 0, + unflashed: 0 + }) + } + + pub fn append(&mut self, record: &T) -> OperationResult where T: ops::Deref { + self.unflashed += 1; + unimplemented!() + } + + pub fn update(&mut self, idx: usize, record: &T) -> OperationResult<()> where T: ops::Deref { + self.unflashed += 1; + unimplemented!() + } + + /// Flushes recently written entries to durable storage. + pub fn flush(&mut self) -> OperationResult<()> { + if self.unflashed == 0 { + Ok(()) + } else { + self.mmap.flush()?; + self.unflashed = 0; + } + } + + pub fn read(&self, idx: usize) -> &[u8] { + unimplemented!() + } +} + +struct PersistedVectorStorage { + storage_path: PathBuf, + vector_size: usize, + segments: Vec, +} \ No newline at end of file diff --git a/lib/segment/src/vector_storage/simple_vector_storage.rs b/lib/segment/src/vector_storage/simple_vector_storage.rs index aaa82a7ee91..7f7e9f70373 100644 --- a/lib/segment/src/vector_storage/simple_vector_storage.rs +++ b/lib/segment/src/vector_storage/simple_vector_storage.rs @@ -9,6 +9,9 @@ use std::ops::Range; use std::path::Path; use sled::{Db, Config}; use serde::{Deserialize, Serialize}; +use std::mem::size_of; +use log::debug; + /// Since sled is used for reading only during the initialization, large read cache is not required const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb @@ -33,18 +36,24 @@ impl SimpleVectorStorage { let mut vectors: Vec> = vec![]; let mut deleted: HashSet = HashSet::new(); - let store = Config::new().cache_capacity(SLED_CACHE_SIZE).path(path).open()?; + let store = Config::new() + .cache_capacity(SLED_CACHE_SIZE) + .path(path).open()?; vectors.resize(store.len(), vec![]); + let expected_collection_size = dim * store.len() * size_of::() / 1024 / 1024; + debug!("Vector count: {}, expected_mem: {} Mb", + store.len(), expected_collection_size); + for record in store.iter() { let (key, val) = record?; let point_id: PointOffsetType = bincode::deserialize(&key).unwrap(); - let mut stored_record: StoredRecord = bincode::deserialize(&val).unwrap(); + let stored_record: StoredRecord = bincode::deserialize(&val).unwrap(); if stored_record.deleted { deleted.insert(point_id); } - stored_record.vector.shrink_to_fit(); + // stored_record.vector.shrink_to_fit(); vectors[point_id] = stored_record.vector; } diff --git a/lib/storage/src/content_manager/toc.rs b/lib/storage/src/content_manager/toc.rs index 883f32b72ae..6652f217882 100644 --- a/lib/storage/src/content_manager/toc.rs +++ b/lib/storage/src/content_manager/toc.rs @@ -1,24 +1,29 @@ -use std::sync::Arc; -use collection::collection::Collection; -use std::collections::HashMap; -use wal::WalOptions; -use tokio::runtime::Runtime; -use tokio::runtime; -use num_cpus; use std::cmp::max; -use segment::types::SegmentConfig; +use std::collections::HashMap; +use std::fs::{create_dir_all, read_dir, remove_dir_all}; use std::path::{Path, PathBuf}; -use std::fs::{create_dir_all, remove_dir_all, read_dir}; +use std::str::from_utf8; +use std::sync::Arc; + +use num_cpus; +use parking_lot::RwLock; +use sled::{Config, Db}; +use sled::transaction::UnabortableTransactionError; +use tokio::runtime; +use tokio::runtime::Runtime; +use wal::WalOptions; + +use collection::collection::Collection; use collection::collection_builder::collection_builder::build_collection; +use collection::collection_builder::collection_loader::load_collection; +use segment::types::SegmentConfig; + use crate::content_manager::errors::StorageError; use crate::content_manager::storage_ops::{AliasOperations, StorageOps}; use crate::types::StorageConfig; -use sled::Db; -use sled::transaction::UnabortableTransactionError; -use std::str::from_utf8; -use collection::collection_builder::collection_loader::load_collection; -use parking_lot::RwLock; +/// Since sled is used for reading only during the initialization, large read cache is not required +const SLED_CACHE_SIZE: u64 = 1 * 1024 * 1024; // 1 mb const COLLECTIONS_DIR: &str = "collections"; @@ -72,7 +77,10 @@ impl TableOfContent { let alias_path = Path::new(&storage_config.storage_path) .join("aliases.sled"); - let alias_persistence = sled::open(alias_path.as_path()).unwrap(); + let alias_persistence = Config::new().cache_capacity(SLED_CACHE_SIZE) + .path(alias_path.as_path()) + .open() + .unwrap(); TableOfContent { collections: Arc::new(RwLock::new(collections)),