Skip to content

Commit

Permalink
[State Sync] Add initial logging support to the Data Streaming Service.
Browse files Browse the repository at this point in the history
  • Loading branch information
JoshLind authored and bors-libra committed Nov 2, 2021
1 parent ddc74bf commit 324407b
Show file tree
Hide file tree
Showing 12 changed files with 242 additions and 22 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions state-sync/state-sync-v2/data-streaming-service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ diem-crypto = { path = "../../../crypto/crypto" }
diem-data-client = { path = "../../diem-data-client" }
diem-id-generator = { path = "../../../common/id-generator" }
diem-infallible = { path = "../../../common/infallible" }
diem-logger = { path = "../../../common/logger" }
diem-types = { path = "../../../types" }
diem-workspace-hack = { path = "../../../common/workspace-hack" }

Expand Down
52 changes: 47 additions & 5 deletions state-sync/state-sync-v2/data-streaming-service/src/data_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
SentDataNotification,
},
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
stream_progress_tracker::{DataStreamTracker, StreamProgressTracker},
streaming_client::StreamRequest,
};
Expand All @@ -17,6 +18,7 @@ use diem_data_client::{
};
use diem_id_generator::{IdGenerator, U64IdGenerator};
use diem_infallible::Mutex;
use diem_logger::prelude::*;
use futures::{stream::FusedStream, Stream};
use std::{
collections::{HashMap, VecDeque},
Expand Down Expand Up @@ -49,6 +51,9 @@ pub type PendingClientResponse = Arc<Mutex<Box<data_notification::PendingClientR
/// proofs must be sent with monotonically increasing versions).
#[derive(Debug)]
pub struct DataStream<T> {
// The unique ID for this data stream. This is useful for logging.
data_stream_id: DataStreamId,

// The data client through which to fetch data from the Diem network
diem_data_client: T,

Expand All @@ -75,6 +80,7 @@ pub struct DataStream<T> {

impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
pub fn new(
data_stream_id: DataStreamId,
stream_request: &StreamRequest,
diem_data_client: T,
notification_id_generator: Arc<U64IdGenerator>,
Expand All @@ -90,6 +96,7 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {

// Create a new data stream
let data_stream = Self {
data_stream_id,
diem_data_client,
stream_progress_tracker,
sent_data_requests: None,
Expand Down Expand Up @@ -144,6 +151,15 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
self.get_sent_data_requests()
.push_back(pending_client_response);
}
debug!(
(LogSchema::new(LogEntry::SendDataRequests)
.stream_id(self.data_stream_id)
.event(LogEvent::Success)
.message(&format!(
"Sent {:?} data requests to the network",
max_num_requests_to_send
)))
);
}
Ok(())
}
Expand Down Expand Up @@ -246,6 +262,12 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
};

// Send the data notification
info!(
(LogSchema::new(LogEntry::EndOfStreamNotification)
.stream_id(self.data_stream_id)
.event(LogEvent::Pending)
.message("Sent the end of stream notification"))
);
self.stream_end_notification_id = Some(notification_id);
self.send_data_notification(data_notification)
}
Expand Down Expand Up @@ -324,8 +346,13 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
fn handle_data_client_error(
&mut self,
data_client_request: &DataClientRequest,
_data_client_error: &diem_data_client::Error,
data_client_error: &diem_data_client::Error,
) -> Result<(), Error> {
error!(LogSchema::new(LogEntry::ReceivedDataResponse)
.stream_id(self.data_stream_id)
.event(LogEvent::Error)
.error(&data_client_error.clone().into()));

// TODO(joshlind): don't just resend the request. Identify the best
// way to react based on the error.
self.resend_data_client_request(data_client_request)
Expand All @@ -349,10 +376,19 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {

/// Notifies the Diem data client of a bad client response
fn notify_bad_response(&self, data_client_response: &DataClientResponse) {
self.diem_data_client.notify_bad_response(
data_client_response.id,
ResponseError::InvalidPayloadDataType,
);
let response_id = data_client_response.id;
let response_error = ResponseError::InvalidPayloadDataType;

info!(LogSchema::new(LogEntry::ReceivedDataResponse)
.stream_id(self.data_stream_id)
.event(LogEvent::Error)
.message(&format!(
"Notifying the data client of a bad response. Response id: {:?}, error: {:?}",
response_id, response_error
)));

self.diem_data_client
.notify_bad_response(response_id, response_error);
}

/// Sends a data notification to the client along the stream
Expand Down Expand Up @@ -386,6 +422,12 @@ impl<T: DiemDataClient + Send + Clone + 'static> DataStream<T> {
}

// Send the notification along the stream
debug!(
(LogSchema::new(LogEntry::StreamNotification)
.stream_id(self.data_stream_id)
.event(LogEvent::Success)
.message("Sent a single stream notification!"))
);
self.send_data_notification(data_notification)?;
}

Expand Down
1 change: 1 addition & 0 deletions state-sync/state-sync-v2/data-streaming-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
mod data_notification;
mod data_stream;
mod error;
mod logging;
mod stream_progress_tracker;
mod streaming_client;
mod streaming_service;
Expand Down
50 changes: 50 additions & 0 deletions state-sync/state-sync-v2/data-streaming-service/src/logging.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright (c) The Diem Core Contributors
// SPDX-License-Identifier: Apache-2.0

use crate::error::Error;
use diem_logger::Schema;
use serde::Serialize;

#[derive(Schema)]
pub struct LogSchema<'a> {
name: LogEntry,
error: Option<&'a Error>,
event: Option<LogEvent>,
message: Option<&'a str>,
stream_id: Option<u64>,
}

impl<'a> LogSchema<'a> {
pub fn new(name: LogEntry) -> Self {
Self {
name,
error: None,
event: None,
message: None,
stream_id: None,
}
}
}

#[derive(Clone, Copy, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum LogEntry {
CheckStreamProgress,
DiemDataClient,
EndOfStreamNotification,
HandleStreamRequest,
InitializeStream,
ReceivedDataResponse,
RefreshGlobalData,
RespondToStreamRequest,
SendDataRequests,
StreamNotification,
}

#[derive(Clone, Copy, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum LogEvent {
Error,
Pending,
Success,
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ use crate::{
NumberOfAccountsRequest, TransactionOutputsWithProofRequest, TransactionsWithProofRequest,
},
error::Error,
logging::{LogEntry, LogEvent, LogSchema},
streaming_client::{
Epoch, GetAllAccountsRequest, GetAllEpochEndingLedgerInfosRequest, StreamRequest,
},
};
use diem_data_client::{AdvertisedData, GlobalDataSummary, ResponsePayload};
use diem_id_generator::{IdGenerator, U64IdGenerator};
use diem_logger::prelude::*;
use diem_types::{ledger_info::LedgerInfoWithSignatures, transaction::Version};
use enum_dispatch::enum_dispatch;
use itertools::Itertools;
Expand Down Expand Up @@ -215,6 +217,14 @@ impl DataStreamTracker for AccountsStreamTracker {

Ok(client_requests)
} else {
info!(
(LogSchema::new(LogEntry::DiemDataClient)
.event(LogEvent::Pending)
.message(&format!(
"Requested the number of accounts at version: {:?}",
self.request.version
)))
);
self.account_num_requested = true;
Ok(vec![DataClientRequest::NumberOfAccounts(
NumberOfAccountsRequest {
Expand Down Expand Up @@ -273,13 +283,19 @@ impl DataStreamTracker for AccountsStreamTracker {
);
return Ok(Some(data_notification));
}
NumberOfAccounts(_) => {
NumberOfAccounts(request) => {
if let ResponsePayload::NumberOfAccountStates(number_of_accounts) =
client_response.payload
{
// We got a response. Save the number of accounts.
self.number_of_accounts = Some(number_of_accounts);
self.account_num_requested = false;
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!("Received number of accounts at version: {:?}. Total accounts: {:?}",
request.version, number_of_accounts)))
);
}
}
request => invalid_client_request!(request, self),
Expand Down Expand Up @@ -482,6 +498,14 @@ impl DataStreamTracker for ContinuousTransactionStreamTracker {
self.select_target_ledger_info(&global_data_summary.advertised_data)?;
if target_ledger_info.ledger_info().epoch() > next_request_epoch {
// There was an epoch change. Request an epoch ending ledger info.
info!(
(LogSchema::new(LogEntry::DiemDataClient)
.event(LogEvent::Pending)
.message(&format!(
"Requested an epoch ending ledger info for epoch: {:?}",
target_ledger_info.ledger_info().epoch()
)))
);
self.end_of_epoch_requested = true;
return Ok(vec![DataClientRequest::EpochEndingLedgerInfos(
EpochEndingLedgerInfosRequest {
Expand All @@ -490,6 +514,15 @@ impl DataStreamTracker for ContinuousTransactionStreamTracker {
},
)]);
} else {
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!(
"Setting new target ledger info. Version: {:?}, Epoch: {:?}",
target_ledger_info.ledger_info().version(),
target_ledger_info.ledger_info().epoch()
)))
);
self.target_ledger_info = Some(target_ledger_info);
}
}
Expand Down Expand Up @@ -559,13 +592,39 @@ impl DataStreamTracker for ContinuousTransactionStreamTracker {
if let ResponsePayload::EpochEndingLedgerInfos(epoch_ending_ledger_infos) =
&client_response.payload
{
if let [target_ledger_info] = &epoch_ending_ledger_infos[..] {
self.target_ledger_info = Some(target_ledger_info.clone());
} else {
// TODO(joshlind): notify the data client of the bad response
match &epoch_ending_ledger_infos[..] {
[target_ledger_info] => {
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!(
"Received an epoch ending ledger info for epoch: {:?}. \
Setting new target version: {:?}",
target_ledger_info.ledger_info().epoch(),
target_ledger_info.ledger_info().version()
)))
);
self.target_ledger_info = Some(target_ledger_info.clone());
}
response_payload => {
// TODO(joshlind): notify the data client of the bad response
debug!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Error)
.message(&format!("Received an incorrect number of epoch ending ledger infos. Response: {:?}", response_payload))
));
}
}
} else {
// TODO(joshlind): notify the data client of the bad response
debug!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Error)
.message(&format!(
"Received an invalid epoch ending ledger response: {:?}",
client_response.payload
)))
);
}
self.end_of_epoch_requested = false;
Ok(None)
Expand Down Expand Up @@ -656,6 +715,14 @@ impl EpochEndingStreamTracker {
end_epoch, request.start_epoch
)));
}
info!(
(LogSchema::new(LogEntry::ReceivedDataResponse)
.event(LogEvent::Success)
.message(&format!(
"Setting the end epoch for the stream at: {:?}",
end_epoch
)))
);

Ok(EpochEndingStreamTracker {
request: request.clone(),
Expand Down
Loading

0 comments on commit 324407b

Please sign in to comment.