diff --git a/README.md b/README.md index 4472627de..3dcdf1fec 100644 --- a/README.md +++ b/README.md @@ -102,28 +102,26 @@ A low level streaming interface (similar to informers) that presents `Applied`, ```rust let api = Api::::namespaced(client, "default"); -let watcher = watcher(api, ListParams::default()); +let mut stream = watcher(api, ListParams::default()).applied_objects(); ``` This now gives a continual stream of events and you do not need to care about the watch having to restart, or connections dropping. ```rust -let mut apply_events = try_flatten_applied(watcher).boxed_local(); -while let Some(event) = apply_events.try_next().await? { +while let Some(event) = stream.try_next().await? { println!("Applied: {}", event.name()); } ``` -NB: the plain stream items a `watcher` returns are different from `WatchEvent`. If you are following along to "see what changed", you should flatten it with one of the utilities like `try_flatten_applied` or `try_flatten_touched`. +NB: the plain items in a `watcher` stream are different from `WatchEvent`. If you are following along to "see what changed", you should flatten it with one of the utilities from `WatchStreamExt`, such as `applied_objects`. ## Reflectors A `reflector` is a `watcher` with `Store` on `K`. It acts on all the `Event` exposed by `watcher` to ensure that the state in the `Store` is as accurate as possible. ```rust -let nodes: Api = Api::namespaced(client, &namespace); -let lp = ListParams::default() - .labels("beta.kubernetes.io/instance-type=m4.2xlarge"); +let nodes: Api = Api::default_namespaced(client); +let lp = ListParams::default().labels("kubernetes.io/arch=amd64"); let store = reflector::store::Writer::::default(); let reader = store.as_reader(); let rf = reflector(store, watcher(nodes, lp)); diff --git a/examples/configmap_reflector.rs b/examples/configmap_reflector.rs index b00c9a66e..fbf852cb1 100644 --- a/examples/configmap_reflector.rs +++ b/examples/configmap_reflector.rs @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> { spawn_periodic_reader(reader); // read from a reader in the background - let mut applied_events = rf.watch_applies().boxed_local(); + let mut applied_events = rf.applied_objects().boxed_local(); while let Some(event) = applied_events.try_next().await? { info!("saw {}", event.name()) } diff --git a/examples/crd_reflector.rs b/examples/crd_reflector.rs index e89727bf2..072f79d38 100644 --- a/examples/crd_reflector.rs +++ b/examples/crd_reflector.rs @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> { info!("Current crds: {:?}", crds); } }); - let mut rfa = rf.watch_applies().boxed(); + let mut rfa = rf.applied_objects().boxed(); while let Some(event) = rfa.try_next().await? { info!("saw {}", event.name()); } diff --git a/examples/dynamic_watcher.rs b/examples/dynamic_watcher.rs index 3fb493eb5..130cb2157 100644 --- a/examples/dynamic_watcher.rs +++ b/examples/dynamic_watcher.rs @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> { let api = Api::::all_with(client, &ar); // Fully compatible with kube-runtime - let mut items = watcher(api, ListParams::default()).watch_applies().boxed(); + let mut items = watcher(api, ListParams::default()).applied_objects().boxed(); while let Some(p) = items.try_next().await? { if caps.scope == Scope::Cluster { info!("saw {}", p.name()); diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index 57714abed..18e037cba 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> { let events: Api = Api::all(client); let lp = ListParams::default(); - let ew = watcher(events, lp).watch_applies(); + let ew = watcher(events, lp).applied_objects(); pin_mut!(ew); while let Some(event) = ew.try_next().await? { diff --git a/examples/kubectl.rs b/examples/kubectl.rs index be2ac077b..2af7a6ac1 100644 --- a/examples/kubectl.rs +++ b/examples/kubectl.rs @@ -118,7 +118,7 @@ impl App { lp = lp.fields(&format!("metadata.name={}", n)); } // present a dumb table for it for now. kubectl does not do this anymore. - let mut stream = watcher(api, lp).watch_applies().boxed(); + let mut stream = watcher(api, lp).applied_objects().boxed(); println!("{0: anyhow::Result<()> { // select on applied events from all watchers let mut combo_stream = stream::select_all(vec![ - dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(), - cm_watcher.watch_applies().map_ok(Watched::Config).boxed(), - sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(), + dep_watcher.applied_objects().map_ok(Watched::Deploy).boxed(), + cm_watcher.applied_objects().map_ok(Watched::Config).boxed(), + sec_watcher.applied_objects().map_ok(Watched::Secret).boxed(), ]); // SelectAll Stream elements must have the same Item, so all packed in this: #[allow(clippy::large_enum_variant)] diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 21fa4b850..2822e19a8 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { }); // Drain and log applied events from the reflector - let mut rfa = rf.watch_applies().boxed(); + let mut rfa = rf.applied_objects().boxed(); while let Some(event) = rfa.try_next().await? { info!("saw {}", event.name()); } diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index 10e25be8c..971e127aa 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> { let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64"); let obs = watcher(nodes, lp) .backoff(ExponentialBackoff::default()) - .watch_applies(); + .applied_objects(); pin_mut!(obs); while let Some(n) = obs.try_next().await? { diff --git a/examples/pod_watcher.rs b/examples/pod_watcher.rs index a5023690b..d47c17dfe 100644 --- a/examples/pod_watcher.rs +++ b/examples/pod_watcher.rs @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> { let api = Api::::default_namespaced(client); watcher(api, ListParams::default()) - .watch_applies() + .applied_objects() .try_for_each(|p| async move { info!("saw {}", p.name()); if let Some(unready_reason) = pod_unready(&p) { diff --git a/examples/secret_reflector.rs b/examples/secret_reflector.rs index 13039c6fa..9ebd16482 100644 --- a/examples/secret_reflector.rs +++ b/examples/secret_reflector.rs @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> { let rf = reflector(store, watcher(secrets, lp)); spawn_periodic_reader(reader); // read from a reader in the background - rf.watch_applies() + rf.applied_objects() .try_for_each(|s| async move { info!("saw: {}", s.name()); Ok(()) diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 995a0077e..176c68d4f 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -462,7 +462,7 @@ where let reader = writer.as_reader(); let mut trigger_selector = stream::SelectAll::new(); let self_watcher = trigger_self( - reflector(writer, watcher(owned_api, lp)).watch_applies(), + reflector(writer, watcher(owned_api, lp)).applied_objects(), dyntype.clone(), ) .boxed(); @@ -532,7 +532,7 @@ where where Child::DynamicType: Debug + Eq + Hash + Clone, { - let child_watcher = trigger_owners(watcher(api, lp).watch_touches(), self.dyntype.clone(), dyntype); + let child_watcher = trigger_owners(watcher(api, lp).touched_objects(), self.dyntype.clone(), dyntype); self.trigger_selector.push(child_watcher.boxed()); self } @@ -583,7 +583,7 @@ where I::IntoIter: Send, Other::DynamicType: Clone, { - let other_watcher = trigger_with(watcher(api, lp).watch_touches(), move |obj| { + let other_watcher = trigger_with(watcher(api, lp).touched_objects(), move |obj| { let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase(); mapper(obj) .into_iter() diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_flatten.rs index 51914c90c..b4834662b 100644 --- a/kube-runtime/src/utils/event_flatten.rs +++ b/kube-runtime/src/utils/event_flatten.rs @@ -7,7 +7,7 @@ use futures::{ready, Stream, TryStream}; use pin_project::pin_project; #[pin_project] -/// Stream returned by the [`watch_applies`](super::WatchStreamExt::watch_applies) and [`watch_touches`](super::WatchStreamExt::watch_touches) method. +/// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method. #[must_use = "streams do nothing unless polled"] pub struct EventFlatten { #[pin] diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 5c3e52a3a..ff0bf03a0 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -6,6 +6,7 @@ mod stream_backoff; mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; +pub use event_flatten::EventFlatten; pub use stream_backoff::StreamBackoff; pub use watch_ext::WatchStreamExt; @@ -28,7 +29,7 @@ use tokio::{runtime::Handle, task::JoinHandle}; /// Flattens each item in the list following the rules of [`watcher::Event::into_iter_applied`]. #[deprecated( since = "0.72.0", - note = "fn replaced with the WatchStreamExt::watch_applies which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_applies()` instead. This function will be removed in 0.75.0." + note = "fn replaced with the WatchStreamExt::applied_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.applied_objects()` instead. This function will be removed in 0.75.0." )] pub fn try_flatten_applied>>( stream: S, @@ -41,7 +42,7 @@ pub fn try_flatten_applied>>( /// Flattens each item in the list following the rules of [`watcher::Event::into_iter_touched`]. #[deprecated( since = "0.72.0", - note = "fn replaced with the WatchStreamExt::watch_touches which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_touches()` instead. This function will be removed in 0.75.0." + note = "fn replaced with the WatchStreamExt::touched_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.touched_objects()` instead. This function will be removed in 0.75.0." )] pub fn try_flatten_touched>>( stream: S, diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 4a43f33fc..54a6d45bc 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -6,7 +6,7 @@ use backoff::backoff::Backoff; use futures::{Stream, TryStream}; -/// Extension trait for streams returned by [`watcher`] or [`reflector`] +/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { /// Apply a [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`] fn backoff(self, b: B) -> StreamBackoff @@ -17,22 +17,20 @@ pub trait WatchStreamExt: Stream { StreamBackoff::new(self, b) } - /// Flatten a [`watcher`] stream into a stream of applied objects + /// Flatten a [`watcher()`] stream into a stream of applied objects /// /// All Added/Modified events are passed through, and critical errors bubble up. - /// This is functionally equivalent to calling [`try_flatten_applied`] on a [`watcher`]. - fn watch_applies(self) -> EventFlatten + fn applied_objects(self) -> EventFlatten where Self: Stream, watcher::Error>> + Sized, { EventFlatten::new(self, false) } - /// Flatten a [`watcher`] stream into a stream of touched objects + /// Flatten a [`watcher()`] stream into a stream of touched objects /// /// All Added/Modified/Deleted events are passed through, and critical errors bubble up. - /// This is functionally equivalent to calling [`try_flatten_touched`] on a [`watcher`]. - fn watch_touches(self) -> EventFlatten + fn touched_objects(self) -> EventFlatten where Self: Stream, watcher::Error>> + Sized, { diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 9b4e555ee..9abf81d06 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -191,13 +191,13 @@ async fn step( /// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat)) /// will terminate eagerly as soon as they receive an [`Err`]. /// -/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`], -/// direct users may want to flatten composite events with [`try_flatten_applied`]: +/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`]. +/// Direct users may want to flatten composite events via [`WatchStreamExt`]: /// /// ```no_run /// use kube::{ /// api::{Api, ListParams, ResourceExt}, Client, -/// runtime::{utils::try_flatten_applied, watcher} +/// runtime::{watcher, WatchStreamExt} /// }; /// use k8s_openapi::api::core::v1::Pod; /// use futures::{StreamExt, TryStreamExt}; @@ -206,8 +206,7 @@ async fn step( /// let client = Client::try_default().await.unwrap(); /// let pods: Api = Api::namespaced(client, "apps"); /// -/// let watcher = watcher(pods, ListParams::default()); -/// try_flatten_applied(watcher) +/// watcher(pods, ListParams::default()).applied_objects() /// .try_for_each(|p| async move { /// println!("Applied: {}", p.name()); /// Ok(()) @@ -216,7 +215,7 @@ async fn step( /// Ok(()) /// } /// ``` -/// [`try_flatten_applied`]: super::utils::try_flatten_applied +/// [`WatchStreamExt`]: super::WatchStreamExt /// [`reflector`]: super::reflector::reflector /// [`Api::watch`]: kube_client::Api::watch /// diff --git a/kube/src/lib.rs b/kube/src/lib.rs index 31d3cc80b..faa3917f3 100644 --- a/kube/src/lib.rs +++ b/kube/src/lib.rs @@ -55,7 +55,7 @@ //! api::{Api, DeleteParams, ListParams, PatchParams, Patch, ResourceExt}, //! core::CustomResourceExt, //! Client, CustomResource, -//! runtime::{watcher, utils::try_flatten_applied, wait::{conditions, await_condition}}, +//! runtime::{watcher, WatchStreamExt, wait::{conditions, await_condition}}, //! }; //! //! // Our custom resource @@ -88,7 +88,7 @@ //! // Watch for changes to foos in the configured namespace //! let foos: Api = Api::default_namespaced(client.clone()); //! let lp = ListParams::default(); -//! let mut apply_stream = try_flatten_applied(watcher(foos, lp)).boxed(); +//! let mut apply_stream = watcher(foos, lp).applied_objects().boxed(); //! while let Some(f) = apply_stream.try_next().await? { //! println!("saw apply to {}", f.name()); //! }