Skip to content

Commit

Permalink
Move selecting txs to the mempool transactions queue
Browse files Browse the repository at this point in the history
Signed-off-by: deniallugo <[email protected]>
  • Loading branch information
Deniallugo committed Mar 22, 2022
1 parent cb1e330 commit ff3f66b
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 135 deletions.
79 changes: 8 additions & 71 deletions core/lib/mempool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,63 +236,6 @@ struct MempoolBlocksHandler {
}

impl MempoolBlocksHandler {
/// Returns: chunks left from max amount of chunks, ops selected
async fn select_priority_ops(
max_block_size_chunks: usize,
current_unprocessed_priority_op: u64,
transactions_queue: &mut MempoolTransactionsQueue,
) -> (usize, Vec<PriorityOp>) {
let mut result = vec![];

let mut used_chunks = 0;
let mut current_priority_op = current_unprocessed_priority_op;
while let Some(op) = transactions_queue.pop_front_priority_op() {
// Since the transaction addition is asynchronous process and we are checking node many times,
// We can find some already processed priority ops
if op.serial_id < current_priority_op {
vlog::warn!("Already processed priority op was found in queue");
// We can skip already processed priority operations
continue;
}
assert_eq!(
current_priority_op, op.serial_id,
"Wrong order for priority ops"
);
if used_chunks + op.data.chunks() <= max_block_size_chunks {
used_chunks += op.data.chunks();
result.push(op);
current_priority_op += 1;
} else {
break;
}
}
(max_block_size_chunks - used_chunks, result)
}

/// Collect txs depending on the remaining block size
async fn prepare_tx_for_block(
mut chunks_left: usize,
block_timestamp: u64,
transactions_queue: &mut MempoolTransactionsQueue,
mempool_state: &MempoolState,
) -> Result<(usize, Vec<SignedTxVariant>), TxAddError> {
transactions_queue.prepare_new_ready_transactions(block_timestamp);

let mut txs_for_commit = Vec::new();

while let Some(tx) = transactions_queue.pop_front() {
let chunks_for_tx = mempool_state.required_chunks(&tx).await?;
if chunks_left >= chunks_for_tx {
txs_for_commit.push(tx);
chunks_left -= chunks_for_tx;
} else {
break;
}
}

Ok((chunks_left, txs_for_commit))
}

async fn propose_new_block(
&mut self,
current_unprocessed_priority_op: u64,
Expand All @@ -307,20 +250,14 @@ impl MempoolBlocksHandler {
.get_transaction_queue(executed_txs)
.await?;

let (chunks_left, priority_ops) = Self::select_priority_ops(
self.max_block_size_chunks,
current_unprocessed_priority_op,
&mut tx_queue,
)
.await;

let (_chunks_left, txs) = Self::prepare_tx_for_block(
chunks_left,
block_timestamp,
&mut tx_queue,
&self.mempool_state,
)
.await?;
let (txs, priority_ops, chunks_left) = tx_queue
.select_transactions(
self.max_block_size_chunks,
current_unprocessed_priority_op,
block_timestamp,
&self.mempool_state,
)
.await?;

if !priority_ops.is_empty() || !txs.is_empty() {
vlog::debug!(
Expand Down
Loading

0 comments on commit ff3f66b

Please sign in to comment.