Skip to content
This repository has been archived by the owner on Oct 24, 2023. It is now read-only.

Commit

Permalink
refactor: move datapool out into separate crate (twitter#426)
Browse files Browse the repository at this point in the history
Moves the datapool out from the seg storage crate and into its own
crate.

Adds a basic header to file-backed datapools and introduces a new
datapool for keeping content in memory with a file available to
save and restore state.

Minor refactoring to accomodate returning a result which allows us
to better surface any issues creating the datapool.

Adds a header to the datapools to allow versioning, validation, and
staleness checks.
  • Loading branch information
brayniac authored Jun 7, 2022
1 parent 9a93a26 commit fa2abea
Show file tree
Hide file tree
Showing 23 changed files with 1,000 additions and 202 deletions.
111 changes: 108 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ members = [
"src/rust/server/pingserver",
"src/rust/server/segcache",
"src/rust/session",
"src/rust/storage/datapool",
"src/rust/storage/seg",
"src/rust/storage/types",
]
Expand Down
6 changes: 3 additions & 3 deletions src/rust/entrystore/src/seg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ pub struct Seg {
impl Seg {
/// Create `Seg` storage based on the config and the `TimeType` which is
/// used to interpret various expiry time formats.
pub fn new<T: SegConfig>(config: &T) -> Self {
pub fn new<T: SegConfig>(config: &T) -> Result<Self, std::io::Error> {
let config = config.seg();

// build up the eviction policy from the config
Expand All @@ -49,9 +49,9 @@ impl Seg {
.segment_size(config.segment_size())
.eviction(eviction)
.datapool_path(config.datapool_path())
.build();
.build()?;

Self { data }
Ok(Self { data })
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/rust/server/segcache/benches/benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ fn get_benchmark(c: &mut Criterion) {
let config = SegcacheConfig::default();

// launch the server
let server = Segcache::new(config);
let server = Segcache::new(config).expect("failed to launch segcache");

// wait for server to startup. duration is chosen to be longer than we'd
// expect startup to take in a slow ci environment.
Expand Down
6 changes: 3 additions & 3 deletions src/rust/server/segcache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@ pub struct Segcache {

impl Segcache {
/// Creates a new `Segcache` process from the given `SegcacheConfig`.
pub fn new(config: SegcacheConfig) -> Self {
pub fn new(config: SegcacheConfig) -> Result<Self, std::io::Error> {
// initialize logging
let log_drain = configure_logging(&config);

// initialize metrics
common::metrics::init();

// initialize storage
let storage = Storage::new(&config);
let storage = Storage::new(&config)?;

let max_buffer_size = std::cmp::max(
server::DEFAULT_BUFFER_SIZE,
Expand All @@ -59,7 +59,7 @@ impl Segcache {
// spawn threads
let process = process_builder.spawn();

Self { process }
Ok(Self { process })
}

/// Wait for all threads to complete. Blocks until the process has fully
Expand Down
7 changes: 6 additions & 1 deletion src/rust/server/segcache/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,5 +117,10 @@ fn main() {
}

// launch segcache
Segcache::new(config).wait()
match Segcache::new(config) {
Ok(segcache) => segcache.wait(),
Err(e) => {
error!("error launching segcache: {}", e);
}
}
}
2 changes: 1 addition & 1 deletion src/rust/server/segcache/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::time::Duration;

fn main() {
debug!("launching server");
let server = Segcache::new(SegcacheConfig::default());
let server = Segcache::new(SegcacheConfig::default()).expect("failed to launch segcache");

// wait for server to startup. duration is chosen to be longer than we'd
// expect startup to take in a slow ci environment.
Expand Down
2 changes: 1 addition & 1 deletion src/rust/server/segcache/tests/integration_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn main() {
debug!("launching multi-worker server");
let mut config = SegcacheConfig::default();
config.worker_mut().set_threads(2);
let server = Segcache::new(config);
let server = Segcache::new(config).expect("failed to launch segcache");

// wait for server to startup. duration is chosen to be longer than we'd
// expect startup to take in a slow ci environment.
Expand Down
18 changes: 18 additions & 0 deletions src/rust/storage/datapool/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
[package]
name = "datapool"
version = "0.1.0"
edition = "2021"
authors = ["Brian Martin <[email protected]>"]
description = "abstractions for byte storage pools"
homepage = "https://pelikan.io"
repository = "https://github.com/twitter/pelikan"
license = "Apache-2.0"

[dependencies]
blake3 = "1.3.1"
common = { path = "../../common" }
libc = "0.2.125"
memmap2 = "0.5.3"

[dev-dependencies]
tempfile = "3.3.0"
Loading

0 comments on commit fa2abea

Please sign in to comment.