diff --git a/sui_core/src/unit_tests/gateway_tests.rs b/sui_core/src/unit_tests/gateway_tests.rs index a5019f98eab40..69b11205eb74d 100644 --- a/sui_core/src/unit_tests/gateway_tests.rs +++ b/sui_core/src/unit_tests/gateway_tests.rs @@ -13,8 +13,9 @@ use std::{ }; use async_trait::async_trait; -use futures::channel::mpsc::Receiver; +use futures::channel::mpsc::{channel, Receiver}; use futures::lock::Mutex; +use futures::SinkExt; use move_core_types::{account_address::AccountAddress, ident_str, identifier::Identifier}; use signature::Signer; use typed_store::Map; @@ -29,6 +30,8 @@ use sui_types::messages::Transaction; use sui_types::object::{Data, Object, Owner, GAS_VALUE_FOR_TESTING}; use crate::authority::{AuthorityState, AuthorityStore}; +use crate::authority_client::BUFFER_SIZE; +use crate::gateway_state::gateway_store::AccountStore; use crate::gateway_state::{GatewayAPI, GatewayState}; use super::*; @@ -116,9 +119,31 @@ impl AuthorityAPI for LocalAuthorityClient { /// Handle Batch information requests for this authority. async fn handle_batch_streaming( &self, - _request: BatchInfoRequest, + request: BatchInfoRequest, ) -> Result>, io::Error> { - todo!() + let state = self.0.clone(); + let (mut tx_output, tr_output) = channel(BUFFER_SIZE); + + let update_items = state + .lock() + .await + .handle_batch_info_request(request) + .await; + + match update_items { + Ok(t) => { + let mut deq = t.0; + while let Some(update_item) = deq.pop_front() { + let batch_info_response_item = BatchInfoResponseItem(update_item.clone()); + let _ = tx_output.send(Ok(batch_info_response_item)).await; + } + }, + Err(e) => { + let err = std::io::Error::new(std::io::ErrorKind::Other, e); + return Err(err); + }, + } + Ok(tr_output) } }