Skip to content

Commit

Permalink
[enhancement] Refactor libra_channel interface to add an explicit Key
Browse files Browse the repository at this point in the history
Summary
The current libra_channel interface is agnostic of the concept of a Key. We have pushed the abstraction of Key to ValidatorMessage trait which forces each message to implement a trait which returns AccountAddress

This PR updates the libra_channel interface to accept an arbitrary Key in its put() method which will make it easy on the Sender side to add messages. The Sender side will no longer be forced to implement ValidatorMessage trait.

With this design, we do leak the concept of a Key explicitly to the sender, but we were not hiding it very well with the ValidatorMessage trait either.

Test Plan
Updated unit tests

Closes: aptos-labs#1500
Approved by: ankushagarwal
  • Loading branch information
ankushagarwal authored and bors-libra committed Oct 24, 2019
1 parent 6ce120b commit cee5949
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 142 deletions.
10 changes: 6 additions & 4 deletions common/channel/src/libra_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@ use futures::async_await::FusedStream;
use futures::stream::Stream;
use futures::task::Context;
use futures::Poll;
use std::hash::Hash;

/// MessageQueue is a trait which provides a very simple set of methods for implementing
/// a queue which will be used as the internal queue libra_channel.
pub trait MessageQueue {
type Key: Eq + Hash;
/// The actual type of the messages stored in this MessageQueue
type Message;

/// Push a message to this queue
fn push(&mut self, message: Self::Message);
/// Push a message with the given key to this queue
fn push(&mut self, key: Self::Key, message: Self::Message);

/// Pop a message from this queue
fn pop(&mut self) -> Option<Self::Message>;
Expand Down Expand Up @@ -52,9 +54,9 @@ impl<T: MessageQueue> Sender<T> {
/// This adds the message into the internal queue data structure. This is a non-blocking
/// synchronous call.
/// TODO: We can have this return a boolean if the queue of a validator is capacity
pub fn put(&mut self, message: <T as MessageQueue>::Message) {
pub fn put(&mut self, key: T::Key, message: T::Message) {
let mut shared_state = self.shared_state.lock().unwrap();
shared_state.internal_queue.push(message);
shared_state.internal_queue.push(key, message);
if let Some(w) = shared_state.waker.take() {
w.wake();
}
Expand Down
17 changes: 9 additions & 8 deletions common/channel/src/libra_channel_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ struct TestMessageQueue {
}

impl MessageQueue for TestMessageQueue {
type Key = u8;
type Message = u8;

fn push(&mut self, message: Self::Message) {
fn push(&mut self, _key: Self::Key, message: Self::Message) {
self.queue.push_back(message);
}

Expand All @@ -27,10 +28,10 @@ fn test_send_recv_order() {
queue: VecDeque::new(),
};
let (mut sender, mut receiver) = libra_channel::new(mq);
sender.put(0);
sender.put(1);
sender.put(2);
sender.put(3);
sender.put(0, 0);
sender.put(0, 1);
sender.put(0, 2);
sender.put(0, 3);
let task = async move {
// Ensure that messages are received in order
assert_eq!(receiver.select_next_some().await, 0);
Expand Down Expand Up @@ -73,10 +74,10 @@ fn test_waker() {
});
});
thread::sleep(Duration::from_millis(100));
sender.put(0);
sender.put(0, 0);
thread::sleep(Duration::from_millis(100));
sender.put(1);
sender.put(0, 1);
thread::sleep(Duration::from_millis(100));
sender.put(2);
sender.put(0, 2);
join_handle.join().unwrap();
}
18 changes: 5 additions & 13 deletions common/channel/src/message_queues.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,6 @@ use crate::libra_channel::MessageQueue;
use libra_types::account_address::AccountAddress;
use std::collections::{HashMap, VecDeque};

/// ValidatorMessage trait needs to be implemented by every message which gets pushed into the Libra
/// channel so that we can ensure fairness among validators
pub trait ValidatorMessage {
/// Extract the Validator from which this message arrived. This
/// will be used for ensuring fairness among validators.
fn get_validator(&self) -> AccountAddress;
}

/// QueueStyle is an enum which can be used as a configuration option for
/// PerValidatorQueue. Since the queue per validator is going to be bounded,
/// QueueStyle also determines the policy for dropping messages.
Expand Down Expand Up @@ -77,21 +69,21 @@ impl<T> PerValidatorQueue<T> {
}
}

impl<T: ValidatorMessage> MessageQueue for PerValidatorQueue<T> {
impl<T> MessageQueue for PerValidatorQueue<T> {
type Key = AccountAddress;
type Message = T;

/// push a message to the appropriate queue in per_validator_queue
/// add the validator to round_robin_queue if it didnt already exist
fn push(&mut self, message: Self::Message) {
let validator = message.get_validator();
fn push(&mut self, key: Self::Key, message: Self::Message) {
let max_queue_size = self.max_queue_size;
let validator_message_queue = self
.per_validator_queue
.entry(validator)
.entry(key)
.or_insert_with(|| VecDeque::with_capacity(max_queue_size));
// Add the validator to our round-robin queue if it's not already there
if validator_message_queue.is_empty() {
self.round_robin_queue.push_back(validator);
self.round_robin_queue.push_back(key);
}
// Push the message to the actual validator message queue
if validator_message_queue.len() == max_queue_size {
Expand Down
Loading

0 comments on commit cee5949

Please sign in to comment.