Skip to content

Commit

Permalink
Rename unreleased watcher ext methods slightly (kube-rs#906)
Browse files Browse the repository at this point in the history
* Rename unreleased watcher ext methods slightly

as suggested by kube-rs#899 (review)

Signed-off-by: clux <[email protected]>

* fix forgotten references to old method in docs

Signed-off-by: clux <[email protected]>

* accidental private

Signed-off-by: clux <[email protected]>

* ugh fmt reorder

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored May 11, 2022
1 parent beeeb95 commit d0bf02f
Show file tree
Hide file tree
Showing 17 changed files with 36 additions and 40 deletions.
12 changes: 5 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,28 +102,26 @@ A low level streaming interface (similar to informers) that presents `Applied`,

```rust
let api = Api::<Pod>::namespaced(client, "default");
let watcher = watcher(api, ListParams::default());
let mut stream = watcher(api, ListParams::default()).applied_objects();
```

This now gives a continual stream of events and you do not need to care about the watch having to restart, or connections dropping.

```rust
let mut apply_events = try_flatten_applied(watcher).boxed_local();
while let Some(event) = apply_events.try_next().await? {
while let Some(event) = stream.try_next().await? {
println!("Applied: {}", event.name());
}
```

NB: the plain stream items a `watcher` returns are different from `WatchEvent`. If you are following along to "see what changed", you should flatten it with one of the utilities like `try_flatten_applied` or `try_flatten_touched`.
NB: the plain items in a `watcher` stream are different from `WatchEvent`. If you are following along to "see what changed", you should flatten it with one of the utilities from `WatchStreamExt`, such as `applied_objects`.

## Reflectors

A `reflector` is a `watcher` with `Store` on `K`. It acts on all the `Event<K>` exposed by `watcher` to ensure that the state in the `Store` is as accurate as possible.

```rust
let nodes: Api<Node> = Api::namespaced(client, &namespace);
let lp = ListParams::default()
.labels("beta.kubernetes.io/instance-type=m4.2xlarge");
let nodes: Api<Node> = Api::default_namespaced(client);
let lp = ListParams::default().labels("kubernetes.io/arch=amd64");
let store = reflector::store::Writer::<Node>::default();
let reader = store.as_reader();
let rf = reflector(store, watcher(nodes, lp));
Expand Down
2 changes: 1 addition & 1 deletion examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ async fn main() -> anyhow::Result<()> {

spawn_periodic_reader(reader); // read from a reader in the background

let mut applied_events = rf.watch_applies().boxed_local();
let mut applied_events = rf.applied_objects().boxed_local();
while let Some(event) = applied_events.try_next().await? {
info!("saw {}", event.name())
}
Expand Down
2 changes: 1 addition & 1 deletion examples/crd_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async fn main() -> anyhow::Result<()> {
info!("Current crds: {:?}", crds);
}
});
let mut rfa = rf.watch_applies().boxed();
let mut rfa = rf.applied_objects().boxed();
while let Some(event) = rfa.try_next().await? {
info!("saw {}", event.name());
}
Expand Down
2 changes: 1 addition & 1 deletion examples/dynamic_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<DynamicObject>::all_with(client, &ar);

// Fully compatible with kube-runtime
let mut items = watcher(api, ListParams::default()).watch_applies().boxed();
let mut items = watcher(api, ListParams::default()).applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
info!("saw {}", p.name());
Expand Down
2 changes: 1 addition & 1 deletion examples/event_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client);
let lp = ListParams::default();

let ew = watcher(events, lp).watch_applies();
let ew = watcher(events, lp).applied_objects();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion examples/kubectl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl App {
lp = lp.fields(&format!("metadata.name={}", n));
}
// present a dumb table for it for now. kubectl does not do this anymore.
let mut stream = watcher(api, lp).watch_applies().boxed();
let mut stream = watcher(api, lp).applied_objects().boxed();
println!("{0:<width$} {1:<20}", "NAME", "AGE", width = 63);
while let Some(inst) = stream.try_next().await? {
let age = format_creation_since(inst.creation_timestamp());
Expand Down
6 changes: 3 additions & 3 deletions examples/multi_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ async fn main() -> anyhow::Result<()> {

// select on applied events from all watchers
let mut combo_stream = stream::select_all(vec![
dep_watcher.watch_applies().map_ok(Watched::Deploy).boxed(),
cm_watcher.watch_applies().map_ok(Watched::Config).boxed(),
sec_watcher.watch_applies().map_ok(Watched::Secret).boxed(),
dep_watcher.applied_objects().map_ok(Watched::Deploy).boxed(),
cm_watcher.applied_objects().map_ok(Watched::Config).boxed(),
sec_watcher.applied_objects().map_ok(Watched::Secret).boxed(),
]);
// SelectAll Stream elements must have the same Item, so all packed in this:
#[allow(clippy::large_enum_variant)]
Expand Down
2 changes: 1 addition & 1 deletion examples/node_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> {
});

// Drain and log applied events from the reflector
let mut rfa = rf.watch_applies().boxed();
let mut rfa = rf.applied_objects().boxed();
while let Some(event) = rfa.try_next().await? {
info!("saw {}", event.name());
}
Expand Down
2 changes: 1 addition & 1 deletion examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default().labels("beta.kubernetes.io/arch=amd64");
let obs = watcher(nodes, lp)
.backoff(ExponentialBackoff::default())
.watch_applies();
.applied_objects();

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
Expand Down
2 changes: 1 addition & 1 deletion examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> {
let api = Api::<Pod>::default_namespaced(client);

watcher(api, ListParams::default())
.watch_applies()
.applied_objects()
.try_for_each(|p| async move {
info!("saw {}", p.name());
if let Some(unready_reason) = pod_unready(&p) {
Expand Down
2 changes: 1 addition & 1 deletion examples/secret_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async fn main() -> anyhow::Result<()> {
let rf = reflector(store, watcher(secrets, lp));
spawn_periodic_reader(reader); // read from a reader in the background

rf.watch_applies()
rf.applied_objects()
.try_for_each(|s| async move {
info!("saw: {}", s.name());
Ok(())
Expand Down
6 changes: 3 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ where
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self(
reflector(writer, watcher(owned_api, lp)).watch_applies(),
reflector(writer, watcher(owned_api, lp)).applied_objects(),
dyntype.clone(),
)
.boxed();
Expand Down Expand Up @@ -532,7 +532,7 @@ where
where
Child::DynamicType: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(watcher(api, lp).watch_touches(), self.dyntype.clone(), dyntype);
let child_watcher = trigger_owners(watcher(api, lp).touched_objects(), self.dyntype.clone(), dyntype);
self.trigger_selector.push(child_watcher.boxed());
self
}
Expand Down Expand Up @@ -583,7 +583,7 @@ where
I::IntoIter: Send,
Other::DynamicType: Clone,
{
let other_watcher = trigger_with(watcher(api, lp).watch_touches(), move |obj| {
let other_watcher = trigger_with(watcher(api, lp).touched_objects(), move |obj| {
let watched_obj_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
mapper(obj)
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion kube-runtime/src/utils/event_flatten.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::{ready, Stream, TryStream};
use pin_project::pin_project;

#[pin_project]
/// Stream returned by the [`watch_applies`](super::WatchStreamExt::watch_applies) and [`watch_touches`](super::WatchStreamExt::watch_touches) method.
/// Stream returned by the [`applied_objects`](super::WatchStreamExt::applied_objects) and [`touched_objects`](super::WatchStreamExt::touched_objects) method.
#[must_use = "streams do nothing unless polled"]
pub struct EventFlatten<St, K> {
#[pin]
Expand Down
5 changes: 3 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod stream_backoff;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use stream_backoff::StreamBackoff;
pub use watch_ext::WatchStreamExt;

Expand All @@ -28,7 +29,7 @@ use tokio::{runtime::Handle, task::JoinHandle};
/// Flattens each item in the list following the rules of [`watcher::Event::into_iter_applied`].
#[deprecated(
since = "0.72.0",
note = "fn replaced with the WatchStreamExt::watch_applies which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_applies()` instead. This function will be removed in 0.75.0."
note = "fn replaced with the WatchStreamExt::applied_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.applied_objects()` instead. This function will be removed in 0.75.0."
)]
pub fn try_flatten_applied<K, S: TryStream<Ok = watcher::Event<K>>>(
stream: S,
Expand All @@ -41,7 +42,7 @@ pub fn try_flatten_applied<K, S: TryStream<Ok = watcher::Event<K>>>(
/// Flattens each item in the list following the rules of [`watcher::Event::into_iter_touched`].
#[deprecated(
since = "0.72.0",
note = "fn replaced with the WatchStreamExt::watch_touches which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.watch_touches()` instead. This function will be removed in 0.75.0."
note = "fn replaced with the WatchStreamExt::touched_objects which can be chained onto watcher. Add `use kube::runtime::WatchStreamExt;` and call `stream.touched_objects()` instead. This function will be removed in 0.75.0."
)]
pub fn try_flatten_touched<K, S: TryStream<Ok = watcher::Event<K>>>(
stream: S,
Expand Down
12 changes: 5 additions & 7 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use backoff::backoff::Backoff;

use futures::{Stream, TryStream};

/// Extension trait for streams returned by [`watcher`] or [`reflector`]
/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
pub trait WatchStreamExt: Stream {
/// Apply a [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
Expand All @@ -17,22 +17,20 @@ pub trait WatchStreamExt: Stream {
StreamBackoff::new(self, b)
}

/// Flatten a [`watcher`] stream into a stream of applied objects
/// Flatten a [`watcher()`] stream into a stream of applied objects
///
/// All Added/Modified events are passed through, and critical errors bubble up.
/// This is functionally equivalent to calling [`try_flatten_applied`] on a [`watcher`].
fn watch_applies<K>(self) -> EventFlatten<Self, K>
fn applied_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
EventFlatten::new(self, false)
}

/// Flatten a [`watcher`] stream into a stream of touched objects
/// Flatten a [`watcher()`] stream into a stream of touched objects
///
/// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
/// This is functionally equivalent to calling [`try_flatten_touched`] on a [`watcher`].
fn watch_touches<K>(self) -> EventFlatten<Self, K>
fn touched_objects<K>(self) -> EventFlatten<Self, K>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
{
Expand Down
11 changes: 5 additions & 6 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,13 +191,13 @@ async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`],
/// direct users may want to flatten composite events with [`try_flatten_applied`]:
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
///
/// ```no_run
/// use kube::{
/// api::{Api, ListParams, ResourceExt}, Client,
/// runtime::{utils::try_flatten_applied, watcher}
/// runtime::{watcher, WatchStreamExt}
/// };
/// use k8s_openapi::api::core::v1::Pod;
/// use futures::{StreamExt, TryStreamExt};
Expand All @@ -206,8 +206,7 @@ async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
/// let client = Client::try_default().await.unwrap();
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
///
/// let watcher = watcher(pods, ListParams::default());
/// try_flatten_applied(watcher)
/// watcher(pods, ListParams::default()).applied_objects()
/// .try_for_each(|p| async move {
/// println!("Applied: {}", p.name());
/// Ok(())
Expand All @@ -216,7 +215,7 @@ async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
/// Ok(())
/// }
/// ```
/// [`try_flatten_applied`]: super::utils::try_flatten_applied
/// [`WatchStreamExt`]: super::WatchStreamExt
/// [`reflector`]: super::reflector::reflector
/// [`Api::watch`]: kube_client::Api::watch
///
Expand Down
4 changes: 2 additions & 2 deletions kube/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@
//! api::{Api, DeleteParams, ListParams, PatchParams, Patch, ResourceExt},
//! core::CustomResourceExt,
//! Client, CustomResource,
//! runtime::{watcher, utils::try_flatten_applied, wait::{conditions, await_condition}},
//! runtime::{watcher, WatchStreamExt, wait::{conditions, await_condition}},
//! };
//!
//! // Our custom resource
Expand Down Expand Up @@ -88,7 +88,7 @@
//! // Watch for changes to foos in the configured namespace
//! let foos: Api<Foo> = Api::default_namespaced(client.clone());
//! let lp = ListParams::default();
//! let mut apply_stream = try_flatten_applied(watcher(foos, lp)).boxed();
//! let mut apply_stream = watcher(foos, lp).applied_objects().boxed();
//! while let Some(f) = apply_stream.try_next().await? {
//! println!("saw apply to {}", f.name());
//! }
Expand Down

0 comments on commit d0bf02f

Please sign in to comment.