Skip to content

Commit

Permalink
only increment and check sequence progress at batches
Browse files Browse the repository at this point in the history
  • Loading branch information
lanvidr committed Jun 10, 2022
1 parent d847570 commit 6ec4862
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
4 changes: 2 additions & 2 deletions crates/sui-core/src/authority_active.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,12 @@ where
A: AuthorityAPI + Send + Sync + 'static + Clone,
{
pub async fn spawn_checkpoint_process(self) {
self._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
self.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.await
}

/// Spawn all active tasks.
pub async fn _spawn_checkpoint_process(
pub async fn spawn_checkpoint_process_with_config(
self,
checkpoint_process_control: Option<CheckpointProcessControl>,
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn checkpoint_active_flow_crash_client_with_gossip() {
.unwrap();
// Spin the gossip service.
active_state
._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.await;
});
}
Expand Down Expand Up @@ -198,7 +198,7 @@ async fn checkpoint_active_flow_crash_client_no_gossip() {
.unwrap();
// Spin the gossip service.
active_state
._spawn_checkpoint_process(Some(CheckpointProcessControl::default()))
.spawn_checkpoint_process_with_config(Some(CheckpointProcessControl::default()))
.await;
});
}
Expand Down
20 changes: 9 additions & 11 deletions crates/sui-core/src/authority_active/gossip/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,27 +269,25 @@ where
Some(Ok(BatchInfoResponseItem(UpdateItem::Batch(signed_batch)) )) => {
let next_seq = signed_batch.batch.next_sequence_number;
self.follower_store.record_next_sequence(&self.peer_name, next_seq)?;
match self.max_seq {
Some(max_seq) => {
if next_seq < max_seq {
info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", next_seq, max_seq);
}
}
None => {}
}
},

// Upon receiving a transaction digest, store it if it is not processed already.
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((seq, digest))))) => {
Some(Ok(BatchInfoResponseItem(UpdateItem::Transaction((_seq, digest))))) => {
if !self.state.database.effects_exists(&digest.transaction)? {
queue.push(async move {
tokio::time::sleep(Duration::from_millis(EACH_ITEM_DELAY_MS)).await;
digest
});
self.state.metrics.gossip_queued_count.inc();

}
match self.max_seq {
Some(max_seq) => {
if seq < max_seq {
info!("Gossip sequence number unexpected: found {:?} but previously received {:?}", seq, max_seq);
}
}
None => {}
}
self.max_seq = Some(seq + 1);
},

// Return any errors.
Expand Down
4 changes: 2 additions & 2 deletions crates/sui/tests/checkpoints_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ async fn end_to_end() {
..CheckpointProcessControl::default()
};
active_state
._spawn_checkpoint_process(Some(checkpoint_process_control))
.spawn_checkpoint_process_with_config(Some(checkpoint_process_control))
.await
});
}
Expand Down Expand Up @@ -230,7 +230,7 @@ async fn checkpoint_with_shared_objects() {
..CheckpointProcessControl::default()
};
active_state
._spawn_checkpoint_process(Some(checkpoint_process_control))
.spawn_checkpoint_process_with_config(Some(checkpoint_process_control))
.await
});
}
Expand Down

0 comments on commit 6ec4862

Please sign in to comment.