Skip to content

Commit

Permalink
Optimizations
Browse files Browse the repository at this point in the history
Ultimately none of these optimizations are huge, but they showed up on
profiles while I was trying to figure out sync behaviors. In the end,
fsync dwarfs these optimizations, but they still seemed quite wasteful:

- Allocating max_nodes + 1 instead of max_nodes allows growing to not
  re-allocate the entry vecs when inserting new entries.
- Many places were using Path/PathBuf in ways that a unique ID would be
  better suited for. The FileManager now exposes a way to resolve a
  PathId, and nearly all APIs have been updated to accommodate. The
  primary goal is to allow Roots to not continually recompute paths for
  trees, and have the same IDs be used throughout Nebari.
- PagedWriter now offers `write_chunk_cached` which will insert the
  value into the cache. This is useful when the data is already an
  ArcBytes<'static>, and the ChunkCache will skip values that are too
  large. Versioned and Unversioned trees now use this new API when
  writing values, meaning they are more likely to be cached.

Additionally, this contains de-optimizations by making temporary files
reside in the current working directory.
  • Loading branch information
ecton committed May 29, 2022
1 parent 480ffc8 commit a94853f
Show file tree
Hide file tree
Showing 25 changed files with 725 additions and 424 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ Cargo.lock
*.profdata
coverage/
*.folded
fuzz-*.log
fuzz-*.log
.tmp*
2 changes: 1 addition & 1 deletion benchmarks/benches/blobs/nebari.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl<B: NebariBenchmark> SimpleBench for InsertBlobs<B> {
config: &Self::Config,
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let tempfile = TempDir::new_in(".")?;
let tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/benches/blobs/persy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ impl SimpleBench for InsertBlobs {
config: &Self::Config,
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let _ = std::fs::remove_file("/tmp/persy");
let db = Persy::open_or_create_with("/tmp/persy", Config::default(), |persy| {
let _ = std::fs::remove_file("persy");
let db = Persy::open_or_create_with("persy", Config::default(), |persy| {
let mut tx = persy.begin()?;
tx.create_index::<u64, ByteVec>("index", ValueMode::Replace)?;
let prepared = tx.prepare()?;
Expand Down
16 changes: 12 additions & 4 deletions benchmarks/benches/blobs/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ impl SimpleBench for InsertBlobs {
config: &Self::Config,
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let tempfile = TempDir::new_in(".")?;
let db = _sled::open(tempfile.path())?;

Ok(Self {
Expand All @@ -48,9 +48,17 @@ impl SimpleBench for InsertBlobs {
let batch = self.blob.next().unwrap();
let start = Instant::now();
self.db
.insert(&batch.0.to_be_bytes(), IVec::from(batch.1.to_vec()))?;
self.db.flush()?;
total_duration += Instant::now() - start;
.transaction::<_, _, ()>(|db| {
db.insert(&batch.0.to_be_bytes(), IVec::from(batch.1.to_vec()))?;
db.flush();
Ok(())
})
.unwrap();
let iter = Instant::now() - start;
total_duration += iter;
// if iter.as_nanos() > 500_000 {
// println!("Iter: {}", iter.as_nanos());
// }
}
Ok(total_duration)
}
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benches/blobs/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl SimpleBench for InsertBlobs {
// production, it alters more than just insert performance. A more
// complete benchmark which includes both inserts and queries would be
// better to compare roots against sqlite's WAL performance.
let tempfile = NamedTempFile::new()?;
let tempfile = NamedTempFile::new_in(".")?;
let sqlite = Connection::open(tempfile.path())?;
// Sets the journal to what seems to be the most optimal, safe setting
// for @ecton. See:
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/benches/logs/nebari.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl<B: NebariBenchmark> SimpleBench for InsertLogs<B> {
config: &Self::Config,
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let tempfile = TempDir::new_in(".")?;
let tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
Expand Down Expand Up @@ -102,7 +102,7 @@ impl<B: NebariBenchmark> SimpleBench for ReadLogs<B> {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let tempfile = TempDir::new_in(".").unwrap();
let mut tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
Expand Down Expand Up @@ -191,7 +191,7 @@ impl<B: NebariBenchmark> SimpleBench for ScanLogs<B> {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let tempfile = TempDir::new_in(".").unwrap();
let mut tree = TreeFile::<B::Root, StdFile>::write(
tempfile.path().join("tree"),
State::default(),
Expand Down
13 changes: 9 additions & 4 deletions benchmarks/benches/logs/sled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ impl SimpleBench for InsertLogs {
config: &Self::Config,
config_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Result<Self, anyhow::Error> {
let tempfile = TempDir::new()?;
let db = _sled::open(tempfile.path())?;
let tempfile = TempDir::new_in(".")?;
let db = _sled::Config::default()
.path("/path/to/data".to_owned())
.flush_every_ms(None)
.open()
.unwrap();

Ok(Self {
_tempfile: tempfile,
Expand Down Expand Up @@ -63,6 +67,7 @@ impl SimpleBench for InsertLogs {
db.flush();
Ok(())
})?;
self.db.flush().unwrap();
total_duration += Instant::now() - start;
}
Ok(total_duration)
Expand All @@ -83,7 +88,7 @@ impl SimpleBench for ReadLogs {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let tempfile = TempDir::new_in(".").unwrap();
let db = _sled::Config::default()
.cache_capacity(2_000 * 160_384)
.path(tempfile.path())
Expand Down Expand Up @@ -177,7 +182,7 @@ impl SimpleBench for ScanLogs {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = TempDir::new().unwrap();
let tempfile = TempDir::new_in(".").unwrap();
let db = _sled::Config::default()
.cache_capacity(2_000 * 160_384)
.path(tempfile.path())
Expand Down
6 changes: 3 additions & 3 deletions benchmarks/benches/logs/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ impl SimpleBench for InsertLogs {
// production, it alters more than just insert performance. A more
// complete benchmark which includes both inserts and queries would be
// better to compare roots against sqlite's WAL performance.
let tempfile = NamedTempFile::new()?;
let tempfile = NamedTempFile::new_in(".")?;
let sqlite = Connection::open(tempfile.path())?;
// Sets the journal to what seems to be the most optimal, safe setting
// for @ecton. See:
Expand Down Expand Up @@ -114,7 +114,7 @@ impl SimpleBench for ReadLogs {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = NamedTempFile::new().unwrap();
let tempfile = NamedTempFile::new_in(".").unwrap();
let sqlite = Connection::open(tempfile.path()).unwrap();
sqlite
.execute(
Expand Down Expand Up @@ -214,7 +214,7 @@ impl SimpleBench for ScanLogs {
config: &Self::Config,
_group_state: &<Self::Config as BenchConfig>::GroupState,
) -> Self::GroupState {
let tempfile = NamedTempFile::new().unwrap();
let tempfile = NamedTempFile::new_in(".").unwrap();
let sqlite = Connection::open(tempfile.path()).unwrap();
sqlite
.execute(
Expand Down
5 changes: 3 additions & 2 deletions fuzz/fuzz_targets/compare_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ fuzz_target!(|batches: Vec<BTreeSet<u16>>| {
let context = Context::default();
let file = NamedTempFile::new().unwrap();
let mut tree =
TreeFile::<Unversioned, StdFile>::write(&file, State::default(), &context, None).unwrap();
TreeFile::<Unversioned, StdFile>::write(file.as_ref(), State::default(), &context, None)
.unwrap();

let mut oracle = BTreeMap::new();
let ops = batches.iter().map(|b| b.len()).sum::<usize>();
Expand Down Expand Up @@ -81,5 +82,5 @@ fuzz_target!(|batches: Vec<BTreeSet<u16>>| {
}

drop(tree);
context.file_manager.delete(&file).unwrap();
context.file_manager.delete(file.as_ref()).unwrap();
});
6 changes: 6 additions & 0 deletions nebari/src/chunk_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ impl ChunkCache {
}
}

/// Returns the maximum size of data that can be cached.
#[must_use]
pub const fn max_chunk_size(&self) -> usize {
self.max_block_length
}

    /// Adds a new cached chunk for `file_id` at `position`.
pub fn insert(&self, file_id: u64, position: u64, buffer: ArcBytes<'static>) {
if buffer.len() <= self.max_block_length {
Expand Down
9 changes: 8 additions & 1 deletion nebari/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,10 @@ impl ErrorKind {
Self::DataIntegrity(Box::new(error.into()))
}

pub(crate) fn is_file_not_found(&self) -> bool {
/// Returns true if this error represents an
/// [`std::io::ErrorKind::NotFound`].
#[must_use]
pub fn is_file_not_found(&self) -> bool {
matches!(self, Self::Io(err) if err.kind() == std::io::ErrorKind::NotFound)
}
}
Expand Down Expand Up @@ -277,12 +280,16 @@ impl From<String> for ErrorKind {
/// An internal database error.
#[derive(Debug, Error)]
pub enum InternalError {
/// A b-tree header was too large.
#[error("the b-tree header is too large")]
HeaderTooLarge,
/// The transaction manager is no longer running.
#[error("the transaction manager has stopped")]
TransactionManagerStopped,
    /// An internal error communicating over a channel has occurred.
#[error("an error on an internal channel has occurred")]
InternalCommunication,
/// An unexpected byte length was encountered.
#[error("an unexpected byte length was encountered")]
IncorrectByteLength,
}
63 changes: 25 additions & 38 deletions nebari/src/io/any.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use std::{
use crate::io::{
fs::{OpenStdFile, StdFile, StdFileManager},
memory::{MemoryFile, MemoryFileManager, OpenMemoryFile},
FileManager, FileOp, ManagedFile, ManagedFileOpener, OpenableFile, OperableFile,
FileManager, FileOp, IntoPathId, ManagedFile, ManagedFileOpener, OpenableFile, OperableFile,
PathId,
};

/// A file that can be either a [`StdFile`] or [`MemoryFile`].
Expand All @@ -23,20 +24,13 @@ impl ManagedFile for AnyFile {
}

impl super::File for AnyFile {
fn id(&self) -> Option<u64> {
fn id(&self) -> &PathId {
match self {
Self::Std(file) => file.id(),
Self::Memory(file) => file.id(),
}
}

fn path(&self) -> &std::path::Path {
match self {
Self::Std(file) => file.path(),
Self::Memory(file) => file.path(),
}
}

fn length(&self) -> Result<u64, crate::Error> {
match self {
Self::Std(file) => file.length(),
Expand Down Expand Up @@ -120,53 +114,56 @@ impl FileManager for AnyFileManager {
type File = AnyFile;
type FileHandle = AnyFileHandle;

fn read(&self, path: impl AsRef<std::path::Path>) -> Result<Self::FileHandle, crate::Error> {
fn resolve_path(&self, path: impl AsRef<Path>, create_if_not_found: bool) -> Option<PathId> {
match self {
Self::Std(manager) => manager.resolve_path(path, create_if_not_found),
Self::Memory(manager) => manager.resolve_path(path, create_if_not_found),
}
}

fn read(&self, path: impl IntoPathId) -> Result<Self::FileHandle, crate::Error> {
match self {
Self::Std(manager) => manager.read(path).map(AnyFileHandle::Std),
Self::Memory(manager) => manager.read(path).map(AnyFileHandle::Memory),
}
}

fn append(&self, path: impl AsRef<std::path::Path>) -> Result<Self::FileHandle, crate::Error> {
fn append(&self, path: impl IntoPathId) -> Result<Self::FileHandle, crate::Error> {
match self {
Self::Std(manager) => manager.append(path).map(AnyFileHandle::Std),
Self::Memory(manager) => manager.append(path).map(AnyFileHandle::Memory),
}
}

fn close_handles<F: FnOnce(u64)>(
&self,
path: impl AsRef<std::path::Path>,
publish_callback: F,
) {
fn close_handles<F: FnOnce(PathId)>(&self, path: impl IntoPathId, publish_callback: F) {
match self {
Self::Std(manager) => manager.close_handles(path, publish_callback),
Self::Memory(manager) => manager.close_handles(path, publish_callback),
}
}

fn delete(&self, path: impl AsRef<std::path::Path>) -> Result<bool, crate::Error> {
fn delete(&self, path: impl IntoPathId) -> Result<bool, crate::Error> {
match self {
Self::Std(manager) => manager.delete(path),
Self::Memory(manager) => manager.delete(path),
}
}

fn delete_directory(&self, path: impl AsRef<std::path::Path>) -> Result<(), crate::Error> {
fn delete_directory(&self, path: impl AsRef<Path>) -> Result<(), crate::Error> {
match self {
Self::Std(manager) => manager.delete_directory(path),
Self::Memory(manager) => manager.delete_directory(path),
}
}

fn exists(&self, path: impl AsRef<std::path::Path>) -> Result<bool, crate::Error> {
fn exists(&self, path: impl IntoPathId) -> Result<bool, crate::Error> {
match self {
Self::Std(manager) => manager.exists(path),
Self::Memory(manager) => manager.exists(path),
}
}

fn file_length(&self, path: impl AsRef<Path>) -> Result<u64, crate::Error> {
fn file_length(&self, path: impl IntoPathId) -> Result<u64, crate::Error> {
match self {
Self::Std(manager) => manager.file_length(path),
Self::Memory(manager) => manager.file_length(path),
Expand All @@ -175,27 +172,17 @@ impl FileManager for AnyFileManager {
}

impl ManagedFileOpener<AnyFile> for AnyFileManager {
fn open_for_read(
&self,
path: impl AsRef<std::path::Path> + Send,
id: Option<u64>,
) -> Result<AnyFile, crate::Error> {
fn open_for_read(&self, path: impl IntoPathId + Send) -> Result<AnyFile, crate::Error> {
match self {
AnyFileManager::Std(manager) => manager.open_for_read(path, id).map(AnyFile::Std),
AnyFileManager::Memory(manager) => manager.open_for_read(path, id).map(AnyFile::Memory),
AnyFileManager::Std(manager) => manager.open_for_read(path).map(AnyFile::Std),
AnyFileManager::Memory(manager) => manager.open_for_read(path).map(AnyFile::Memory),
}
}

fn open_for_append(
&self,
path: impl AsRef<std::path::Path> + Send,
id: Option<u64>,
) -> Result<AnyFile, crate::Error> {
fn open_for_append(&self, path: impl IntoPathId + Send) -> Result<AnyFile, crate::Error> {
match self {
AnyFileManager::Std(manager) => manager.open_for_append(path, id).map(AnyFile::Std),
AnyFileManager::Memory(manager) => {
manager.open_for_append(path, id).map(AnyFile::Memory)
}
AnyFileManager::Std(manager) => manager.open_for_append(path).map(AnyFile::Std),
AnyFileManager::Memory(manager) => manager.open_for_append(path).map(AnyFile::Memory),
}
}
}
Expand All @@ -216,14 +203,14 @@ pub enum AnyFileHandle {
}

impl OpenableFile<AnyFile> for AnyFileHandle {
fn id(&self) -> Option<u64> {
fn id(&self) -> &PathId {
match self {
AnyFileHandle::Std(file) => file.id(),
AnyFileHandle::Memory(file) => file.id(),
}
}

fn replace_with<C: FnOnce(u64)>(
fn replace_with<C: FnOnce(PathId)>(
self,
replacement: AnyFile,
manager: &<AnyFile as ManagedFile>::Manager,
Expand Down
Loading

0 comments on commit a94853f

Please sign in to comment.