attempt to replace sled for storing vectors
generall committed Dec 8, 2020
1 parent 44c6c0a commit eab0ac8
Showing 11 changed files with 140 additions and 27 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 3 additions & 2 deletions benches/service/collection_stress.js
@@ -5,7 +5,8 @@ let host = 'http://localhost:6333'

let collection_name = 'stress_collection';

let vector_length = 1000;
let vector_length = 128;
let vectors_per_batch = 32;

var create_collection_payload = JSON.stringify({
"create_collection": {
@@ -113,7 +114,7 @@ export default function () {

var payload = JSON.stringify({
"upsert_points": {
"points": Array.from({ length: 64 }, () => generate_point()),
"points": Array.from({ length: vectors_per_batch }, () => generate_point()),
}
});

2 changes: 1 addition & 1 deletion benches/service/run.sh
@@ -4,6 +4,6 @@
DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"


docker run --network=host -i loadimpact/k6 run -u 10 -i 100 --rps 10 - <"$DIR/collection_stress.js"
docker run --network=host -i loadimpact/k6 run -u 10 -i 500000 - <"$DIR/collection_stress.js"


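For reference, k6's -u flag sets the number of concurrent virtual users and -i the total iteration count, while --rps caps requests per second; the updated command keeps 10 virtual users but runs 500000 iterations with the rate cap removed, so load is limited only by the service under test.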
2 changes: 2 additions & 0 deletions lib/segment/Cargo.toml
@@ -24,3 +24,5 @@ atomic_refcell = "0.1.6"
atomicwrites = "0.2.5"
memmap = "0.7.0"
schemars = "0.8.0"
log = "0.4"
env_logger = "0.7.1"
9 changes: 7 additions & 2 deletions lib/segment/src/id_mapper/simple_id_mapper.rs
@@ -2,10 +2,13 @@ use std::collections::HashMap;
use crate::types::{PointOffsetType, PointIdType};
use crate::id_mapper::id_mapper::IdMapper;
use crate::entry::entry_point::OperationResult;
use sled::Db;
use sled::{Db, Config};
use bincode;
use std::path::Path;

/// Since sled is only read during initialization, a large read cache is not required
const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb

pub struct SimpleIdMapper {
internal_to_external: HashMap<PointOffsetType, PointIdType>,
external_to_internal: HashMap<PointIdType, PointOffsetType>,
@@ -15,7 +18,9 @@ pub struct SimpleIdMapper {
impl SimpleIdMapper {

pub fn open(path: &Path) -> Self {
let store = sled::open(path).unwrap();
let store = Config::new()
.cache_capacity(SLED_CACHE_SIZE)
.path(path).open().unwrap();

let mut internal_to_external: HashMap<PointOffsetType, PointIdType> = Default::default();
let mut external_to_internal: HashMap<PointIdType, PointOffsetType> = Default::default();
7 changes: 5 additions & 2 deletions lib/segment/src/payload_storage/simple_payload_storage.rs
@@ -4,9 +4,12 @@ use crate::types::{PayloadKeyType, PayloadType, PointOffsetType, TheMap};
use std::collections::{HashMap};

use crate::entry::entry_point::OperationResult;
use sled::Db;
use sled::{Db, Config};
use std::path::Path;

/// Since sled is only read during initialization, a large read cache is not required
const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb

pub struct SimplePayloadStorage {
payload: HashMap<PointOffsetType, TheMap<PayloadKeyType, PayloadType>>,
store: Db,
@@ -15,7 +18,7 @@ pub struct SimplePayloadStorage {

impl SimplePayloadStorage {
pub fn open(path: &Path) -> Self {
let store = sled::open(path).unwrap();
let store = Config::new().cache_capacity(SLED_CACHE_SIZE).path(path).open().unwrap();

let mut payload_map: HashMap<PointOffsetType, TheMap<PayloadKeyType, PayloadType>> = Default::default();

6 changes: 4 additions & 2 deletions lib/segment/src/vector_storage/memmap_vector_storage.rs
@@ -209,8 +209,10 @@ impl VectorStorage for MemmapVectorStorage {
let mmap = self.deleted_mmap.as_mut().unwrap();
let flag = mmap.get_mut(key + HEADER_SIZE).unwrap();

*flag = 1;
self.deleted_count += 1;
if *flag == 0 {
*flag = 1;
self.deleted_count += 1;
}
}
Ok(())
}
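With this guard, marking a point as deleted becomes idempotent: the flag is flipped and deleted_count incremented only if the point was not already deleted, so repeated deletions of the same key no longer inflate the counter.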
1 change: 1 addition & 0 deletions lib/segment/src/vector_storage/mod.rs
@@ -1,3 +1,4 @@
pub mod vector_storage;
pub mod simple_vector_storage;
pub mod memmap_vector_storage;
mod persisted_vector_storage;
80 changes: 80 additions & 0 deletions lib/segment/src/vector_storage/persisted_vector_storage.rs
@@ -0,0 +1,80 @@
use std::fs::OpenOptions;
use std::path::{Path, PathBuf};
use memmap::{Mmap, MmapMut};

use crate::entry::entry_point::OperationResult;
use std::mem::size_of;
use std::ops;


const MAX_SEGMENT_SIZE: usize = 32 * 1024 * 1024; // 32 mb
const SEGMENT_MAGIC: &'static [u8; 4] = b"vect";

const SEGMENT_HEADER_SIZE: usize = size_of::<usize>() + SEGMENT_MAGIC.len();



/// Storage for fixed-length records
///
struct Segment {
mmap: MmapMut,
path: PathBuf,

record_size: usize,
/// Total number of records that can be stored in this segment
capacity: usize,
/// Number of records currently stored
occupancy: usize,
/// Number of records written since the last flush
unflushed: usize,
}

impl Segment {
pub fn new(path: &Path, record_size: usize) -> OperationResult<Self> {
let max_records = MAX_SEGMENT_SIZE / record_size;
let storage_size = max_records * record_size;

let file = OpenOptions::new().read(true).write(true).create(true).open(&path)?;
// Reserve the full segment size up front so the file can be memory-mapped
file.set_len(storage_size as u64)?;

let mmap = unsafe { MmapMut::map_mut(&file)? };

Ok(Segment {
mmap,
path: path.to_owned(),
record_size,
capacity: max_records,
occupancy: 0,
unflushed: 0
})
}

pub fn append<T>(&mut self, record: &T) -> OperationResult<usize> where T: ops::Deref<Target=[u8]> {
self.unflushed += 1;
unimplemented!()
}

pub fn update<T>(&mut self, idx: usize, record: &T) -> OperationResult<()> where T: ops::Deref<Target=[u8]> {
self.unflushed += 1;
unimplemented!()
}

/// Flushes recently written entries to durable storage.
pub fn flush(&mut self) -> OperationResult<()> {
if self.unflushed > 0 {
self.mmap.flush()?;
self.unflushed = 0;
}
Ok(())
}

pub fn read(&self, idx: usize) -> &[u8] {
unimplemented!()
}
}

struct PersistedVectorStorage {
storage_path: PathBuf,
vector_size: usize,
segments: Vec<Segment>,
}
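For orientation, here is a minimal sketch of how this Segment API might be driven once append and read are implemented. It assumes it lives in the same module, so the private Segment type and the OperationResult import are in scope; the directory, file name, record size and vector values are illustrative placeholders, not part of the commit.

    use std::mem::size_of;
    use std::path::Path;

    // Illustrative only: Segment::append/read are still unimplemented stubs above.
    fn segment_usage_sketch(dir: &Path) -> OperationResult<()> {
        // e.g. 128-dimensional f32 vectors stored as raw bytes
        let record_size = 128 * size_of::<f32>();
        let mut segment = Segment::new(&dir.join("segment_0.data"), record_size)?;

        // Serialize one vector into a byte buffer (Vec<u8> derefs to [u8])
        let vector = vec![0.5f32; 128];
        let bytes: Vec<u8> = vector.iter().flat_map(|v| v.to_ne_bytes()).collect();

        let idx = segment.append(&bytes)?;
        segment.flush()?; // persist recent writes before relying on them
        let _raw: &[u8] = segment.read(idx);
        Ok(())
    }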
15 changes: 12 additions & 3 deletions lib/segment/src/vector_storage/simple_vector_storage.rs
@@ -9,6 +9,9 @@ use std::ops::Range;
use std::path::Path;
use sled::{Db, Config};
use serde::{Deserialize, Serialize};
use std::mem::size_of;
use log::debug;


/// Since sled is only read during initialization, a large read cache is not required
const SLED_CACHE_SIZE: u64 = 10 * 1024 * 1024; // 10 mb
@@ -33,18 +36,24 @@ impl SimpleVectorStorage {
let mut vectors: Vec<Vec<VectorElementType>> = vec![];
let mut deleted: HashSet<PointOffsetType> = HashSet::new();

let store = Config::new().cache_capacity(SLED_CACHE_SIZE).path(path).open()?;
let store = Config::new()
.cache_capacity(SLED_CACHE_SIZE)
.path(path).open()?;

vectors.resize(store.len(), vec![]);

let expected_collection_size = dim * store.len() * size_of::<VectorElementType>() / 1024 / 1024;
debug!("Vector count: {}, expected_mem: {} Mb",
store.len(), expected_collection_size);

for record in store.iter() {
let (key, val) = record?;
let point_id: PointOffsetType = bincode::deserialize(&key).unwrap();
let mut stored_record: StoredRecord = bincode::deserialize(&val).unwrap();
let stored_record: StoredRecord = bincode::deserialize(&val).unwrap();
if stored_record.deleted {
deleted.insert(point_id);
}
stored_record.vector.shrink_to_fit();
// stored_record.vector.shrink_to_fit();
vectors[point_id] = stored_record.vector;
}

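To put the new debug estimate in concrete terms: assuming 4-byte f32 vector elements, dim = 128 and one million stored points give 128 * 1000000 * 4 / 1024 / 1024 ≈ 488 Mb; these figures are illustrative, not taken from the commit.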
38 changes: 23 additions & 15 deletions lib/storage/src/content_manager/toc.rs
@@ -1,24 +1,29 @@
use std::sync::Arc;
use collection::collection::Collection;
use std::collections::HashMap;
use wal::WalOptions;
use tokio::runtime::Runtime;
use tokio::runtime;
use num_cpus;
use std::cmp::max;
use segment::types::SegmentConfig;
use std::collections::HashMap;
use std::fs::{create_dir_all, read_dir, remove_dir_all};
use std::path::{Path, PathBuf};
use std::fs::{create_dir_all, remove_dir_all, read_dir};
use std::str::from_utf8;
use std::sync::Arc;

use num_cpus;
use parking_lot::RwLock;
use sled::{Config, Db};
use sled::transaction::UnabortableTransactionError;
use tokio::runtime;
use tokio::runtime::Runtime;
use wal::WalOptions;

use collection::collection::Collection;
use collection::collection_builder::collection_builder::build_collection;
use collection::collection_builder::collection_loader::load_collection;
use segment::types::SegmentConfig;

use crate::content_manager::errors::StorageError;
use crate::content_manager::storage_ops::{AliasOperations, StorageOps};
use crate::types::StorageConfig;
use sled::Db;
use sled::transaction::UnabortableTransactionError;
use std::str::from_utf8;
use collection::collection_builder::collection_loader::load_collection;
use parking_lot::RwLock;

/// Since sled is only read during initialization, a large read cache is not required
const SLED_CACHE_SIZE: u64 = 1 * 1024 * 1024; // 1 mb

const COLLECTIONS_DIR: &str = "collections";

@@ -72,7 +77,10 @@ impl TableOfContent {
let alias_path = Path::new(&storage_config.storage_path)
.join("aliases.sled");

let alias_persistence = sled::open(alias_path.as_path()).unwrap();
let alias_persistence = Config::new().cache_capacity(SLED_CACHE_SIZE)
.path(alias_path.as_path())
.open()
.unwrap();

TableOfContent {
collections: Arc::new(RwLock::new(collections)),
