Skip to content

Commit

Permalink
feat: add complete exec out + filter
Browse files Browse the repository at this point in the history
  • Loading branch information
aoudiamoncef committed May 11, 2023
1 parent 8617d55 commit 08f3f24
Show file tree
Hide file tree
Showing 15 changed files with 301 additions and 158 deletions.
14 changes: 5 additions & 9 deletions massa-execution-exports/src/mapping_grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@ impl From<SlotExecutionOutput> for grpc::SlotExecutionOutput {
fn from(value: SlotExecutionOutput) -> Self {
match value {
SlotExecutionOutput::ExecutedSlot(execution_output) => grpc::SlotExecutionOutput {
message: Some(grpc::slot_execution_output::Message::ExecutionOutput(
execution_output.into(),
)),
status: vec![grpc::ExecutionOutputStatus::Candidate as i32],
execution_output: Some(execution_output.into()),
},
SlotExecutionOutput::FinalizedSlot(finalized_slot) => grpc::SlotExecutionOutput {
message: Some(grpc::slot_execution_output::Message::FinalExecutionOutput(
grpc::FinalizedExecutionOutput {
slot: Some(finalized_slot.into()),
},
)),
SlotExecutionOutput::FinalizedSlot(execution_output) => grpc::SlotExecutionOutput {
status: vec![grpc::ExecutionOutputStatus::Final as i32],
execution_output: Some(execution_output.into()),
},
}
}
Expand Down
2 changes: 1 addition & 1 deletion massa-execution-exports/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub enum SlotExecutionOutput {
ExecutedSlot(ExecutionOutput),

/// Finalized slot output
FinalizedSlot(Slot),
FinalizedSlot(ExecutionOutput),
}

/// structure describing the output of a single execution
Expand Down
4 changes: 2 additions & 2 deletions massa-execution-worker/src/execution.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1263,7 +1263,7 @@ impl ExecutionState {

// Broadcast a final slot execution output to active channel subscribers.
if self.config.broadcast_enabled {
let slot_exec_out = SlotExecutionOutput::FinalizedSlot(exec_out.slot);
let slot_exec_out = SlotExecutionOutput::FinalizedSlot(exec_out.clone());
if let Err(err) = self
.channels
.slot_execution_output_sender
Expand Down Expand Up @@ -1311,7 +1311,7 @@ impl ExecutionState {

// Broadcast a final slot execution output to active channel subscribers.
if self.config.broadcast_enabled {
let slot_exec_out = SlotExecutionOutput::FinalizedSlot(exec_out.slot);
let slot_exec_out = SlotExecutionOutput::FinalizedSlot(exec_out.clone());
if let Err(err) = self
.channels
.slot_execution_output_sender
Expand Down
2 changes: 1 addition & 1 deletion massa-grpc/src/stream/new_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) async fn new_blocks(
break;
}
},
Err(e) => {error!("error on receive new block : {}", e)}
Err(e) => error!("error on receive new block : {}", e)
}
},
// Receive a new message from the in_stream
Expand Down
2 changes: 1 addition & 1 deletion massa-grpc/src/stream/new_blocks_headers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) async fn new_blocks_headers(
break;
}
},
Err(e) => {error!("error on receive new block header : {}", e)}
Err(e) => error!("error on receive new block header : {}", e)
}
},
// Receive a new message from the in_stream
Expand Down
2 changes: 1 addition & 1 deletion massa-grpc/src/stream/new_endorsements.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) async fn new_endorsements(
break;
}
},
Err(e) => {error!("error on receive new endorsement : {}", e)}
Err(e) => error!("error on receive new endorsement : {}", e)
}
},
// Receive a new message from the in_stream
Expand Down
2 changes: 1 addition & 1 deletion massa-grpc/src/stream/new_filled_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ pub(crate) async fn new_filled_blocks(
break;
}
},
Err(e) => {error!("error on receive new block : {}", e)}
Err(e) => error!("error on receive new block : {}", e)
}
},
// Receive a new message from the in_stream
Expand Down
20 changes: 10 additions & 10 deletions massa-grpc/src/stream/new_operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,17 +102,17 @@ pub(crate) async fn new_operations(
Ok(Box::pin(out_stream) as NewOperationsStreamType)
}

/// Return if the type of operation should be send to client
fn should_send(filter_opt: &Option<grpc::NewOperationsFilter>, ope_type: grpc::OpType) -> bool {
if let Some(filter) = filter_opt {
let filtered_ope_ids = &filter.types;
if filtered_ope_ids.is_empty() {
return true;
match filter_opt {
Some(filter) => {
let filtered_ope_ids = &filter.types;
if filtered_ope_ids.is_empty() {
true
} else {
let id: i32 = ope_type as i32;
filtered_ope_ids.contains(&id)
}
}
let id: i32 = ope_type as i32;
filtered_ope_ids.contains(&id)
} else {
// if user has no filter = All operations type is send
true
None => true, // if user has no filter = All operations type is send
}
}
125 changes: 79 additions & 46 deletions massa-grpc/src/stream/new_slot_execution_outputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use crate::error::{match_for_io_error, GrpcError};
use crate::server::MassaGrpc;
use futures_util::StreamExt;
use massa_execution_exports::SlotExecutionOutput;
use massa_proto::massa::api::v1 as grpc;
use std::io::ErrorKind;
use std::pin::Pin;
Expand Down Expand Up @@ -38,60 +39,72 @@ pub(crate) async fn new_slot_execution_outputs(

tokio::spawn(async move {
// Initialize the request_id string
let mut request_id = String::new();
loop {
select! {
// Receive a new slot execution output from the subscriber
event = subscriber.recv() => {
match event {
Ok(massa_slot_execution_output) => {
// Send the new slot execution output through the channel
if let Err(e) = tx.send(Ok(grpc::NewSlotExecutionOutputsResponse {
id: request_id.clone(),
output: Some(massa_slot_execution_output.into())
})).await {
error!("failed to send new slot execution output : {}", e);
break;
}
},
if let Some(Ok(request)) = in_stream.next().await {
let mut request_id = request.id;
let mut filter = request.query.and_then(|q| q.filter);
loop {
select! {
// Receive a new slot execution output from the subscriber
event = subscriber.recv() => {
match event {
Ok(massa_slot_execution_output) => {
// Check if the slot execution output should be sent
if !should_send(&filter, &massa_slot_execution_output) {
continue;
}
// Send the new slot execution output through the channel
if let Err(e) = tx.send(Ok(grpc::NewSlotExecutionOutputsResponse {
id: request_id.clone(),
output: Some(massa_slot_execution_output.into())
})).await {
error!("failed to send new slot execution output : {}", e);
break;
}
},

Err(e) => {error!("error on receive new slot execution output : {}", e)}
}
},
// Receive a new message from the in_stream
res = in_stream.next() => {
match res {
Some(res) => {
match res {
// Get the request_id from the received data
Ok(data) => {
request_id = data.id
},
// Handle any errors that may occur during receiving the data
Err(err) => {
// Check if the error matches any IO errors
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
warn!("client disconnected, broken pipe: {}", io_err);
Err(e) => error!("error on receive new slot execution output : {}", e)
}
},
// Receive a new message from the in_stream
res = in_stream.next() => {
match res {
Some(res) => {
match res {
// Get the request_id from the received data
Ok(data) => {
// Update current filter && request id
filter = data.query
.and_then(|q| q.filter);
request_id = data.id
},
// Handle any errors that may occur during receiving the data
Err(err) => {
// Check if the error matches any IO errors
if let Some(io_err) = match_for_io_error(&err) {
if io_err.kind() == ErrorKind::BrokenPipe {
warn!("client disconnected, broken pipe: {}", io_err);
break;
}
}
error!("{}", err);
// Send the error response back to the client
if let Err(e) = tx.send(Err(err)).await {
error!("failed to send back new_slot_execution_outputs error response: {}", e);
break;
}
}
error!("{}", err);
// Send the error response back to the client
if let Err(e) = tx.send(Err(err)).await {
error!("failed to send back new_slot_execution_outputs error response: {}", e);
break;
}
}
}
},
None => {
// The client has disconnected
break;
},
},
None => {
// The client has disconnected
break;
},
}
}
}
}
} else {
error!("empty request");
}
});

Expand All @@ -101,3 +114,23 @@ pub(crate) async fn new_slot_execution_outputs(
// Return the new stream of slot execution output
Ok(Box::pin(out_stream) as NewSlotExecutionOutputsStreamType)
}

/// Return if the execution outputs should be send to client
fn should_send(
filter_opt: &Option<grpc::NewSlotExecutionOutputsFilter>,
exec_out_status: &SlotExecutionOutput,
) -> bool {
match filter_opt {
Some(filter) => match exec_out_status {
SlotExecutionOutput::ExecutedSlot(_) => {
let id = grpc::ExecutionOutputStatus::Candidate as i32;
filter.status.contains(&id)
}
SlotExecutionOutput::FinalizedSlot(_) => {
let id = grpc::ExecutionOutputStatus::Final as i32;
filter.status.contains(&id)
}
},
None => true, // if user has no filter = All execution outputs status are sent
}
}
63 changes: 48 additions & 15 deletions massa-proto/doc/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
- [NewOperationsQuery](#massa-api-v1-NewOperationsQuery)
- [NewOperationsRequest](#massa-api-v1-NewOperationsRequest)
- [NewOperationsResponse](#massa-api-v1-NewOperationsResponse)
- [NewSlotExecutionOutputsFilter](#massa-api-v1-NewSlotExecutionOutputsFilter)
- [NewSlotExecutionOutputsQuery](#massa-api-v1-NewSlotExecutionOutputsQuery)
- [NewSlotExecutionOutputsRequest](#massa-api-v1-NewSlotExecutionOutputsRequest)
- [NewSlotExecutionOutputsResponse](#massa-api-v1-NewSlotExecutionOutputsResponse)
- [OperationResult](#massa-api-v1-OperationResult)
Expand Down Expand Up @@ -100,8 +102,8 @@
- [ScExecutionEventContext](#massa-api-v1-ScExecutionEventContext)
- [SlotExecutionOutput](#massa-api-v1-SlotExecutionOutput)

- [ExecutionOutputStatus](#massa-api-v1-ExecutionOutputStatus)
- [ScExecutionEventStatus](#massa-api-v1-ScExecutionEventStatus)
- [ScExecutionOutputStatus](#massa-api-v1-ScExecutionOutputStatus)

- [operation.proto](#operation-proto)
- [CallSC](#massa-api-v1-CallSC)
Expand Down Expand Up @@ -920,6 +922,36 @@ NewOperationsResponse holds response from NewOperations



<a name="massa-api-v1-NewSlotExecutionOutputsFilter"></a>

### NewSlotExecutionOutputsFilter
NewSlotExecutionOutputs Filter


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| status | [ExecutionOutputStatus](#massa-api-v1-ExecutionOutputStatus) | repeated | Execution output status enum |






<a name="massa-api-v1-NewSlotExecutionOutputsQuery"></a>

### NewSlotExecutionOutputsQuery
NewSlotExecutionOutputs Query


| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| filter | [NewSlotExecutionOutputsFilter](#massa-api-v1-NewSlotExecutionOutputsFilter) | | Filter |






<a name="massa-api-v1-NewSlotExecutionOutputsRequest"></a>

### NewSlotExecutionOutputsRequest
Expand All @@ -929,6 +961,7 @@ NewSlotExecutionOutputsRequest holds request for NewSlotExecutionOutputs
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| id | [string](#string) | | Request id |
| query | [NewSlotExecutionOutputsQuery](#massa-api-v1-NewSlotExecutionOutputsQuery) | | Query |



Expand Down Expand Up @@ -1552,8 +1585,8 @@ SlotExecutionOutput

| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| status | [ExecutionOutputStatus](#massa-api-v1-ExecutionOutputStatus) | repeated | Status |
| execution_output | [ExecutionOutput](#massa-api-v1-ExecutionOutput) | | Executed slot output |
| final_execution_output | [FinalizedExecutionOutput](#massa-api-v1-FinalizedExecutionOutput) | | Executed final slot |



Expand All @@ -1562,30 +1595,30 @@ SlotExecutionOutput



<a name="massa-api-v1-ScExecutionEventStatus"></a>
<a name="massa-api-v1-ExecutionOutputStatus"></a>

### ScExecutionEventStatus
ScExecutionEventStatus type enum
### ExecutionOutputStatus
ExecutionOutputStatus type enum

| Name | Number | Description |
| ---- | ------ | ----------- |
| SC_EXECUTION_EVENT_STATUS_UNSPECIFIED | 0 | Defaut enum value |
| SC_EXECUTION_EVENT_STATUS_FINAL | 1 | Final status |
| SC_EXECUTION_EVENT_STATUS_READ_ONLY | 2 | Read only status |
| SC_EXECUTION_EVENT_STATUS_FAILURE | 3 | Failure status |
| EXECUTION_OUTPUT_STATUS_UNSPECIFIED | 0 | Defaut enum value |
| EXECUTION_OUTPUT_STATUS_CANDIDATE | 1 | Candidate status |
| EXECUTION_OUTPUT_STATUS_FINAL | 2 | Final status |



<a name="massa-api-v1-ScExecutionOutputStatus"></a>
<a name="massa-api-v1-ScExecutionEventStatus"></a>

### ScExecutionOutputStatus
ScExecutionOutputStatus type enum
### ScExecutionEventStatus
ScExecutionEventStatus type enum

| Name | Number | Description |
| ---- | ------ | ----------- |
| SC_EXECUTION_OUTPUT_STATUS_UNSPECIFIED | 0 | Defaut enum value |
| SC_EXECUTION_OUTPUT_STATUS_FINAL | 1 | Final status |
| SC_EXECUTION_OUTPUT_STATUS_CANDIDATE | 2 | Read only status |
| SC_EXECUTION_EVENT_STATUS_UNSPECIFIED | 0 | Defaut enum value |
| SC_EXECUTION_EVENT_STATUS_FINAL | 1 | Final status |
| SC_EXECUTION_EVENT_STATUS_READ_ONLY | 2 | Read only status |
| SC_EXECUTION_EVENT_STATUS_FAILURE | 3 | Failure status |



Expand Down
Loading

0 comments on commit 08f3f24

Please sign in to comment.