Skip to content

Commit

Permalink
[storage] add delete_range and delete_range_inclusive to SchemaBatch
Browse files Browse the repository at this point in the history
  • Loading branch information
areshand authored and aptos-bot committed Mar 10, 2022
1 parent 6c0d879 commit 7fb7920
Show file tree
Hide file tree
Showing 3 changed files with 120 additions and 2 deletions.
50 changes: 48 additions & 2 deletions storage/schemadb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ use crate::{
metrics::{
DIEM_SCHEMADB_BATCH_COMMIT_BYTES, DIEM_SCHEMADB_BATCH_COMMIT_LATENCY_SECONDS,
DIEM_SCHEMADB_BATCH_PUT_LATENCY_SECONDS, DIEM_SCHEMADB_DELETES, DIEM_SCHEMADB_GET_BYTES,
DIEM_SCHEMADB_GET_LATENCY_SECONDS, DIEM_SCHEMADB_ITER_BYTES,
DIEM_SCHEMADB_ITER_LATENCY_SECONDS, DIEM_SCHEMADB_PUT_BYTES,
DIEM_SCHEMADB_GET_LATENCY_SECONDS, DIEM_SCHEMADB_INCLUSIVE_RANGE_DELETES,
DIEM_SCHEMADB_ITER_BYTES, DIEM_SCHEMADB_ITER_LATENCY_SECONDS, DIEM_SCHEMADB_PUT_BYTES,
DIEM_SCHEMADB_RANGE_DELETES,
},
schema::{KeyCodec, Schema, SeekKeyCodec, ValueCodec},
};
Expand Down Expand Up @@ -52,6 +53,8 @@ pub const DEFAULT_CF_NAME: ColumnFamilyName = "default";
enum WriteOp {
Value { key: Vec<u8>, value: Vec<u8> },
Deletion { key: Vec<u8> },
DeletionRange { begin: Vec<u8>, end: Vec<u8> },
DeletionRangeInclusive { begin: Vec<u8>, end: Vec<u8> },
}

/// `SchemaBatch` holds a collection of updates that can be applied to a DB atomically. The updates
Expand Down Expand Up @@ -92,6 +95,32 @@ impl SchemaBatch {

Ok(())
}

/// Adds a delete range operation that delete a range [start, end)
pub fn delete_range<S: Schema>(&mut self, begin: &S::Key, end: &S::Key) -> Result<()> {
let begin = <S::Key as KeyCodec<S>>::encode_key(begin)?;
let end = <S::Key as KeyCodec<S>>::encode_key(end)?;
self.rows
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::DeletionRange { begin, end });
Ok(())
}

/// Adds a delete range operation that delete a range [start, end] including end
pub fn delete_range_inclusive<S: Schema>(
&mut self,
begin: &S::Key,
end: &S::Key,
) -> Result<()> {
let begin = <S::Key as KeyCodec<S>>::encode_key(begin)?;
let end = <S::Key as KeyCodec<S>>::encode_key(end)?;
self.rows
.entry(S::COLUMN_FAMILY_NAME)
.or_insert_with(Vec::new)
.push(WriteOp::DeletionRangeInclusive { begin, end });
Ok(())
}
}

pub enum ScanDirection {
Expand Down Expand Up @@ -398,6 +427,13 @@ impl DB {
match write_op {
WriteOp::Value { key, value } => db_batch.put_cf(cf_handle, key, value),
WriteOp::Deletion { key } => db_batch.delete_cf(cf_handle, key),
WriteOp::DeletionRange { begin, end } => {
db_batch.delete_range_cf(cf_handle, begin, end);
}
WriteOp::DeletionRangeInclusive { begin, end } => {
db_batch.delete_range_cf(cf_handle, begin, end);
db_batch.delete_cf(cf_handle, end);
}
}
}
}
Expand All @@ -417,6 +453,16 @@ impl DB {
WriteOp::Deletion { key: _ } => {
DIEM_SCHEMADB_DELETES.with_label_values(&[cf_name]).inc();
}
WriteOp::DeletionRange { begin: _, end: _ } => {
DIEM_SCHEMADB_RANGE_DELETES
.with_label_values(&[cf_name])
.inc();
}
WriteOp::DeletionRangeInclusive { begin: _, end: _ } => {
DIEM_SCHEMADB_INCLUSIVE_RANGE_DELETES
.with_label_values(&[cf_name])
.inc();
}
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions storage/schemadb/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,24 @@ pub static DIEM_SCHEMADB_DELETES: Lazy<IntCounterVec> = Lazy::new(|| {
.unwrap()
});

pub static DIEM_SCHEMADB_RANGE_DELETES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"diem_storage_range_deletes",
"Diem storage range delete calls",
&["cf_name"]
)
.unwrap()
});

pub static DIEM_SCHEMADB_INCLUSIVE_RANGE_DELETES: Lazy<IntCounterVec> = Lazy::new(|| {
register_int_counter_vec!(
"diem_storage_range_inclusive_deletes",
"Diem storage range inclusive delete calls",
&["cf_name"]
)
.unwrap()
});

pub static DIEM_SCHEMADB_BATCH_PUT_LATENCY_SECONDS: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
// metric name
Expand Down
54 changes: 54 additions & 0 deletions storage/schemadb/tests/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,62 @@ fn test_schema_put_get() {
);
}

fn test_schemabatch_delete_range_util(begin: u32, end: u32, is_inclusive: bool) {
let db = TestDB::new();
let mut db_batch = SchemaBatch::new();
for i in 0..100u32 {
db_batch
.put::<TestSchema1>(&TestField(i), &TestField(i))
.unwrap();
}
let mut should_exist_vec = [true; 100];

db_batch
.delete_range::<TestSchema1>(&TestField(begin), &TestField(end))
.unwrap();
if !is_inclusive {
for i in begin..end {
should_exist_vec[i as usize] = false;
}
} else {
for i in begin..=end {
should_exist_vec[i as usize] = false;
}
}
db.write_schemas(db_batch).unwrap();
for (i, should_exist) in should_exist_vec.iter().enumerate() {
assert_eq!(
db.get::<TestSchema1>(&TestField(i as u32))
.unwrap()
.is_some(),
*should_exist,
)
}
}

proptest! {
#![proptest_config(ProptestConfig::with_cases(10))]

#[test]
fn test_schemabatch_delete_range(
ranges_to_delete in vec(
(0..100u32).prop_flat_map(|begin| (Just(begin), (begin..100u32))), 0..10)
) {
for (begin, end) in ranges_to_delete {
test_schemabatch_delete_range_util(begin, end, false);
}
}

#[test]
fn test_schemabatch_delete_range_inclusive(
ranges_to_delete in vec(
(0..100u32).prop_flat_map(|begin| (Just(begin), (begin..100u32))), 0..10)
) {
for (begin, end) in ranges_to_delete {
test_schemabatch_delete_range_util(begin, end, false);
}
}

#[test]
fn test_schema_range_delete(
ranges_to_delete in vec(
Expand Down Expand Up @@ -242,6 +295,7 @@ fn test_single_schema_batch() {
db_batch
.put::<TestSchema2>(&TestField(5), &TestField(5))
.unwrap();

db.write_schemas(db_batch).unwrap();

assert_eq!(
Expand Down

0 comments on commit 7fb7920

Please sign in to comment.