Skip to content

Commit 7996f7a

Browse files
committed
still looking
1 parent 4cbebab commit 7996f7a

File tree

4 files changed

+38
-22
lines changed

4 files changed

+38
-22
lines changed

core/src/distributed/distributed_stream.rs

+19-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
//! `aeron_subscribe`.
66
77
use crate::core_tx::TxCore;
8-
use crate::{abstract_executor, channel_builder::ChannelBuilder, monitor::{RxMetaData, TxMetaData}, Rx, SteadyCommander, Tx};
8+
use crate::{abstract_executor, channel_builder::ChannelBuilder, Rx, SteadyCommander, Tx};
99
use ahash::AHashMap;
1010
use async_ringbuf::wrap::AsyncWrap;
1111
use async_ringbuf::AsyncRb;
@@ -867,15 +867,29 @@ impl<T: StreamItem> LazyStreamRx<T> {
867867
}
868868
}
869869

870+
//TODO: these two methods control streams showing items not bytes, this may need to be corrected.
871+
870872
impl<T: StreamItem> RxMetaDataProvider for SteadyStreamRx<T> {
871-
fn meta_data(&self) -> RxMetaData {
872-
self.meta_data()
873+
fn meta_data(&self) -> Arc<ChannelMetaData> {
874+
match self.try_lock() {
875+
Some(guard) => guard.item_channel.channel_meta_data.meta_data(),
876+
None => {
877+
let guard = abstract_executor::block_on(self.lock());
878+
guard.item_channel.channel_meta_data.meta_data()
879+
}
880+
}
873881
}
874882
}
875883

876884
impl<T: StreamItem> TxMetaDataProvider for SteadyStreamTx<T> {
877-
fn meta_data(&self) -> TxMetaData {
878-
self.meta_data()
885+
fn meta_data(&self) -> Arc<ChannelMetaData> {
886+
match self.try_lock() {
887+
Some(guard) => guard.item_channel.channel_meta_data.meta_data(),
888+
None => {
889+
let guard = abstract_executor::block_on(self.lock());
890+
guard.item_channel.channel_meta_data.meta_data()
891+
}
892+
}
879893
}
880894
}
881895

core/src/monitor.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -947,20 +947,22 @@ pub(crate) mod monitor_tests {
947947
}
948948

949949
// Test for wait_shutdown
950-
// #[async_std::test]
951-
// async fn test_wait_shutdown() {
952-
// let context = test_steady_context();
953-
// let monitor = context.into_monitor([], []);
954-
//
955-
// // Simulate shutdown
956-
// {
957-
// let mut liveliness = monitor.runtime_state.write();
958-
// liveliness.request_shutdown();
959-
// }
960-
//
961-
// let result = monitor.wait_shutdown().await;
962-
// assert!(result);
963-
// }
950+
#[async_std::test]
951+
async fn test_wait_shutdown() {
952+
let context = test_steady_context();
953+
error!("expecting hang 1");
954+
let monitor = context.into_monitor([], []);
955+
error!("expecting hang 2");
956+
// Simulate shutdown
957+
{
958+
let mut liveliness = monitor.runtime_state.write();
959+
liveliness.request_shutdown();
960+
}
961+
error!("expecting hang 3");
962+
963+
let result = monitor.wait_shutdown().await;
964+
assert!(result);
965+
}
964966

965967
// Test for wait_periodic
966968
#[async_std::test]

core/src/steady_rx.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ impl<T> Rx<T> {
372372
}
373373

374374
// difficult to move because we have dual iterators and peek
375-
pub(crate) async fn shared_peek_async_iter_timeout(&mut self, wait_for_count: usize, timeout: Option<Duration>) -> impl Iterator<Item = &T> {
375+
pub(crate) async fn _shared_peek_async_iter_timeout(&mut self, wait_for_count: usize, timeout: Option<Duration>) -> impl Iterator<Item = &T> {
376376
let mut one_down = &mut self.oneshot_shutdown;
377377
if !one_down.is_terminated() {
378378
let mut operation = &mut self.rx.wait_occupied(wait_for_count);

core/src/telemetry/metrics_server.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ pub(crate) async fn run(context: SteadyContext, rx: SteadyRx<DiagramData>) -> Re
4141
, steady_config::telemetry_server_port()));
4242

4343
let frame_rate_ms = context.frame_rate_ms;
44-
let mut ctrl = context;
44+
let ctrl = context;
4545
#[cfg(feature = "telemetry_on_telemetry")]
46-
let mut ctrl = ctrl.into_monitor([&rx], []);
46+
let ctrl = ctrl.into_monitor([&rx], []);
4747

4848
internal_behavior(ctrl, frame_rate_ms, rx, addr).await
4949
}

0 commit comments

Comments
 (0)