Skip to content

Commit

Permalink
[State Sync] Update Data Streaming Client API to support stream termi…
Browse files Browse the repository at this point in the history
…nation.
  • Loading branch information
JoshLind authored and bors-libra committed Nov 2, 2021
1 parent 324407b commit 74626d5
Show file tree
Hide file tree
Showing 10 changed files with 255 additions and 89 deletions.
8 changes: 5 additions & 3 deletions state-sync/diem-data-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use storage_service::UnexpectedResponseError;
use storage_service_types::{self as storage_service, CompleteDataRange, Epoch};
use thiserror::Error;

pub type ResponseId = u64;

pub mod diemnet;

pub type Result<T, E = Error> = ::std::result::Result<T, E>;
Expand Down Expand Up @@ -55,8 +57,8 @@ impl From<UnexpectedResponseError> for Error {
/// the Data Client about invalid or malformed responses.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum ResponseError {
InvalidData,
InvalidPayloadDataType,
MissingData,
ProofVerificationError,
}

Expand Down Expand Up @@ -139,7 +141,7 @@ pub trait DiemDataClient {
/// `notify_bad_response()` API call above.
#[derive(Clone, Debug)]
pub struct Response<T> {
pub id: u64,
pub id: ResponseId,
pub payload: T,
}

Expand All @@ -152,7 +154,7 @@ impl<T> Response<T> {
self.payload
}

pub fn into_parts(self) -> (u64, T) {
pub fn into_parts(self) -> (ResponseId, T) {
(self.id, self.payload)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,6 @@ pub enum DataPayload {
TransactionsWithProof(TransactionListWithProof),
}

/// A sent data notification that tracks the original client request and the
/// client response forwarded along a stream. This is useful for re-fetching.
#[derive(Clone, Debug)]
pub struct SentDataNotification {
pub client_request: DataClientRequest,
pub client_response: DataClientResponse,
}

/// A request that has been sent to the Diem data client.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum DataClientRequest {
Expand Down
49 changes: 31 additions & 18 deletions state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use crate::{
data_notification,
data_notification::{
DataClientRequest, DataClientResponse, DataNotification, DataPayload, NotificationId,
SentDataNotification,
},
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
Expand All @@ -14,7 +13,7 @@ use crate::{
};
use channel::{diem_channel, message_queues::QueueStyle};
use diem_data_client::{
AdvertisedData, DiemDataClient, GlobalDataSummary, ResponseError, ResponsePayload,
AdvertisedData, DiemDataClient, GlobalDataSummary, ResponseError, ResponseId, ResponsePayload,
};
use diem_id_generator::{IdGenerator, U64IdGenerator};
use diem_infallible::Mutex;
Expand Down Expand Up @@ -65,8 +64,10 @@ pub struct DataStream<T> {
// a data notification can be created and sent along the stream.
sent_data_requests: Option<VecDeque<PendingClientResponse>>,

// The data notifications already sent via this stream.
sent_notifications: HashMap<NotificationId, SentDataNotification>,
// TODO(joshlind): garbage collect me!
// Maps a notification ID (sent along the data stream) to a response ID
// received from the data client. This is useful for providing feedback.
notifications_to_responses: HashMap<NotificationId, ResponseId>,

// The channel on which to send data notifications when they are ready.
notification_sender: channel::diem_channel::Sender<(), DataNotification>,
Expand Down Expand Up @@ -100,7 +101,7 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
diem_data_client,
stream_progress_tracker,
sent_data_requests: None,
sent_notifications: HashMap::new(),
notifications_to_responses: HashMap::new(),
notification_sender,
notification_id_generator,
stream_end_notification_id: None,
Expand All @@ -126,6 +127,18 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
self.create_and_send_client_requests(&global_data_summary)
}

/// Returns the response ID associated with a given `notification_id`.
/// Note: this method returns `None` if the `notification_id` does not match
/// a notification sent via this stream.
pub fn get_response_id_for_notification(
&self,
notification_id: &NotificationId,
) -> Option<ResponseId> {
self.notifications_to_responses
.get(notification_id)
.cloned()
}

/// Creates and sends a batch of diem data client requests to the network
fn create_and_send_client_requests(
&mut self,
Expand Down Expand Up @@ -406,18 +419,15 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
self.notification_id_generator.clone(),
)?
{
// Create and save the data notification to track any future re-fetches
let sent_data_notification = SentDataNotification {
client_request: data_client_request.clone(),
client_response: data_client_response.clone(),
};
if let Some(existing_notification) = self
.sent_notifications
.insert(data_notification.notification_id, sent_data_notification)
// Save the data notification ID and response ID
let notification_id = data_notification.notification_id;
if let Some(response_id) = self
.notifications_to_responses
.insert(notification_id, data_client_response.id)
{
panic!(
"Duplicate sent notification found! This should not occur! ID: {}, notification: {:?}",
data_notification.notification_id, existing_notification
"Duplicate sent notification ID found! Notification ID: {:?}, Response ID: {:?}",
notification_id, response_id,
);
}

Expand All @@ -426,7 +436,10 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
(LogSchema::new(LogEntry::StreamNotification)
.stream_id(self.data_stream_id)
.event(LogEvent::Success)
.message("Sent a single stream notification!"))
.message(&format!(
"Sent a single stream notification! Notification ID: {:?}",
notification_id
)))
);
self.send_data_notification(data_notification)?;
}
Expand Down Expand Up @@ -463,10 +476,10 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
&mut self,
) -> (
&mut Option<VecDeque<PendingClientResponse>>,
&mut HashMap<NotificationId, SentDataNotification>,
&mut HashMap<NotificationId, ResponseId>,
) {
let sent_requests = &mut self.sent_data_requests;
let sent_notifications = &mut self.sent_notifications;
let sent_notifications = &mut self.notifications_to_responses;

(sent_requests, sent_notifications)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub enum LogEntry {
CheckStreamProgress,
DiemDataClient,
EndOfStreamNotification,
HandleTerminateRequest,
HandleStreamRequest,
InitializeStream,
ReceivedDataResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl StreamProgressTracker {
Ok(TransactionStreamTracker::new(stream_request)?.into())
}
_ => Err(Error::UnsupportedRequestEncountered(format!(
"Stream request not currently supported: {:?}",
"Stream request not supported: {:?}",
stream_request
))),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,21 @@ pub trait DataStreamingClient {
max_proof_version: Version,
) -> Result<DataStreamListener, Error>;

/// Refetches the payload for the data notification corresponding to the
/// specified `notification_id`.
/// Terminates the stream that sent the notification with the given
/// `notification_id` and provides feedback for why the payload in the
/// notification was invalid.
///
/// Note: this is required because data payloads may be invalid, e.g., due
/// to invalid or malformed data returned by a misbehaving peer or a failure
/// to verify a proof. The refetch request forces a refetch of the payload
/// and the `refetch_reason` notifies the streaming service as to why the
/// payload must be refetched.
async fn refetch_notification_payload(
/// Note:
/// 1. This is required because data payloads may be invalid, e.g., due to
/// malformed data returned by a misbehaving peer or an invalid proof. This
/// notifies the streaming service that the payload was invalid and thus the
/// stream should be terminated and the responsible peer penalized.
/// 2. Clients that wish to continue fetching data need to open a new stream.
async fn terminate_stream_with_feedback(
&self,
notification_id: NotificationId,
refetch_reason: PayloadRefetchReason,
) -> Result<DataStreamListener, Error>;
payload_feedback: PayloadFeedback,
) -> Result<(), Error>;

/// Continuously streams transactions with proofs as the blockchain grows.
/// The stream starts at `start_version` and `start_epoch` (inclusive).
Expand Down Expand Up @@ -114,7 +116,7 @@ pub enum StreamRequest {
GetAllTransactionOutputs(GetAllTransactionOutputsRequest),
ContinuouslyStreamTransactions(ContinuouslyStreamTransactionsRequest),
ContinuouslyStreamTransactionOutputs(ContinuouslyStreamTransactionOutputsRequest),
RefetchNotificationPayload(RefetchNotificationPayloadRequest),
TerminateStream(TerminateStreamRequest),
}

/// A client request for fetching all account states at a specified version.
Expand Down Expand Up @@ -161,16 +163,16 @@ pub struct ContinuouslyStreamTransactionOutputsRequest {
pub start_epoch: Epoch,
}

/// A client request for refetching a notification payload.
/// A client request for terminating a stream and providing payload feedback.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct RefetchNotificationPayloadRequest {
pub struct TerminateStreamRequest {
pub notification_id: NotificationId,
pub refetch_reason: PayloadRefetchReason,
pub payload_feedback: PayloadFeedback,
}

/// The reason for having to refetch a data payload in a data notification.
/// The feedback for a given payload.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum PayloadRefetchReason {
pub enum PayloadFeedback {
InvalidPayloadData,
PayloadTypeIsIncorrect,
ProofVerificationFailed,
Expand All @@ -190,15 +192,23 @@ impl StreamingServiceClient {
async fn send_stream_request(
&self,
client_request: StreamRequest,
) -> Result<DataStreamListener, Error> {
) -> Result<oneshot::Receiver<Result<DataStreamListener, Error>>, Error> {
let mut request_sender = self.request_sender.clone();
let (response_sender, response_receiver) = oneshot::channel();
let request_message = StreamRequestMessage {
stream_request: client_request,
response_sender,
};

request_sender.send(request_message).await?;

Ok(response_receiver)
}

async fn send_request_and_await_response(
&self,
client_request: StreamRequest,
) -> Result<DataStreamListener, Error> {
let response_receiver = self.send_stream_request(client_request).await?;
response_receiver.await?
}
}
Expand All @@ -207,7 +217,7 @@ impl StreamingServiceClient {
impl DataStreamingClient for StreamingServiceClient {
async fn get_all_accounts(&self, version: u64) -> Result<DataStreamListener, Error> {
let client_request = StreamRequest::GetAllAccounts(GetAllAccountsRequest { version });
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}

async fn get_all_epoch_ending_ledger_infos(
Expand All @@ -218,7 +228,7 @@ impl DataStreamingClient for StreamingServiceClient {
StreamRequest::GetAllEpochEndingLedgerInfos(GetAllEpochEndingLedgerInfosRequest {
start_epoch,
});
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}

async fn get_all_transactions(
Expand All @@ -234,7 +244,7 @@ impl DataStreamingClient for StreamingServiceClient {
max_proof_version,
include_events,
});
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}

async fn get_all_transaction_outputs(
Expand All @@ -249,20 +259,21 @@ impl DataStreamingClient for StreamingServiceClient {
end_version,
max_proof_version,
});
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}

async fn refetch_notification_payload(
async fn terminate_stream_with_feedback(
&self,
notification_id: u64,
refetch_reason: PayloadRefetchReason,
) -> Result<DataStreamListener, Error> {
let client_request =
StreamRequest::RefetchNotificationPayload(RefetchNotificationPayloadRequest {
notification_id,
refetch_reason,
});
self.send_stream_request(client_request).await
payload_feedback: PayloadFeedback,
) -> Result<(), Error> {
let client_request = StreamRequest::TerminateStream(TerminateStreamRequest {
notification_id,
payload_feedback,
});
// We can ignore the receiver as no data will be sent.
let _ = self.send_stream_request(client_request).await?;
Ok(())
}

async fn continuously_stream_transactions(
Expand All @@ -277,7 +288,7 @@ impl DataStreamingClient for StreamingServiceClient {
start_epoch,
include_events,
});
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}

async fn continuously_stream_transaction_outputs(
Expand All @@ -291,7 +302,7 @@ impl DataStreamingClient for StreamingServiceClient {
start_epoch,
},
);
self.send_stream_request(client_request).await
self.send_request_and_await_response(client_request).await
}
}

Expand Down
Loading

0 comments on commit 74626d5

Please sign in to comment.