Skip to content

Commit eb93ee8

Browse files
committed
refactor edge simulations
1 parent 3b54c45 commit eb93ee8

8 files changed

+198
-59
lines changed

core/examples/fizz_buzz/actor/console_printer.rs

+5-14
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use std::time::Duration;
66
use steady_state::*;
77
use std::error::Error;
88
use std::ops::DerefMut;
9+
use steady_state::simulate_edge::Behavior::Equals;
910
use crate::actor::fizz_buzz_processor::FizzBuzzMessage;
1011
use crate::actor::timer_actor::PrintSignal;
1112

@@ -80,20 +81,10 @@ pub async fn run(context: SteadyContext
8081
, fizzbuzz_rx: SteadyRx<FizzBuzzMessage>
8182
, print_rx: SteadyRx<PrintSignal>
8283
) -> Result<(),Box<dyn Error>> {
83-
let mut cmd = context.into_monitor([&fizzbuzz_rx,&print_rx], []);
84-
if let Some(responder) = cmd.sidechannel_responder() {
85-
let mut fizzbuzz_messages_rx = fizzbuzz_rx.lock().await;
86-
let mut print_signal_rx = print_rx.lock().await;
87-
while cmd.is_running(&mut ||
88-
fizzbuzz_messages_rx.is_closed_and_empty() &&
89-
print_signal_rx.is_closed_and_empty()) {
90-
// in main use graph.sidechannel_director node_call(msg,"ConsolePrinter")
91-
let _did_check = responder.equals_responder(&mut cmd,&mut fizzbuzz_messages_rx).await;
92-
// in main use graph.sidechannel_director node_call(msg,"ConsolePrinter")
93-
let _did_check = responder.equals_responder(&mut cmd,&mut print_signal_rx).await;
94-
}
95-
}
96-
Ok(())
84+
85+
context.into_monitor([&fizzbuzz_rx,&print_rx], [])
86+
.simulated_behavior([&Equals(fizzbuzz_rx),&Equals(print_rx)]).await
87+
9788
}
9889

9990

core/src/commander.rs

+5-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::future::Future;
33
use std::time::{Duration, Instant};
44
use futures_util::future::FusedFuture;
55
use std::any::Any;
6+
use std::error::Error;
7+
use std::fmt::Debug;
68
use crate::{steady_config, ActorIdentity, GraphLivelinessState, Rx, RxCoreBundle, SendSaturation, Tx, TxCoreBundle};
79
use crate::graph_testing::SideChannelResponder;
810
use crate::monitor::{RxMetaData, TxMetaData};
@@ -15,7 +17,7 @@ use crate::commander_monitor::LocalMonitor;
1517
use crate::core_rx::RxCore;
1618
use crate::core_tx::TxCore;
1719
use crate::distributed::distributed_stream::{Defrag, StreamItem};
18-
20+
use crate::simulate_edge::{Behavior, IntoSymRunner, SymRunner};
1921

2022
impl SteadyContext {
2123
/// Converts the context into a local monitor.
@@ -132,8 +134,9 @@ impl<X> SendOutcome<X> {
132134
#[allow(async_fn_in_trait)]
133135
pub trait SteadyCommander {
134136

137+
async fn simulated_behavior<const LEN: usize >(self, sims: [&dyn IntoSymRunner<Self>;LEN]) -> Result<(), Box<dyn Error>>;
135138

136-
/// set log level for the entire application
139+
/// set log level for the entire application
137140
fn loglevel(&self, loglevel: crate::LogLevel);
138141

139142
/// Triggers the transmission of all collected telemetry data to the configured telemetry endpoints.

core/src/commander_context.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::sync::Arc;
33
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
44
use parking_lot::RwLock;
55
use std::any::Any;
6+
use std::error::Error;
7+
use std::fmt::Debug;
68
use futures_util::lock::{Mutex};
79
use futures::channel::oneshot;
810
use futures_util::stream::FuturesUnordered;
@@ -16,14 +18,15 @@ use ringbuf::consumer::Consumer;
1618
use ringbuf::traits::Observer;
1719
use ringbuf::producer::Producer;
1820
use std::ops::DerefMut;
19-
use crate::{ActorIdentity, GraphLiveliness, GraphLivelinessState, Rx, RxCoreBundle, SendSaturation, SteadyCommander, Tx, TxCoreBundle};
21+
use crate::{simulate_edge, ActorIdentity, GraphLiveliness, GraphLivelinessState, Rx, RxCoreBundle, SendSaturation, SteadyCommander, Tx, TxCoreBundle};
2022
use crate::actor_builder::NodeTxRx;
2123
use crate::commander::SendOutcome;
2224
use crate::core_rx::RxCore;
2325
use crate::core_tx::TxCore;
2426
use crate::distributed::distributed_stream::{Defrag, StreamItem};
2527
use crate::graph_testing::SideChannelResponder;
2628
use crate::monitor::{ActorMetaData};
29+
use crate::simulate_edge::{Behavior, IntoSymRunner, SymRunner};
2730
use crate::telemetry::metrics_collector::CollectorDetail;
2831
use crate::util::logger;
2932
use crate::yield_now::yield_now;
@@ -74,6 +77,9 @@ impl Clone for SteadyContext {
7477
impl SteadyCommander for SteadyContext {
7578

7679

80+
async fn simulated_behavior<const LEN: usize >(self, sims: [&dyn IntoSymRunner<SteadyContext>;LEN]) -> Result<(), Box<dyn Error>> {
81+
simulate_edge::simulated_behavior::<SteadyContext, LEN>(self, sims).await
82+
}
7783

7884
/// Initializes the logger with the specified log level.
7985
fn loglevel(&self, loglevel: crate::LogLevel) {

core/src/commander_monitor.rs

+16-3
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use parking_lot::RwLock;
66
use futures_util::lock::{Mutex, MutexGuard};
77
use futures::channel::oneshot;
88
use std::any::{type_name, Any};
9+
use std::error::Error;
10+
use std::fmt::Debug;
911
use futures_util::future::{FusedFuture};
1012
use futures_timer::Delay;
1113
use futures_util::{select, FutureExt, StreamExt};
@@ -18,14 +20,15 @@ use ringbuf::traits::Observer;
1820
use ringbuf::consumer::Consumer;
1921
use ringbuf::producer::Producer;
2022
use crate::monitor::{DriftCountIterator, FinallyRollupProfileGuard, CALL_BATCH_READ, CALL_BATCH_WRITE, CALL_OTHER, CALL_SINGLE_READ, CALL_SINGLE_WRITE, CALL_WAIT};
21-
use crate::{yield_now, ActorIdentity, GraphLiveliness, GraphLivelinessState, Rx, RxCoreBundle, SendSaturation, SteadyCommander, SteadyState, Tx, TxCoreBundle, MONITOR_NOT};
23+
use crate::{simulate_edge, yield_now, ActorIdentity, GraphLiveliness, GraphLivelinessState, Rx, RxCoreBundle, SendSaturation, SteadyCommander, SteadyContext, SteadyState, Tx, TxCoreBundle, MONITOR_NOT};
2224
use crate::actor_builder::NodeTxRx;
2325
use crate::commander::SendOutcome;
2426
use crate::core_rx::RxCore;
2527
use crate::core_tx::TxCore;
2628
use crate::distributed::distributed_stream::{Defrag, StreamItem};
2729
use crate::graph_testing::SideChannelResponder;
2830
use crate::monitor_telemetry::SteadyTelemetry;
31+
use crate::simulate_edge::{Behavior, IntoSymRunner, SymRunner};
2932
use crate::steady_config::{CONSUMED_MESSAGES_BY_COLLECTOR, REAL_CHANNEL_LENGTH_TO_COLLECTOR};
3033
use crate::steady_rx::RxDone;
3134
use crate::steady_tx::TxDone;
@@ -78,7 +81,10 @@ pub struct LocalMonitor<const RX_LEN: usize, const TX_LEN: usize> {
7881
/// Implementation of `LocalMonitor`.
7982
impl<const RXL: usize, const TXL: usize> LocalMonitor<RXL, TXL> {
8083

81-
84+
async fn simulated_behavior< const LEN: usize >(self, sims: [&dyn IntoSymRunner<Self>;LEN]
85+
) -> Result<(), Box<dyn Error>> {
86+
simulate_edge::simulated_behavior::<Self, LEN>(self,sims).await
87+
}
8288

8389
/// Marks the start of a high-activity profile period for telemetry monitoring.
8490
///
@@ -147,6 +153,10 @@ impl<const RXL: usize, const TXL: usize> LocalMonitor<RXL, TXL> {
147153

148154
impl<const RX_LEN: usize, const TX_LEN: usize> SteadyCommander for LocalMonitor<RX_LEN, TX_LEN> {
149155

156+
async fn simulated_behavior<const LEN: usize >(self, sims: [&dyn IntoSymRunner<Self>;LEN]
157+
) -> Result<(), Box<dyn Error>> {
158+
simulate_edge::simulated_behavior::<LocalMonitor<RX_LEN, TX_LEN>, LEN>(self,sims).await
159+
}
150160

151161
/// set loglevel for the application
152162
fn loglevel(&self, loglevel: crate::LogLevel) {
@@ -470,7 +480,10 @@ impl<const RX_LEN: usize, const TX_LEN: usize> SteadyCommander for LocalMonitor<
470480
match this.shared_try_send(msg) {
471481
Ok(done_count) => {
472482
if let Some(ref mut tel) = self.telemetry.send_tx {
473-
this.telemetry_inc(done_count, tel); } else { this.monitor_not(); };
483+
this.telemetry_inc(done_count, tel);
484+
} else {
485+
this.monitor_not();
486+
};
474487
SendOutcome::Success
475488
}
476489
Err(sensitive) => SendOutcome::Blocked(sensitive),

core/src/core_tx.rs

+20-2
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ use ringbuf::producer::Producer;
1212
use crate::monitor_telemetry::SteadyTelemetrySend;
1313
use crate::steady_tx::TxDone;
1414
use crate::{steady_config, ActorIdentity, SendSaturation, StreamSessionMessage, StreamSimpleMessage, Tx, MONITOR_NOT};
15+
use crate::commander::SendOutcome;
1516
use crate::distributed::distributed_stream::{StreamItem, StreamTx};
16-
17+
use crate::graph_testing::SideChannelResponder;
1718

1819
pub trait TxCore {
1920
type MsgIn<'a>;
@@ -24,6 +25,7 @@ pub trait TxCore {
2425
fn shared_send_iter_until_full<'a,I: Iterator<Item = Self::MsgIn<'a>>>(&mut self, iter: I) -> usize;
2526
fn log_perodic(&mut self) -> bool;
2627

28+
fn one(&self) -> Self::MsgSize;
2729
fn telemetry_inc<const LEN:usize>(&mut self, done_count:TxDone , tel:& mut SteadyTelemetrySend<LEN>);
2830
fn monitor_not(&mut self);
2931
fn shared_capacity(&self) -> usize;
@@ -50,6 +52,11 @@ impl<T> TxCore for Tx<T> {
5052
type MsgOut = T;
5153
type MsgSize = usize;
5254

55+
56+
fn one(&self) -> Self::MsgSize {
57+
1
58+
}
59+
5360
fn log_perodic(&mut self) -> bool {
5461
if self.last_error_send.elapsed().as_secs() < steady_config::MAX_TELEMETRY_ERROR_RATE_SECONDS as u64 {
5562
false
@@ -251,7 +258,9 @@ impl TxCore for StreamTx<StreamSessionMessage> {
251258
type MsgOut = StreamSessionMessage;
252259
type MsgSize = (usize, usize);
253260

254-
261+
fn one(&self) -> Self::MsgSize {
262+
(1,self.payload_channel.capacity()/self.item_channel.capacity())
263+
}
255264
fn log_perodic(&mut self) -> bool {
256265
if self.item_channel.last_error_send.elapsed().as_secs() < steady_config::MAX_TELEMETRY_ERROR_RATE_SECONDS as u64 {
257266
false
@@ -569,6 +578,10 @@ impl TxCore for StreamTx<StreamSimpleMessage> {
569578
type MsgSize = (usize, usize);
570579

571580

581+
fn one(&self) -> Self::MsgSize {
582+
(1,self.payload_channel.capacity()/self.item_channel.capacity())
583+
}
584+
572585
fn log_perodic(&mut self) -> bool {
573586
if self.item_channel.last_error_send.elapsed().as_secs() < steady_config::MAX_TELEMETRY_ERROR_RATE_SECONDS as u64 {
574587
false
@@ -878,6 +891,11 @@ impl<T: TxCore> TxCore for MutexGuard<'_, T> {
878891
type MsgOut = <T as TxCore>::MsgOut;
879892
type MsgSize = <T as TxCore>::MsgSize;
880893

894+
895+
fn one(&self) -> Self::MsgSize {
896+
<T as TxCore>::one(& **self)
897+
}
898+
881899
fn log_perodic(&mut self) -> bool {
882900
<T as TxCore>::log_perodic(&mut **self)
883901
}

core/src/graph_testing.rs

+60-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
//! This enables simulation of real-world scenarios and makes SteadyState stand out from other solutions
55
//! by providing a robust way to test graphs.
66
7+
use crate::core_tx;
78
use std::any::Any;
89
use std::collections::HashMap;
910
use std::fmt::Debug;
@@ -24,6 +25,8 @@ use crate::channel_builder::{ChannelBacking, InternalReceiver, InternalSender};
2425
use ringbuf::traits::Observer;
2526
use crate::actor_builder::NodeTxRx;
2627
use crate::commander::SendOutcome;
28+
use crate::core_rx::RxCore;
29+
use crate::core_tx::TxCore;
2730

2831
/// Represents the result of a graph test, which can either be `Ok` with a value of type `K`
2932
/// or `Err` with a value of type `E`.
@@ -148,12 +151,66 @@ impl SideChannelHub {
148151
}
149152

150153
/// The `SideChannelResponder` struct provides a way to respond to messages from a side channel.
154+
#[derive(Clone)]
151155
pub struct SideChannelResponder {
152156
pub(crate) arc: Arc<Mutex<(SideChannel,Receiver<()>)>>,
153157
pub(crate) identity: ActorIdentity,
154158
}
155159

156160
impl SideChannelResponder {
161+
162+
pub async fn simulate_echo<'a, T: 'static, X: TxCore<MsgIn<'a> = T>, C: SteadyCommander>(&self, tx_core: &mut X, cmd: & Arc<Mutex<C>>) -> bool
163+
where <X as TxCore>::MsgOut: std::marker::Send, <X as TxCore>::MsgOut: Sync, <X as core_tx::TxCore>::MsgOut: 'static {
164+
if self.should_apply::<T>().await { //we got a message and now confirm we have room to send it
165+
166+
if tx_core.shared_wait_vacant_units(tx_core.one()).await {
167+
//we hold cmd just as long as it takes us to respond.
168+
let mut cmd_guard = cmd.lock().await;
169+
self.respond_with(move |message| {
170+
match cmd_guard.try_send(tx_core,*message.downcast::<T>().expect("error casting")) {
171+
SendOutcome::Success => {Box::new("ok".to_string())}
172+
SendOutcome::Blocked(msg) => {Box::new(msg)}
173+
}
174+
}).await
175+
} else {
176+
false
177+
}
178+
} else {
179+
false
180+
}
181+
}
182+
183+
pub async fn simulate_equals<T: Debug + Eq + 'static, X: RxCore<MsgOut = T>, C: SteadyCommander>(&self, rx_core: &mut X, cmd: & Arc<Mutex<C>>) -> bool where <X as RxCore>::MsgOut: std::fmt::Debug {
184+
if self.should_apply::<T>().await { //we have a message and now block until a unit arrives
185+
if rx_core.shared_wait_avail_units(1).await {
186+
//for testing we hold the cmd lock only while we check equals and respond
187+
let mut cmd_guard = cmd.lock().await;
188+
self.respond_with(move |message| {
189+
// Attempt to downcast to the expected message type
190+
let msg = *message.downcast::<T>().expect("error casting");
191+
match cmd_guard.try_take(rx_core) {
192+
Some(measured) => {
193+
if msg.eq(&measured) {
194+
Box::new("ok".to_string())
195+
} else {
196+
let failure = format!("no match {:?} {:?}", msg, measured);
197+
Box::new(failure)
198+
}
199+
}
200+
None => {
201+
Box::new("no message".to_string())
202+
}
203+
}
204+
}).await
205+
} else {
206+
false
207+
}
208+
} else {
209+
false
210+
}
211+
}
212+
213+
157214
/// Creates a new `SideChannelResponder`.
158215
///
159216
/// # Arguments
@@ -201,18 +258,15 @@ impl SideChannelResponder {
201258
///
202259
/// # Returns
203260
/// - `bool`: `true` if the operation succeeded; otherwise, `false`.
204-
pub async fn echo_responder<M: 'static + Clone + Debug + Send + Sync, C: SteadyCommander>(
261+
pub async fn echo_responder<M: 'static + Debug + Send + Sync, C: SteadyCommander>(
205262
&self,
206263
cmd: &mut C,
207264
target_tx: &mut Tx<M>,
208265
) -> bool {
209266
if self.should_apply::<M>().await {
210267
if cmd.wait_vacant(target_tx, 1).await {
211268
self.respond_with(move |message| {
212-
// Attempt to downcast to the expected message type
213-
let msg = message.downcast_ref::<M>().expect("error casting");
214-
// Try sending the message to the target channel
215-
match cmd.try_send(target_tx, msg.clone()) {
269+
match cmd.try_send(target_tx, *message.downcast::<M>().expect("error casting")) {
216270
SendOutcome::Success => {Box::new("ok".to_string())}
217271
SendOutcome::Blocked(msg) => {Box::new(msg)}
218272
}
@@ -274,7 +328,7 @@ impl SideChannelResponder {
274328
///
275329
/// # Returns
276330
/// - `bool`: `true` if the operation succeeded; otherwise, `false`.
277-
pub async fn equals_responder<M: 'static + Clone + Debug + Send + Eq, C: SteadyCommander>(
331+
pub async fn equals_responder<M: 'static + Debug + Send + Eq, C: SteadyCommander>(
278332
&self,
279333
cmd: &mut C,
280334
source_rx: &mut Rx<M>,
@@ -284,7 +338,6 @@ impl SideChannelResponder {
284338
self.respond_with(move |message| {
285339
// Attempt to downcast to the expected message type
286340
let msg = *message.downcast::<M>().expect("error casting");
287-
288341
match cmd.try_take(source_rx) {
289342
Some(measured) => {
290343
if measured.eq(&msg) {

core/src/loop_driver.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
1-
use std::pin::{pin, Pin};
1+
use std::pin::{Pin};
22
use futures::future::Future;
3-
use futures::future::join_all;
43
use futures::future::FutureExt;
54
use futures::select;
6-
use futures::stream::FuturesUnordered;
75
use futures::StreamExt;
86
use futures::pin_mut;
97

0 commit comments

Comments
 (0)