From 21de518606fda97037ee940bff9831a086eed9cd Mon Sep 17 00:00:00 2001 From: ben Date: Tue, 11 Jan 2022 14:24:24 -0700 Subject: [PATCH] Tweak how manager is run --- src/main.rs | 4 ++-- src/manager.rs | 16 ++++++++++------ tests/common.rs | 6 ++---- 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/src/main.rs b/src/main.rs index 79d9bfc..f2c251e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,8 +13,8 @@ async fn main() -> kustd::Result<()> { tracing::subscriber::set_global_default(subscriber) .expect("setting default subscriber failed"); - let (_manager, future) = Manager::new().await; - future.await; + let manager = Manager::new().await; + manager.start().await; Ok(()) } diff --git a/src/manager.rs b/src/manager.rs index 6907a7f..29427fb 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -2,11 +2,11 @@ use std::iter::FromIterator; use std::fmt::Debug; use std::sync::Arc; use std::time::Duration; -use std::future::Future; use either::Either; -use futures::{future::join3, FutureExt, StreamExt}; +use futures::{join, FutureExt, StreamExt}; use tracing::{debug, error, info, instrument, warn}; use serde::{de::DeserializeOwned, Serialize}; +use tokio::task; use tokio::sync::RwLock; use async_broadcast::broadcast; use chrono::prelude::*; @@ -58,12 +58,16 @@ pub struct Manager { } impl Manager { - pub async fn new() -> (Self, impl Future) { - let client = Client::try_default().await.expect("Failed to create client"); + pub async fn new() -> Self { let state = Arc::new(RwLock::new(State::new())); + Self { state } + } + + pub async fn start(self) { + let client = Client::try_default().await.expect("Failed to create client"); let context = Context::new(Data { client: client.clone(), - state: state.clone(), + state: self.state, }); let (ns_watcher_tx, ns_watcher_rx) = broadcast(2); @@ -80,7 +84,7 @@ impl Manager { let secret_drainer = Self::create_drainer::(context.clone(), ns_watcher_rx.clone()); let configmap_drainer = Self::create_drainer::(context.clone(), ns_watcher_rx.clone()); - (Self { state }, join3(secret_drainer, configmap_drainer, ns_watcher)) + join!(task::spawn(ns_watcher), secret_drainer, configmap_drainer); } async fn create_drainer(ctx: Context, ns_watcher_rx: async_broadcast::Receiver<()>) -> () diff --git a/tests/common.rs b/tests/common.rs index 9d4c675..43821bd 100644 --- a/tests/common.rs +++ b/tests/common.rs @@ -21,7 +21,6 @@ use kustd::{Manager, syncable::Syncable}; pub struct K8sContext { client: Client, - manager: Manager, random: String, // ApiReource, namespace, name to_cleanup: Vec<(ApiResource, Option, String)>, @@ -29,12 +28,11 @@ pub struct K8sContext { impl K8sContext { pub async fn new() -> Self { - let (manager, future) = Manager::new().await; - tokio::task::spawn(future); + let manager = Manager::new().await; + tokio::task::spawn(manager.start()); Self { client: get_client().await, - manager, random: Self::gen_random(6), to_cleanup: Vec::new(), }