disk index: batch insert (solana-labs#31094)
jeffwashington authored Apr 10, 2023
1 parent ce21a58 commit d63359a
Showing 6 changed files with 502 additions and 171 deletions.
271 changes: 269 additions & 2 deletions bucket_map/src/bucket.rs
@@ -30,7 +30,6 @@ use {

pub struct ReallocatedItems<I: BucketOccupied, D: BucketOccupied> {
// Some if the index was reallocated
// u64 is the random value associated with the new index
pub index: Option<BucketStorage<I>>,
// Some for a data bucket reallocation
// u64 is data bucket index
@@ -104,6 +103,10 @@ pub struct Bucket<T: Copy + 'static> {
anticipated_size: u64,

pub reallocated: Reallocated<IndexBucket<T>, DataBucket>,

/// set to true once any entries have been deleted from the index.
/// Deletes indicate that there can be free slots and that the full search range must be searched for an entry.
at_least_one_entry_deleted: bool,
}

impl<'b, T: Clone + Copy + 'static> Bucket<T> {
@@ -131,6 +134,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
stats,
reallocated: Reallocated::default(),
anticipated_size: 0,
at_least_one_entry_deleted: false,
}
}

@@ -269,12 +273,124 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
Err(BucketMapError::IndexNoSpace(index.contents.capacity()))
}

pub fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
pub(crate) fn read_value(&self, key: &Pubkey) -> Option<(&[T], RefCount)> {
//debug!("READ_VALUE: {:?}", key);
let (elem, _) = self.find_index_entry(key)?;
Some(elem.read_value(&self.index, &self.data))
}

/// for each item in `items`, compute its hash value, seeded with `random`.
/// Return a vec of tuples:
/// (hash_value, key, value)
fn index_entries(
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
random: u64,
) -> Vec<(u64, Pubkey, T)> {
let mut inserts = Vec::with_capacity(count);
items.for_each(|(key, v)| {
let ix = Self::bucket_index_ix(&key, random);
inserts.push((ix, key, v));
});
inserts
}
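
`bucket_index_ix` itself is not shown in this hunk. A minimal sketch of the idea, assuming a seeded hash; the function name here is a hypothetical stand-in and the real implementation may differ:

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for `bucket_index_ix`: mix the per-bucket `random`
// seed into the key's hash so different buckets probe different slots.
fn bucket_index_ix_sketch(key: &[u8; 32], random: u64) -> u64 {
    let mut hasher = DefaultHasher::new();
    random.hash(&mut hasher);
    key.hash(&mut hasher);
    hasher.finish()
}

fn main() {
    let key = [7u8; 32];
    // the mapping is deterministic for a fixed key and seed
    assert_eq!(bucket_index_ix_sketch(&key, 1), bucket_index_ix_sketch(&key, 1));
}
```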

/// insert all of `items` into the index.
/// return duplicates
pub(crate) fn batch_insert_non_duplicates(
&mut self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
assert!(
!self.at_least_one_entry_deleted,
"efficient batch insertion can only occur prior to any deletes"
);
let current_len = self.index.count.load(Ordering::Relaxed);
let anticipated = count as u64;
self.set_anticipated_count(anticipated.saturating_add(current_len));
let mut entries = Self::index_entries(items, count, self.random);
let mut duplicates = Vec::default();
// insert, but resizes may be necessary
loop {
let cap = self.index.capacity();
// sort entries by `hash % cap` so consecutive searches probe nearby spots in the file
// `reverse()` lets us efficiently pop off the end of the vec while still visiting index values in ascending order
// sort here, before the call, to make `batch_insert_non_duplicates_internal` easier to test.
entries.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());

let result = Self::batch_insert_non_duplicates_internal(
&mut self.index,
&self.data,
&mut entries,
&mut duplicates,
);
match result {
Ok(_result) => {
// everything added
self.set_anticipated_count(0);
return duplicates;
}
Err(error) => {
// resize and add more
// `entries` will have had items removed from it
self.grow(error);
self.handle_delayed_grows();
}
}
}
}
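
The `reverse()` in the sort above is what makes `pop()` drain entries in ascending slot order, so probes walk the mmapped file sequentially. A small self-contained illustration of that trick (the values here are arbitrary):

```rust
fn main() {
    // Sort descending by `hash % cap` so that popping from the end of the Vec
    // yields entries in ascending slot order (sequential file access).
    let cap = 8u64;
    let mut entries = vec![(10u64, "a"), (3, "b"), (17, "c")]; // (hash, payload)
    entries.sort_unstable_by(|a, b| (a.0 % cap).cmp(&(b.0 % cap)).reverse());

    let mut slot_order = Vec::new();
    while let Some((hash, _payload)) = entries.pop() {
        slot_order.push(hash % cap);
    }
    assert_eq!(slot_order, vec![1, 2, 3]); // 17 % 8 = 1, 10 % 8 = 2, 3 % 8 = 3
}
```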

/// insert as much of `reverse_sorted_entries` as possible into `index`.
/// `reverse_sorted_entries` must already be sorted in descending order by `hash % capacity`; the caller above does that sort.
/// return an error if the index needs to resize.
/// for every entry that already exists in `index`, add it (and the value already in the index) to `duplicates`
pub fn batch_insert_non_duplicates_internal(
index: &mut BucketStorage<IndexBucket<T>>,
data_buckets: &[BucketStorage<DataBucket>],
reverse_sorted_entries: &mut Vec<(u64, Pubkey, T)>,
duplicates: &mut Vec<(Pubkey, T, T)>,
) -> Result<(), BucketMapError> {
let max_search = index.max_search();
let cap = index.capacity();
let search_end = max_search.min(cap);

// pop one entry at a time to insert
'outer: while let Some((ix_entry_raw, k, v)) = reverse_sorted_entries.pop() {
let ix_entry = ix_entry_raw % cap;
// search for an empty spot starting at `ix_entry`
for search in 0..search_end {
let ix_index = (ix_entry + search) % cap;
let elem = IndexEntryPlaceInBucket::new(ix_index);
if index.try_lock(ix_index) {
// found a free element and occupied it.
// This part of the mmapped file could have been used previously, so the slot can contain garbage; `init` overwrites the fields we rely on.
elem.init(index, &k);

// the new value is stored inline in the IndexEntry and NOT in a data file,
// since the new slot list has len 1
elem.set_slot_count_enum_value(index, OccupiedEnum::OneSlotInIndex(&v));
continue 'outer; // this 'insertion' is completed: inserted successfully
} else {
// occupied, see if the key already exists here
if elem.key(index) == &k {
let (v_existing, _ref_count_existing) =
elem.read_value(index, data_buckets);
duplicates.push((k, v, *v_existing.first().unwrap()));
continue 'outer; // this 'insertion' is completed: found a duplicate entry
}
}
}
// search loop ended without finding a spot to insert this key
// so, remember the item we were trying to insert for next time after resizing
reverse_sorted_entries.push((ix_entry_raw, k, v));
return Err(BucketMapError::IndexNoSpace(cap));
}

Ok(())
}
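
The inner loop above is bounded linear probing with wraparound: starting at `hash % cap`, at most `max_search` consecutive slots are examined before the caller must grow the index. A stripped-down sketch of just the probe, with a plain `bool` slice standing in for `try_lock`:

```rust
/// Sketch of the bounded probe: return the first free slot within
/// `max_search` steps of `start`, wrapping around at `cap`, or None if the
/// search range is exhausted (the caller would then resize and retry).
fn probe_free_slot(occupied: &[bool], start: u64, max_search: u64) -> Option<u64> {
    let cap = occupied.len() as u64;
    (0..max_search.min(cap))
        .map(|search| (start + search) % cap)
        .find(|&ix| !occupied[ix as usize])
}

fn main() {
    // A key hashing to slot 3 in a full tail wraps around to slot 0.
    assert_eq!(probe_free_slot(&[false, true, true, true], 3, 2), Some(0));
    // With max_search 1, the same key finds nothing and the index must grow.
    assert_eq!(probe_free_slot(&[false, true, true, true], 3, 1), None);
}
```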

pub fn try_write(
&mut self,
key: &Pubkey,
@@ -417,6 +533,7 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {

pub fn delete_key(&mut self, key: &Pubkey) {
if let Some((elem, elem_ix)) = self.find_index_entry(key) {
self.at_least_one_entry_deleted = true;
if let OccupiedEnum::MultipleSlots(multiple_slots) =
elem.get_slot_count_enum(&self.index)
{
@@ -637,3 +754,153 @@ impl<'b, T: Clone + Copy + 'static> Bucket<T> {
self.insert(key, (&new, refct));
}
}

#[cfg(test)]
mod tests {
use {super::*, tempfile::tempdir};

#[test]
fn test_index_entries() {
for v in 10..12u64 {
for random in 1..3 {
for len in 1..3 {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
assert_eq!(hashed.len(), len);
(0..len).for_each(|i| {
let raw = raw[i];
let hashed = hashed[i];
assert_eq!(Bucket::<u64>::bucket_index_ix(&raw.0, random), hashed.0);
assert_eq!(raw.0, hashed.1);
assert_eq!(raw.1, hashed.2);
});
}
}
}
}

fn create_test_index(max_search: Option<u8>) -> BucketStorage<IndexBucket<u64>> {
let tmpdir = tempdir().unwrap();
let paths: Vec<PathBuf> = vec![tmpdir.path().to_path_buf()];
assert!(!paths.is_empty());
let max_search = max_search.unwrap_or(2);
BucketStorage::<IndexBucket<u64>>::new(
Arc::new(paths),
1,
std::mem::size_of::<crate::index_entry::IndexEntry<u64>>() as u64,
max_search,
Arc::default(),
Arc::default(),
)
}

#[test]
fn batch_insert_non_duplicates_internal_simple() {
solana_logger::setup();
// add 2 entries, make sure they are added in the buckets we expect
let random = 1;
let data_buckets = Vec::default();
for v in 10..12u64 {
for len in 1..3 {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
let hashed_raw = hashed.clone();

let mut index = create_test_index(None);

let mut duplicates = Vec::default();
assert!(Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
&mut hashed,
&mut duplicates,
)
.is_ok());

assert_eq!(hashed.len(), 0);
(0..len).for_each(|i| {
let raw = hashed_raw[i];
let elem = IndexEntryPlaceInBucket::new(raw.0 % index.capacity());
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
assert_eq!(value, &[hashed_raw[i].2]);
});
}
}
}

#[test]
fn batch_insert_non_duplicates_internal_same_ix_exceeds_max_search() {
solana_logger::setup();
// add `len` entries with the same ix, make sure they are added in subsequent buckets.
// adjust `max_search`. If we try to add an entry that causes us to exceed `max_search`, then assert that the adding fails with an error and
// the colliding item remains in `entries`
let random = 1;
let data_buckets = Vec::default();
for max_search in [2usize, 3] {
for v in 10..12u64 {
for len in 1..(max_search + 1) {
let raw = (0..len)
.map(|l| {
let k = Pubkey::from([l as u8; 32]);
(k, v + (l as u64))
})
.collect::<Vec<_>>();
let mut hashed = Bucket::index_entries(raw.clone().into_iter(), len, random);
let common_ix = 2; // both are put at same ix
hashed.iter_mut().for_each(|v| {
v.0 = common_ix;
});
let hashed_raw = hashed.clone();

let mut index = create_test_index(Some(max_search as u8));

let mut duplicates = Vec::default();
let result = Bucket::<u64>::batch_insert_non_duplicates_internal(
&mut index,
&Vec::default(),
&mut hashed,
&mut duplicates,
);

assert_eq!(
hashed.len(),
if len > max_search { 1 } else { 0 },
"len: {len}"
);
(0..len).for_each(|i| {
assert!(if len > max_search {
result.is_err()
} else {
result.is_ok()
});
let raw = hashed_raw[i];
if i == 0 && len > max_search {
// max search was exceeded and the first entry was unable to be inserted, so it remained in `hashed`
assert_eq!(hashed[0], hashed_raw[0]);
} else {
// when ix values are equal, entries are popped (and therefore inserted) in reverse order, so item[1] lands at its expected ix and item[0] lands 1 probe past its expected ix
let search_required = (len - i - 1) as u64;
let elem = IndexEntryPlaceInBucket::new(
(raw.0 + search_required) % index.capacity(),
);
let (value, ref_count) = elem.read_value(&index, &data_buckets);
assert_eq!(ref_count, 1);
assert_eq!(value, &[hashed_raw[i].2]);
}
});
}
}
}
}
}
14 changes: 14 additions & 0 deletions bucket_map/src/bucket_api.rs
@@ -118,6 +118,20 @@ impl<T: Clone + Copy> BucketApi<T> {
bucket.as_mut().unwrap().set_anticipated_count(count);
}

/// batch insert of `items`. Assumes each entry is a single slot list element with ref_count == 1.
/// For any pubkey that already exists, the attempted insert value and the existing value are returned.
pub fn batch_insert_non_duplicates(
&self,
items: impl Iterator<Item = (Pubkey, T)>,
count: usize,
) -> Vec<(Pubkey, T, T)> {
let mut bucket = self.get_write_bucket();
bucket
.as_mut()
.unwrap()
.batch_insert_non_duplicates(items, count)
}
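
A hedged usage sketch of this wrapper: `bucket` is assumed to be a `BucketApi<u64>` obtained from a `BucketMap` (setup elided), and `Pubkey::new_unique` is used only to fabricate fresh keys:

```rust
// Hypothetical call site; `bucket: &BucketApi<u64>` comes from a BucketMap.
let items: Vec<(Pubkey, u64)> = (0..4).map(|i| (Pubkey::new_unique(), i)).collect();
let duplicates = bucket.batch_insert_non_duplicates(items.iter().copied(), items.len());
// Fresh keys and no prior deletes: nothing should already be in the index.
assert!(duplicates.is_empty());
```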

pub fn update<F>(&self, key: &Pubkey, updatefn: F)
where
F: FnMut(Option<(&[T], RefCount)>) -> Option<(Vec<T>, RefCount)>,