Skip to content

Commit

Permalink
Tidied up the Stamper module and upgraded to a 1.34 dependency
Browse files Browse the repository at this point in the history
Added stamper.revert method to be used for rollback - rolling back to a previous
commit in case of deleting all documents or rolling operations back should reset
the stamper as well

Added type alias for Opstamp - helps code readibility instead of seeing u64
returned by functions.

Moved to AtomicU64 on stable rust (since 1.34) - where possible use standard
library interfaces.
  • Loading branch information
petr-tik committed Apr 24, 2019
1 parent 96a4f50 commit 8e50921
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 46 deletions.
3 changes: 2 additions & 1 deletion src/core/index_meta.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use core::SegmentMeta;
use indexer::Opstamp;
use schema::Schema;
use serde_json;
use std::fmt;
Expand All @@ -15,7 +16,7 @@ use std::fmt;
pub struct IndexMeta {
pub segments: Vec<SegmentMeta>,
pub schema: Schema,
pub opstamp: u64,
pub opstamp: Opstamp,
#[serde(skip_serializing_if = "Option::is_none")]
pub payload: Option<String>,
}
Expand Down
3 changes: 2 additions & 1 deletion src/core/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use directory::error::{OpenReadError, OpenWriteError};
use directory::Directory;
use directory::{ReadOnlySource, WritePtr};
use indexer::segment_serializer::SegmentSerializer;
use indexer::Opstamp;
use schema::Schema;
use std::fmt;
use std::path::PathBuf;
Expand Down Expand Up @@ -50,7 +51,7 @@ impl Segment {
}

#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> Segment {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> Segment {
Segment {
index: self.index,
meta: self.meta.with_delete_meta(num_deleted_docs, opstamp),
Expand Down
7 changes: 4 additions & 3 deletions src/core/segment_meta.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::SegmentComponent;
use census::{Inventory, TrackedObject};
use core::SegmentId;
use indexer::Opstamp;
use serde;
use std::collections::HashSet;
use std::fmt;
Expand All @@ -13,7 +14,7 @@ lazy_static! {
#[derive(Clone, Debug, Serialize, Deserialize)]
struct DeleteMeta {
num_deleted_docs: u32,
opstamp: u64,
opstamp: Opstamp,
}

/// `SegmentMeta` contains simple meta information about a segment.
Expand Down Expand Up @@ -138,7 +139,7 @@ impl SegmentMeta {

/// Returns the opstamp of the last delete operation
/// taken in account in this segment.
pub fn delete_opstamp(&self) -> Option<u64> {
pub fn delete_opstamp(&self) -> Option<Opstamp> {
self.tracked
.deletes
.as_ref()
Expand All @@ -152,7 +153,7 @@ impl SegmentMeta {
}

#[doc(hidden)]
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: u64) -> SegmentMeta {
pub fn with_delete_meta(self, num_deleted_docs: u32, opstamp: Opstamp) -> SegmentMeta {
let delete_meta = DeleteMeta {
num_deleted_docs,
opstamp,
Expand Down
5 changes: 3 additions & 2 deletions src/indexer/delete_queue.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::operation::DeleteOperation;
use indexer::Opstamp;
use std::mem;
use std::ops::DerefMut;
use std::sync::{Arc, RwLock};
Expand Down Expand Up @@ -184,15 +185,15 @@ impl DeleteCursor {
/// queue are consume and the next get will return None.
/// - the next get will return the first operation with an
/// `opstamp >= target_opstamp`.
pub fn skip_to(&mut self, target_opstamp: u64) {
pub fn skip_to(&mut self, target_opstamp: Opstamp) {
// TODO Can be optimize as we work with block.
while self.is_behind_opstamp(target_opstamp) {
self.advance();
}
}

#[cfg_attr(feature = "cargo-clippy", allow(clippy::wrong_self_convention))]
fn is_behind_opstamp(&mut self, target_opstamp: u64) -> bool {
fn is_behind_opstamp(&mut self, target_opstamp: Opstamp) -> bool {
self.get()
.map(|operation| operation.opstamp < target_opstamp)
.unwrap_or(false)
Expand Down
6 changes: 4 additions & 2 deletions src/indexer/doc_opstamp_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ pub enum DocToOpstampMapping {
None,
}

use super::Opstamp;

impl From<Vec<u64>> for DocToOpstampMapping {
fn from(opstamps: Vec<u64>) -> DocToOpstampMapping {
fn from(opstamps: Vec<Opstamp>) -> DocToOpstampMapping {
DocToOpstampMapping::WithMap(Arc::new(opstamps))
}
}
Expand All @@ -35,7 +37,7 @@ impl DocToOpstampMapping {
//
// The edge case opstamp = some doc opstamp is in practise
// never called.
pub fn compute_doc_limit(&self, target_opstamp: u64) -> DocId {
pub fn compute_doc_limit(&self, target_opstamp: Opstamp) -> DocId {
match *self {
DocToOpstampMapping::WithMap(ref doc_opstamps) => {
match doc_opstamps.binary_search(&target_opstamp) {
Expand Down
29 changes: 15 additions & 14 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use indexer::doc_opstamp_mapping::DocToOpstampMapping;
use indexer::operation::DeleteOperation;
use indexer::stamper::Stamper;
use indexer::MergePolicy;
use indexer::Opstamp;
use indexer::SegmentEntry;
use indexer::SegmentWriter;
use postings::compute_table_size;
Expand Down Expand Up @@ -99,7 +100,7 @@ pub struct IndexWriter {
delete_queue: DeleteQueue,

stamper: Stamper,
committed_opstamp: u64,
committed_opstamp: Opstamp,
}

/// Open a new index writer. Attempts to acquire a lockfile.
Expand Down Expand Up @@ -177,7 +178,7 @@ pub fn compute_deleted_bitset(
segment_reader: &SegmentReader,
delete_cursor: &mut DeleteCursor,
doc_opstamps: &DocToOpstampMapping,
target_opstamp: u64,
target_opstamp: Opstamp,
) -> Result<bool> {
let mut might_have_changed = false;

Expand Down Expand Up @@ -219,7 +220,7 @@ pub fn compute_deleted_bitset(
pub fn advance_deletes(
mut segment: Segment,
segment_entry: &mut SegmentEntry,
target_opstamp: u64,
target_opstamp: Opstamp,
) -> Result<()> {
{
if segment_entry.meta().delete_opstamp() == Some(target_opstamp) {
Expand Down Expand Up @@ -299,11 +300,11 @@ fn index_documents(
// the worker thread.
assert!(num_docs > 0);

let doc_opstamps: Vec<u64> = segment_writer.finalize()?;
let doc_opstamps: Vec<Opstamp> = segment_writer.finalize()?;

let segment_meta = SegmentMeta::new(segment_id, num_docs);

let last_docstamp: u64 = *(doc_opstamps.last().unwrap());
let last_docstamp: Opstamp = *(doc_opstamps.last().unwrap());

let delete_bitset_opt = if delete_cursor.get().is_some() {
let doc_to_opstamps = DocToOpstampMapping::from(doc_opstamps);
Expand Down Expand Up @@ -494,7 +495,7 @@ impl IndexWriter {
/// state as it was after the last commit.
///
/// The opstamp at the last commit is returned.
pub fn rollback(&mut self) -> Result<()> {
pub fn rollback(&mut self) -> Result<Opstamp> {
info!("Rolling back to opstamp {}", self.committed_opstamp);

// marks the segment updater as killed. From now on, all
Expand Down Expand Up @@ -529,7 +530,7 @@ impl IndexWriter {
// was dropped with the index_writer.
for _ in document_receiver.clone() {}

Ok(())
Ok(self.committed_opstamp)
}

/// Prepares a commit.
Expand Down Expand Up @@ -567,7 +568,7 @@ impl IndexWriter {
info!("Preparing commit");

// this will drop the current document channel
// and recreate a new one channels.
// and recreate a new one.
self.recreate_document_channel();

let former_workers_join_handle = mem::replace(&mut self.workers_join_handle, Vec::new());
Expand Down Expand Up @@ -601,7 +602,7 @@ impl IndexWriter {
/// Commit returns the `opstamp` of the last document
/// that made it in the commit.
///
pub fn commit(&mut self) -> Result<u64> {
pub fn commit(&mut self) -> Result<Opstamp> {
self.prepare_commit()?.commit()
}

Expand All @@ -617,7 +618,7 @@ impl IndexWriter {
///
/// Like adds, the deletion itself will be visible
/// only after calling `commit()`.
pub fn delete_term(&mut self, term: Term) -> u64 {
pub fn delete_term(&mut self, term: Term) -> Opstamp {
let opstamp = self.stamper.stamp();
let delete_operation = DeleteOperation { opstamp, term };
self.delete_queue.push(delete_operation);
Expand All @@ -631,7 +632,7 @@ impl IndexWriter {
///
/// This is also the opstamp of the commit that is currently
/// available for searchers.
pub fn commit_opstamp(&self) -> u64 {
pub fn commit_opstamp(&self) -> Opstamp {
self.committed_opstamp
}

Expand All @@ -645,7 +646,7 @@ impl IndexWriter {
///
/// Currently it represents the number of documents that
/// have been added since the creation of the index.
pub fn add_document(&mut self, document: Document) -> u64 {
pub fn add_document(&mut self, document: Document) -> Opstamp {
let opstamp = self.stamper.stamp();
let add_operation = AddOperation { opstamp, document };
let send_result = self.operation_sender.send(vec![add_operation]);
Expand All @@ -662,7 +663,7 @@ impl IndexWriter {
/// The total number of stamps generated by this method is `count + 1`;
/// each operation gets a stamp from the `stamps` iterator and `last_opstamp`
/// is for the batch itself.
fn get_batch_opstamps(&mut self, count: u64) -> (u64, Range<u64>) {
fn get_batch_opstamps(&mut self, count: Opstamp) -> (Opstamp, Range<Opstamp>) {
let Range { start, end } = self.stamper.stamps(count + 1u64);
let last_opstamp = end - 1;
let stamps = Range {
Expand All @@ -688,7 +689,7 @@ impl IndexWriter {
/// Like adds and deletes (see `IndexWriter.add_document` and
/// `IndexWriter.delete_term`), the changes made by calling `run` will be
/// visible to readers only after calling `commit()`.
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> u64 {
pub fn run(&mut self, user_operations: Vec<UserOperation>) -> Opstamp {
let count = user_operations.len() as u64;
if count == 0 {
return self.stamper.stamp();
Expand Down
7 changes: 4 additions & 3 deletions src/indexer/merge_operation.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use census::{Inventory, TrackedObject};
use indexer::Opstamp;
use std::collections::HashSet;
use SegmentId;

Expand Down Expand Up @@ -35,14 +36,14 @@ pub struct MergeOperation {
}

struct InnerMergeOperation {
target_opstamp: u64,
target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
}

impl MergeOperation {
pub fn new(
inventory: &MergeOperationInventory,
target_opstamp: u64,
target_opstamp: Opstamp,
segment_ids: Vec<SegmentId>,
) -> MergeOperation {
let inner_merge_operation = InnerMergeOperation {
Expand All @@ -54,7 +55,7 @@ impl MergeOperation {
}
}

pub fn target_opstamp(&self) -> u64 {
pub fn target_opstamp(&self) -> Opstamp {
self.inner.target_opstamp
}

Expand Down
1 change: 1 addition & 0 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub use self::segment_entry::SegmentEntry;
pub use self::segment_manager::SegmentManager;
pub use self::segment_serializer::SegmentSerializer;
pub use self::segment_writer::SegmentWriter;
pub use self::stamper::Opstamp;

/// Alias for the default merge policy, which is the `LogMergePolicy`.
pub type DefaultMergePolicy = LogMergePolicy;
5 changes: 3 additions & 2 deletions src/indexer/operation.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use indexer::Opstamp;
use schema::Document;
use schema::Term;

/// Timestamped Delete operation.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct DeleteOperation {
pub opstamp: u64,
pub opstamp: Opstamp,
pub term: Term,
}

/// Timestamped Add operation.
#[derive(Eq, PartialEq, Debug)]
pub struct AddOperation {
pub opstamp: u64,
pub opstamp: Opstamp,
pub document: Document,
}

Expand Down
11 changes: 6 additions & 5 deletions src/indexer/prepared_commit.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,36 @@
use super::IndexWriter;
use indexer::Opstamp;
use Result;

/// A prepared commit
pub struct PreparedCommit<'a> {
index_writer: &'a mut IndexWriter,
payload: Option<String>,
opstamp: u64,
opstamp: Opstamp,
}

impl<'a> PreparedCommit<'a> {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: u64) -> PreparedCommit {
pub(crate) fn new(index_writer: &'a mut IndexWriter, opstamp: Opstamp) -> PreparedCommit {
PreparedCommit {
index_writer,
payload: None,
opstamp,
}
}

pub fn opstamp(&self) -> u64 {
pub fn opstamp(&self) -> Opstamp {
self.opstamp
}

pub fn set_payload(&mut self, payload: &str) {
self.payload = Some(payload.to_string())
}

pub fn abort(self) -> Result<()> {
pub fn abort(self) -> Result<Opstamp> {
self.index_writer.rollback()
}

pub fn commit(self) -> Result<u64> {
pub fn commit(self) -> Result<Opstamp> {
info!("committing {}", self.opstamp);
self.index_writer
.segment_updater()
Expand Down
7 changes: 4 additions & 3 deletions src/indexer/segment_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use indexer::merge_operation::MergeOperationInventory;
use indexer::merger::IndexMerger;
use indexer::stamper::Stamper;
use indexer::MergeOperation;
use indexer::Opstamp;
use indexer::SegmentEntry;
use indexer::SegmentSerializer;
use indexer::{DefaultMergePolicy, MergePolicy};
Expand Down Expand Up @@ -224,7 +225,7 @@ impl SegmentUpdater {
///
/// Tne method returns copies of the segment entries,
/// updated with the delete information.
fn purge_deletes(&self, target_opstamp: u64) -> Result<Vec<SegmentEntry>> {
fn purge_deletes(&self, target_opstamp: Opstamp) -> Result<Vec<SegmentEntry>> {
let mut segment_entries = self.0.segment_manager.segment_entries();
for segment_entry in &mut segment_entries {
let segment = self.0.index.segment(segment_entry.meta().clone());
Expand All @@ -233,7 +234,7 @@ impl SegmentUpdater {
Ok(segment_entries)
}

pub fn save_metas(&self, opstamp: u64, commit_message: Option<String>) {
pub fn save_metas(&self, opstamp: Opstamp, commit_message: Option<String>) {
if self.is_alive() {
let index = &self.0.index;
let directory = index.directory();
Expand Down Expand Up @@ -280,7 +281,7 @@ impl SegmentUpdater {
.garbage_collect(|| self.0.segment_manager.list_files());
}

pub fn commit(&self, opstamp: u64, payload: Option<String>) -> Result<()> {
pub fn commit(&self, opstamp: Opstamp, payload: Option<String>) -> Result<()> {
self.run_async(move |segment_updater| {
if segment_updater.is_alive() {
let segment_entries = segment_updater
Expand Down
3 changes: 2 additions & 1 deletion src/indexer/segment_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use core::SerializableSegment;
use fastfield::FastFieldsWriter;
use fieldnorm::FieldNormsWriter;
use indexer::segment_serializer::SegmentSerializer;
use indexer::Opstamp;
use postings::MultiFieldPostingsWriter;
use schema::FieldType;
use schema::Schema;
Expand All @@ -28,7 +29,7 @@ pub struct SegmentWriter {
segment_serializer: SegmentSerializer,
fast_field_writers: FastFieldsWriter,
fieldnorms_writer: FieldNormsWriter,
doc_opstamps: Vec<u64>,
doc_opstamps: Vec<Opstamp>,
tokenizers: Vec<Option<Box<BoxedTokenizer>>>,
}

Expand Down
Loading

0 comments on commit 8e50921

Please sign in to comment.