Skip to content

Commit

Permalink
Tweak how manager is run
Browse files Browse the repository at this point in the history
  • Loading branch information
ben committed Jan 11, 2022
1 parent 41004a1 commit 21de518
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ async fn main() -> kustd::Result<()> {
tracing::subscriber::set_global_default(subscriber)
.expect("setting default subscriber failed");

let (_manager, future) = Manager::new().await;
future.await;
let manager = Manager::new().await;
manager.start().await;

Ok(())
}
16 changes: 10 additions & 6 deletions src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ use std::iter::FromIterator;
use std::fmt::Debug;
use std::sync::Arc;
use std::time::Duration;
use std::future::Future;
use either::Either;
use futures::{future::join3, FutureExt, StreamExt};
use futures::{join, FutureExt, StreamExt};
use tracing::{debug, error, info, instrument, warn};
use serde::{de::DeserializeOwned, Serialize};
use tokio::task;
use tokio::sync::RwLock;
use async_broadcast::broadcast;
use chrono::prelude::*;
Expand Down Expand Up @@ -58,12 +58,16 @@ pub struct Manager {
}

impl Manager {
pub async fn new() -> (Self, impl Future<Output = ((),(), ())>) {
let client = Client::try_default().await.expect("Failed to create client");
pub async fn new() -> Self {
let state = Arc::new(RwLock::new(State::new()));
Self { state }
}

pub async fn start(self) {
let client = Client::try_default().await.expect("Failed to create client");
let context = Context::new(Data {
client: client.clone(),
state: state.clone(),
state: self.state,
});

let (ns_watcher_tx, ns_watcher_rx) = broadcast(2);
Expand All @@ -80,7 +84,7 @@ impl Manager {
let secret_drainer = Self::create_drainer::<Secret>(context.clone(), ns_watcher_rx.clone());
let configmap_drainer = Self::create_drainer::<ConfigMap>(context.clone(), ns_watcher_rx.clone());

(Self { state }, join3(secret_drainer, configmap_drainer, ns_watcher))
join!(task::spawn(ns_watcher), secret_drainer, configmap_drainer);
}

async fn create_drainer<T>(ctx: Context<Data>, ns_watcher_rx: async_broadcast::Receiver<()>) -> ()
Expand Down
6 changes: 2 additions & 4 deletions tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@ use kustd::{Manager, syncable::Syncable};

pub struct K8sContext {
client: Client,
manager: Manager,
random: String,
// ApiReource, namespace, name
to_cleanup: Vec<(ApiResource, Option<String>, String)>,
}

impl K8sContext {
pub async fn new() -> Self {
let (manager, future) = Manager::new().await;
tokio::task::spawn(future);
let manager = Manager::new().await;
tokio::task::spawn(manager.start());

Self {
client: get_client().await,
manager,
random: Self::gen_random(6),
to_cleanup: Vec::new(),
}
Expand Down

0 comments on commit 21de518

Please sign in to comment.