From f665d5de1fed92de9425dbf5ee9bb53a69027980 Mon Sep 17 00:00:00 2001 From: vsilent Date: Wed, 20 Mar 2024 15:04:05 +0200 Subject: [PATCH 1/7] mq listener, read queue and update status --- Dockerfile | 3 +- docker-compose.yml | 16 ++++++ docker/dev/docker-compose.yml | 20 +++++++ src/console/commands/mq/listener.rs | 86 ++++++++++++++++++----------- src/console/main.rs | 3 +- src/db/deployment.rs | 26 +++++++++ src/helpers/mq_manager.rs | 19 ++++++- src/routes/project/deploy.rs | 1 - 8 files changed, 135 insertions(+), 39 deletions(-) diff --git a/Dockerfile b/Dockerfile index 34fa4b0..66222d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,7 +37,7 @@ COPY ./src ./src # cargo build --release RUN apt-get update && apt-get install --no-install-recommends -y libssl-dev; \ - cargo build --release + cargo build --bin=console && cargo build --release #RUN ls -la /app/target/release/ >&2 @@ -51,6 +51,7 @@ RUN mkdir ./files && chmod 0777 ./files # copy binary and configuration files COPY --from=builder /app/target/release/server . +COPY --from=builder /app/target/release/console . COPY --from=builder /app/.env . COPY --from=builder /app/configuration.yaml . COPY --from=builder /usr/local/cargo/bin/sqlx sqlx diff --git a/docker-compose.yml b/docker-compose.yml index e9aee75..f87b3e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,6 +29,22 @@ services: # stackerdb: # condition: service_healthy + stacker_queue: + image: trydirect/stacker:0.0.7 + container_name: stacker_queue + restart: always + volumes: + - ./configuration.yaml:/app/configuration.yaml + - ./.env:/app/.env + environment: + - RUST_LOG=debug + - RUST_BACKTRACE=1 + env_file: + - ./.env + depends_on: + stackerdb: + condition: service_healthy + entrypoint: /app/console mq listen # stackerdb: # container_name: stackerdb diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index 6bf8eb5..3577b14 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -37,6 +37,26 @@ services: - backend + stacker_queue: + image: trydirect/stacker:0.0.7 + container_name: stacker_queue + restart: always + volumes: + - ./configuration.yaml:/app/configuration.yaml + - ./.env:/app/.env + environment: + - RUST_LOG=debug + - RUST_BACKTRACE=1 + env_file: + - ./.env + depends_on: + stackerdb: + condition: service_healthy + entrypoint: /app/console mq listen + networks: + - backend + + stackerdb: container_name: stackerdb healthcheck: diff --git a/src/console/commands/mq/listener.rs b/src/console/commands/mq/listener.rs index cfe329a..501d199 100644 --- a/src/console/commands/mq/listener.rs +++ b/src/console/commands/mq/listener.rs @@ -1,19 +1,31 @@ use crate::configuration::get_configuration; use actix_web::rt; use actix_web::web; +use chrono::Utc; use lapin::{Channel, Queue}; use lapin::options::{BasicAckOptions, BasicConsumeOptions}; use lapin::types::FieldTable; use sqlx::PgPool; use db::deployment; -use crate::{db, helpers}; -use crate::helpers::mq_manager; +use crate::{db, forms, helpers}; +use crate::helpers::{JsonResponse, mq_manager}; use crate::helpers::mq_manager::MqManager; use futures_lite::stream::StreamExt; +use serde_derive::{Deserialize, Serialize}; +use crate::forms::project::ProjectForm; pub struct ListenCommand { } +#[derive(Serialize, Deserialize, Debug)] +struct ProgressMessage { + alert: i32, + id: String, + message: String, + status: String, + progress: String +} + impl ListenCommand { pub fn new() -> Self { Self {} @@ -31,18 +43,20 @@ impl crate::console::commands::CallableTrait for ListenCommand { let db_pool = web::Data::new(db_pool); + println!("Declare exchange"); let mq_manager = MqManager::try_new(settings.amqp.connection_string())?; let consumer_channel= mq_manager .consume( "install_progress", - "install_progress_*******" + "install.progress.#" ) .await?; + println!("Declare queue"); let mut consumer = consumer_channel .basic_consume( - "install_progress", + "#", "console_listener", BasicConsumeOptions::default(), FieldTable::default(), @@ -50,38 +64,46 @@ impl crate::console::commands::CallableTrait for ListenCommand { .await .expect("Basic consume"); - // .map_err(|err| format!("Error {:?}", err)); + println!("Waiting for messages .."); + while let Some(delivery) = consumer.next().await { + // println!("checking messages delivery {:?}", delivery); + let delivery = delivery.expect("error in consumer"); + let s:String = match String::from_utf8(delivery.data.to_owned()) { + //delivery.data is of type Vec + Ok(v) => v, + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + }; - tracing::info!("will consume"); - // if let Ok(consumer) = consumer { - while let Some(delivery) = consumer.next().await { - let delivery = delivery.expect("error in consumer"); - delivery.ack(BasicAckOptions::default()).await.expect("ack"); - } - // } + match serde_json::from_str::(&s) { + Ok(msg) => { + println!("message {:?}", msg); + // println!("id {:?}", msg.id); + // println!("status {:?}", msg.status); + delivery.ack(BasicAckOptions::default()).await.expect("ack"); - // while let Some(delivery) = consumer.next().await { - // tracing::debug!(message=?delivery, "received message"); - // if let Ok(delivery) = delivery { - // delivery - // .ack(BasicAckOptions::default()) - // .await - // .expect("basic_ack"); - // } - // } + if msg.status == "complete" { + let id = msg.id + .parse::() + .map_err(|err| "Could not parse deployment id".to_string() )?; + match crate::db::deployment::fetch( + db_pool.get_ref(), id + ) + .await? { - // on_complete() - // let deployment = crate::models::deployment::Deployment { - // id: 0, - // project_id: 0, - // deleted: false, - // status: "".to_string(), - // body: Default::default(), - // created_at: Default::default(), - // updated_at: Default::default(), - // }; - // deployment::update(db_pool.get_ref(), deployment).await?; + Some(mut row) => { + row.status = msg.status; + row.updated_at = Utc::now(); + deployment::update(db_pool.get_ref(), row).await?; + println!("deployment {} completed successfully", id); + } + None => println!("Deployment record not found in db") + } + } + } + Err(err) => { tracing::debug!("Invalid message format")} + } + } Ok(()) }) diff --git a/src/console/main.rs b/src/console/main.rs index 752e10a..f6bc804 100644 --- a/src/console/main.rs +++ b/src/console/main.rs @@ -52,7 +52,6 @@ fn get_command(cli: Cli) -> Result Ok(Box::new( stacker::console::commands::mq::ListenCommand::new(), )), - }, - _ => Err("command does not match".to_string()), + } } } diff --git a/src/db/deployment.rs b/src/db/deployment.rs index 11e7a2e..7f78f0c 100644 --- a/src/db/deployment.rs +++ b/src/db/deployment.rs @@ -2,6 +2,32 @@ use crate::models; use sqlx::PgPool; use tracing::Instrument; + +pub async fn fetch(pool: &PgPool, id: i32) -> Result, String> { + tracing::info!("Fetch deployment {}", id); + sqlx::query_as!( + models::Deployment, + r#" + SELECT + * + FROM deployment + WHERE id=$1 + LIMIT 1 + "#, + id + ) + .fetch_one(pool) + .await + .map(|deployment| Some(deployment)) + .or_else(|err| match err { + sqlx::Error::RowNotFound => Ok(None), + e => { + tracing::error!("Failed to fetch deployment, error: {:?}", e); + Err("Could not fetch data".to_string()) + } + }) +} + pub async fn insert(pool: &PgPool, mut deployment: models::Deployment) -> Result { let query_span = tracing::info_span!("Saving new deployment into the database"); sqlx::query!( diff --git a/src/helpers/mq_manager.rs b/src/helpers/mq_manager.rs index c265825..f30d238 100644 --- a/src/helpers/mq_manager.rs +++ b/src/helpers/mq_manager.rs @@ -4,6 +4,7 @@ use lapin::{options::*, publisher_confirm::{Confirmation, PublisherConfirm}, Bas use lapin::types::AMQPType::ShortString; use lapin::types::{AMQPValue, FieldTable}; use serde::ser::Serialize; +use serde_valid::validation::error::Format::Default; #[derive(Debug)] pub struct MqManager { @@ -127,10 +128,22 @@ impl MqManager { .await .expect("Exchange declare failed"); + let mut args = FieldTable::default(); + args.insert("x-expires".into(), AMQPValue::LongUInt(180000)); + let queue = channel.queue_declare( - routing_key, - QueueDeclareOptions::default(), - Default::default(), + // "install_progress_all", + "#", + // "install_progress_hy181TZa4DaabUZWklsrxw", + QueueDeclareOptions { + passive: false, + durable: false, + exclusive: false, + auto_delete: true, + nowait: false, + }, + // FieldTable::default(), + args, ) .await .expect("Queue declare failed"); diff --git a/src/routes/project/deploy.rs b/src/routes/project/deploy.rs index b0665d8..9844035 100644 --- a/src/routes/project/deploy.rs +++ b/src/routes/project/deploy.rs @@ -49,7 +49,6 @@ pub async fn item( JsonResponse::::build().internal_server_error(err) })?; - // Save cloud credentials if requested let mut cloud_creds: models::Cloud = (&form.cloud).into(); cloud_creds.project_id = Some(id); From d4d21b441d3aae4d232d34c88d67fb100bbe5773 Mon Sep 17 00:00:00 2001 From: vsilent Date: Wed, 20 Mar 2024 15:13:24 +0200 Subject: [PATCH 2/7] mq listener, read queue and update status --- Dockerfile | 3 +- access_control.conf.dist | 2 +- docker-compose.yml | 16 ++++++ docker/dev/docker-compose.yml | 20 +++++++ src/console/commands/mq/listener.rs | 86 ++++++++++++++++++----------- src/console/main.rs | 3 +- src/db/deployment.rs | 26 +++++++++ src/forms/cloud.rs | 12 ++-- src/forms/mod.rs | 1 + src/forms/server.rs | 12 ++-- src/forms/user.rs | 4 +- src/helpers/mq_manager.rs | 19 ++++++- src/routes/cloud/add.rs | 3 +- src/routes/cloud/update.rs | 3 +- src/routes/project/deploy.rs | 33 +++++------ src/routes/server/update.rs | 3 +- src/startup.rs | 1 - tests/middleware_trydirect.rs | 4 -- 18 files changed, 172 insertions(+), 79 deletions(-) diff --git a/Dockerfile b/Dockerfile index 34fa4b0..66222d1 100644 --- a/Dockerfile +++ b/Dockerfile @@ -37,7 +37,7 @@ COPY ./src ./src # cargo build --release RUN apt-get update && apt-get install --no-install-recommends -y libssl-dev; \ - cargo build --release + cargo build --bin=console && cargo build --release #RUN ls -la /app/target/release/ >&2 @@ -51,6 +51,7 @@ RUN mkdir ./files && chmod 0777 ./files # copy binary and configuration files COPY --from=builder /app/target/release/server . +COPY --from=builder /app/target/release/console . COPY --from=builder /app/.env . COPY --from=builder /app/configuration.yaml . COPY --from=builder /usr/local/cargo/bin/sqlx sqlx diff --git a/access_control.conf.dist b/access_control.conf.dist index 459a15f..f164af1 100644 --- a/access_control.conf.dist +++ b/access_control.conf.dist @@ -11,4 +11,4 @@ g = _, _ e = some(where (p.eft == allow)) [matchers] -m = g(r.sub, p.sub) && r.obj == p.obj && regexMatch(r.act, p.act) +m = g(r.sub, p.sub) && keyMatch2(r.obj, p.obj) && r.act == p.act diff --git a/docker-compose.yml b/docker-compose.yml index e9aee75..f87b3e2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,6 +29,22 @@ services: # stackerdb: # condition: service_healthy + stacker_queue: + image: trydirect/stacker:0.0.7 + container_name: stacker_queue + restart: always + volumes: + - ./configuration.yaml:/app/configuration.yaml + - ./.env:/app/.env + environment: + - RUST_LOG=debug + - RUST_BACKTRACE=1 + env_file: + - ./.env + depends_on: + stackerdb: + condition: service_healthy + entrypoint: /app/console mq listen # stackerdb: # container_name: stackerdb diff --git a/docker/dev/docker-compose.yml b/docker/dev/docker-compose.yml index 6bf8eb5..3577b14 100644 --- a/docker/dev/docker-compose.yml +++ b/docker/dev/docker-compose.yml @@ -37,6 +37,26 @@ services: - backend + stacker_queue: + image: trydirect/stacker:0.0.7 + container_name: stacker_queue + restart: always + volumes: + - ./configuration.yaml:/app/configuration.yaml + - ./.env:/app/.env + environment: + - RUST_LOG=debug + - RUST_BACKTRACE=1 + env_file: + - ./.env + depends_on: + stackerdb: + condition: service_healthy + entrypoint: /app/console mq listen + networks: + - backend + + stackerdb: container_name: stackerdb healthcheck: diff --git a/src/console/commands/mq/listener.rs b/src/console/commands/mq/listener.rs index cfe329a..501d199 100644 --- a/src/console/commands/mq/listener.rs +++ b/src/console/commands/mq/listener.rs @@ -1,19 +1,31 @@ use crate::configuration::get_configuration; use actix_web::rt; use actix_web::web; +use chrono::Utc; use lapin::{Channel, Queue}; use lapin::options::{BasicAckOptions, BasicConsumeOptions}; use lapin::types::FieldTable; use sqlx::PgPool; use db::deployment; -use crate::{db, helpers}; -use crate::helpers::mq_manager; +use crate::{db, forms, helpers}; +use crate::helpers::{JsonResponse, mq_manager}; use crate::helpers::mq_manager::MqManager; use futures_lite::stream::StreamExt; +use serde_derive::{Deserialize, Serialize}; +use crate::forms::project::ProjectForm; pub struct ListenCommand { } +#[derive(Serialize, Deserialize, Debug)] +struct ProgressMessage { + alert: i32, + id: String, + message: String, + status: String, + progress: String +} + impl ListenCommand { pub fn new() -> Self { Self {} @@ -31,18 +43,20 @@ impl crate::console::commands::CallableTrait for ListenCommand { let db_pool = web::Data::new(db_pool); + println!("Declare exchange"); let mq_manager = MqManager::try_new(settings.amqp.connection_string())?; let consumer_channel= mq_manager .consume( "install_progress", - "install_progress_*******" + "install.progress.#" ) .await?; + println!("Declare queue"); let mut consumer = consumer_channel .basic_consume( - "install_progress", + "#", "console_listener", BasicConsumeOptions::default(), FieldTable::default(), @@ -50,38 +64,46 @@ impl crate::console::commands::CallableTrait for ListenCommand { .await .expect("Basic consume"); - // .map_err(|err| format!("Error {:?}", err)); + println!("Waiting for messages .."); + while let Some(delivery) = consumer.next().await { + // println!("checking messages delivery {:?}", delivery); + let delivery = delivery.expect("error in consumer"); + let s:String = match String::from_utf8(delivery.data.to_owned()) { + //delivery.data is of type Vec + Ok(v) => v, + Err(e) => panic!("Invalid UTF-8 sequence: {}", e), + }; - tracing::info!("will consume"); - // if let Ok(consumer) = consumer { - while let Some(delivery) = consumer.next().await { - let delivery = delivery.expect("error in consumer"); - delivery.ack(BasicAckOptions::default()).await.expect("ack"); - } - // } + match serde_json::from_str::(&s) { + Ok(msg) => { + println!("message {:?}", msg); + // println!("id {:?}", msg.id); + // println!("status {:?}", msg.status); + delivery.ack(BasicAckOptions::default()).await.expect("ack"); - // while let Some(delivery) = consumer.next().await { - // tracing::debug!(message=?delivery, "received message"); - // if let Ok(delivery) = delivery { - // delivery - // .ack(BasicAckOptions::default()) - // .await - // .expect("basic_ack"); - // } - // } + if msg.status == "complete" { + let id = msg.id + .parse::() + .map_err(|err| "Could not parse deployment id".to_string() )?; + match crate::db::deployment::fetch( + db_pool.get_ref(), id + ) + .await? { - // on_complete() - // let deployment = crate::models::deployment::Deployment { - // id: 0, - // project_id: 0, - // deleted: false, - // status: "".to_string(), - // body: Default::default(), - // created_at: Default::default(), - // updated_at: Default::default(), - // }; - // deployment::update(db_pool.get_ref(), deployment).await?; + Some(mut row) => { + row.status = msg.status; + row.updated_at = Utc::now(); + deployment::update(db_pool.get_ref(), row).await?; + println!("deployment {} completed successfully", id); + } + None => println!("Deployment record not found in db") + } + } + } + Err(err) => { tracing::debug!("Invalid message format")} + } + } Ok(()) }) diff --git a/src/console/main.rs b/src/console/main.rs index 752e10a..f6bc804 100644 --- a/src/console/main.rs +++ b/src/console/main.rs @@ -52,7 +52,6 @@ fn get_command(cli: Cli) -> Result Ok(Box::new( stacker::console::commands::mq::ListenCommand::new(), )), - }, - _ => Err("command does not match".to_string()), + } } } diff --git a/src/db/deployment.rs b/src/db/deployment.rs index 11e7a2e..7f78f0c 100644 --- a/src/db/deployment.rs +++ b/src/db/deployment.rs @@ -2,6 +2,32 @@ use crate::models; use sqlx::PgPool; use tracing::Instrument; + +pub async fn fetch(pool: &PgPool, id: i32) -> Result, String> { + tracing::info!("Fetch deployment {}", id); + sqlx::query_as!( + models::Deployment, + r#" + SELECT + * + FROM deployment + WHERE id=$1 + LIMIT 1 + "#, + id + ) + .fetch_one(pool) + .await + .map(|deployment| Some(deployment)) + .or_else(|err| match err { + sqlx::Error::RowNotFound => Ok(None), + e => { + tracing::error!("Failed to fetch deployment, error: {:?}", e); + Err("Could not fetch data".to_string()) + } + }) +} + pub async fn insert(pool: &PgPool, mut deployment: models::Deployment) -> Result { let query_span = tracing::info_span!("Saving new deployment into the database"); sqlx::query!( diff --git a/src/forms/cloud.rs b/src/forms/cloud.rs index 138b298..472bdb7 100644 --- a/src/forms/cloud.rs +++ b/src/forms/cloud.rs @@ -15,14 +15,14 @@ pub struct Cloud { pub save_token: Option, } -impl Into for Cloud { +impl Into for &Cloud { fn into(self) -> models::Cloud { let mut cloud = models::Cloud::default(); - cloud.provider = self.provider; - cloud.cloud_token = self.cloud_token; - cloud.cloud_key = self.cloud_key; - cloud.cloud_secret = self.cloud_secret; - cloud.save_token = self.save_token; + cloud.provider = self.provider.clone(); + cloud.cloud_token = self.cloud_token.clone(); + cloud.cloud_key = self.cloud_key.clone(); + cloud.cloud_secret = self.cloud_secret.clone(); + cloud.save_token = self.save_token.clone(); cloud } diff --git a/src/forms/mod.rs b/src/forms/mod.rs index f5fcc9c..0647181 100644 --- a/src/forms/mod.rs +++ b/src/forms/mod.rs @@ -7,3 +7,4 @@ pub(crate) mod server; pub use rating::*; pub use cloud::*; pub use server::*; +pub use user::UserForm; diff --git a/src/forms/server.rs b/src/forms/server.rs index 1e55a89..cba997f 100644 --- a/src/forms/server.rs +++ b/src/forms/server.rs @@ -14,14 +14,14 @@ pub struct Server { pub disk_type: Option, } -impl Into for Server { +impl Into for &Server { fn into(self) -> models::Server { let mut server = models::Server::default(); - server.disk_type = self.disk_type; - server.region = self.region; - server.server = self.server; - server.zone = self.zone; - server.os = self.os; + server.disk_type = self.disk_type.clone(); + server.region = self.region.clone(); + server.server = self.server.clone(); + server.zone = self.zone.clone(); + server.os = self.os.clone(); server.created_at = Utc::now(); server.updated_at = Utc::now(); diff --git a/src/forms/user.rs b/src/forms/user.rs index 8ff0b4c..0fd0024 100644 --- a/src/forms/user.rs +++ b/src/forms/user.rs @@ -10,6 +10,8 @@ pub struct UserForm { pub user: User, } +//todo deref for UserForm. userForm.id, userForm.first_name + #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)] #[serde(rename_all = "camelCase")] pub struct User { @@ -139,4 +141,4 @@ impl TryInto for UserForm { }) } -} \ No newline at end of file +} diff --git a/src/helpers/mq_manager.rs b/src/helpers/mq_manager.rs index c265825..f30d238 100644 --- a/src/helpers/mq_manager.rs +++ b/src/helpers/mq_manager.rs @@ -4,6 +4,7 @@ use lapin::{options::*, publisher_confirm::{Confirmation, PublisherConfirm}, Bas use lapin::types::AMQPType::ShortString; use lapin::types::{AMQPValue, FieldTable}; use serde::ser::Serialize; +use serde_valid::validation::error::Format::Default; #[derive(Debug)] pub struct MqManager { @@ -127,10 +128,22 @@ impl MqManager { .await .expect("Exchange declare failed"); + let mut args = FieldTable::default(); + args.insert("x-expires".into(), AMQPValue::LongUInt(180000)); + let queue = channel.queue_declare( - routing_key, - QueueDeclareOptions::default(), - Default::default(), + // "install_progress_all", + "#", + // "install_progress_hy181TZa4DaabUZWklsrxw", + QueueDeclareOptions { + passive: false, + durable: false, + exclusive: false, + auto_delete: true, + nowait: false, + }, + // FieldTable::default(), + args, ) .await .expect("Queue declare failed"); diff --git a/src/routes/cloud/add.rs b/src/routes/cloud/add.rs index cb57209..6178ff1 100644 --- a/src/routes/cloud/add.rs +++ b/src/routes/cloud/add.rs @@ -1,3 +1,4 @@ +use std::ops::Deref; use crate::forms; use crate::helpers::JsonResponse; use crate::models; @@ -29,7 +30,7 @@ pub async fn add( return Err(JsonResponse::::build().form_error(errors)); } - let mut cloud: models::Cloud = form.into_inner().into(); + let mut cloud: models::Cloud = form.deref().into(); cloud.user_id = user.id.clone(); db::cloud::insert(pg_pool.get_ref(), cloud) diff --git a/src/routes/cloud/update.rs b/src/routes/cloud/update.rs index 3e2e564..b48fd8b 100644 --- a/src/routes/cloud/update.rs +++ b/src/routes/cloud/update.rs @@ -7,6 +7,7 @@ use serde_valid::Validate; use sqlx::PgPool; use std::sync::Arc; use tracing::Instrument; +use std::ops::Deref; #[tracing::instrument(name = "Update cloud.")] #[put("/{id}")] @@ -33,7 +34,7 @@ pub async fn item( return Err(JsonResponse::::build().form_error(errors.to_string())); } - let mut cloud:models::Cloud = form.into_inner().into(); + let mut cloud:models::Cloud = form.deref().into(); cloud.id = cloud_row.id; cloud.user_id = user.id.clone(); diff --git a/src/routes/project/deploy.rs b/src/routes/project/deploy.rs index a1261ab..9844035 100644 --- a/src/routes/project/deploy.rs +++ b/src/routes/project/deploy.rs @@ -43,33 +43,30 @@ pub async fn item( })?; // Build compose - let id = project.id.clone(); + let id = project.id; let dc = DcBuilder::new(project); let fc = dc.build().map_err(|err| { JsonResponse::::build().internal_server_error(err) })?; - // Save cloud credentials if requested - let mut cloud_creds: models::Cloud = form.cloud.clone().into(); - cloud_creds.project_id = Some(id.clone()); + let mut cloud_creds: models::Cloud = (&form.cloud).into(); + cloud_creds.project_id = Some(id); cloud_creds.user_id = user.id.clone(); - if let Some(save_token) = cloud_creds.save_token { - if save_token { - db::cloud::insert(pg_pool.get_ref(), cloud_creds.clone()) - .await - .map(|cloud| cloud) - .map_err(|_| { - JsonResponse::::build().internal_server_error("Internal Server Error") - })?; - } + if Some(true) == cloud_creds.save_token { + db::cloud::insert(pg_pool.get_ref(), cloud_creds.clone()) + .await + .map(|cloud| cloud) + .map_err(|_| { + JsonResponse::::build().internal_server_error("Internal Server Error") + })?; } // Save server type and region - let mut server: models::Server = form.server.clone().into(); + let mut server: models::Server = (&form.server).into(); server.user_id = user.id.clone(); - server.project_id = id.clone(); + server.project_id = id; let server = db::server::insert(pg_pool.get_ref(), server) .await .map(|server| server) @@ -89,10 +86,9 @@ pub async fn item( payload.docker_compose = Some(compress(fc.as_str())); // Store deployment attempts into deployment table in db - let project_id = dc.project.id.clone(); let json_request = dc.project.body.clone(); let deployment = models::Deployment::new( - project_id, + dc.project.id, String::from("pending"), json_request ); @@ -189,10 +185,9 @@ pub async fn saved_item( payload.docker_compose = Some(compress(fc.as_str())); - let project_id = dc.project.id.clone(); let json_request = dc.project.body.clone(); let deployment = models::Deployment::new( - project_id, + dc.project.id, String::from("pending"), json_request ); diff --git a/src/routes/server/update.rs b/src/routes/server/update.rs index 6220869..02c4b0e 100644 --- a/src/routes/server/update.rs +++ b/src/routes/server/update.rs @@ -7,6 +7,7 @@ use serde_valid::Validate; use sqlx::PgPool; use std::sync::Arc; use tracing::Instrument; +use std::ops::Deref; #[tracing::instrument(name = "Update server.")] #[put("/{id}")] @@ -33,7 +34,7 @@ pub async fn item( return Err(JsonResponse::::build().form_error(errors.to_string())); } - let mut server:models::Server = form.into_inner().into(); + let mut server:models::Server = form.deref().into(); server.id = server_row.id; server.project_id = server_row.project_id; server.user_id = user.id.clone(); diff --git a/src/startup.rs b/src/startup.rs index 2c7c4ba..a95acfb 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -7,7 +7,6 @@ use actix_web::{ App, HttpServer, }; use crate::middleware; -use actix_web_httpauth::middleware::HttpAuthentication; use sqlx::{Pool, Postgres}; use std::net::TcpListener; use tracing_actix_web::TracingLogger; diff --git a/tests/middleware_trydirect.rs b/tests/middleware_trydirect.rs index 0590618..4937781 100644 --- a/tests/middleware_trydirect.rs +++ b/tests/middleware_trydirect.rs @@ -22,8 +22,4 @@ async fn middleware_trydirect_works() { assert!(response.status().is_success()); assert_eq!(Some(0), response.content_length()); - - - //todo header stacker-id not found - // } From dc6c54fffd04b58e2a6ec4ec3b7c8464be0f6dd2 Mon Sep 17 00:00:00 2001 From: vsilent Date: Wed, 20 Mar 2024 15:42:10 +0200 Subject: [PATCH 3/7] fix warnings --- docker-compose.yml | 32 ++++++++++++++++---------------- src/db/cloud.rs | 2 +- src/db/project.rs | 4 ++-- src/db/server.rs | 2 +- src/forms/project/form.rs | 2 +- src/helpers/dockerhub.rs | 6 +++--- src/routes/project/deploy.rs | 4 ++-- src/routes/server/update.rs | 2 +- src/startup.rs | 1 + 9 files changed, 28 insertions(+), 27 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index f87b3e2..eeb10eb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -29,22 +29,22 @@ services: # stackerdb: # condition: service_healthy - stacker_queue: - image: trydirect/stacker:0.0.7 - container_name: stacker_queue - restart: always - volumes: - - ./configuration.yaml:/app/configuration.yaml - - ./.env:/app/.env - environment: - - RUST_LOG=debug - - RUST_BACKTRACE=1 - env_file: - - ./.env - depends_on: - stackerdb: - condition: service_healthy - entrypoint: /app/console mq listen +# stacker_queue: +# image: trydirect/stacker:0.0.7 +# container_name: stacker_queue +# restart: always +# volumes: +# - ./configuration.yaml:/app/configuration.yaml +# - ./.env:/app/.env +# environment: +# - RUST_LOG=debug +# - RUST_BACKTRACE=1 +# env_file: +# - ./.env +# depends_on: +# stackerdb: +# condition: service_healthy +# entrypoint: /app/console mq listen # stackerdb: # container_name: stackerdb diff --git a/src/db/cloud.rs b/src/db/cloud.rs index 07f5285..a5e71c2 100644 --- a/src/db/cloud.rs +++ b/src/db/cloud.rs @@ -151,7 +151,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result { }); Ok(true) } - Err(err) => { + Err(_err) => { let _ = tx.rollback().await.map_err(|err| println!("{:?}", err)); Ok(false) } diff --git a/src/db/project.rs b/src/db/project.rs index d1f469f..b7978e6 100644 --- a/src/db/project.rs +++ b/src/db/project.rs @@ -167,14 +167,14 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result { }) { Ok(_) => { - tx.commit().await.map_err(|err| { + let _ = tx.commit().await.map_err(|err| { tracing::error!("Failed to commit transaction: {:?}", err); false }); Ok(true) } Err(err) => { - tx.rollback().await.map_err(|err| println!("{:?}", err)); + let _ = tx.rollback().await.map_err(|err| println!("{:?}", err)); Ok(false) } // todo, when empty commit() diff --git a/src/db/server.rs b/src/db/server.rs index 86f147b..75513a9 100644 --- a/src/db/server.rs +++ b/src/db/server.rs @@ -182,7 +182,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result { }); Ok(true) } - Err(err) => { + Err(_err) => { let _ = tx.rollback().await.map_err(|err| println!("{:?}", err)); Ok(false) } diff --git a/src/forms/project/form.rs b/src/forms/project/form.rs index d2498cf..c122ffc 100644 --- a/src/forms/project/form.rs +++ b/src/forms/project/form.rs @@ -63,7 +63,7 @@ pub(crate) async fn body_into_form(body: Bytes) -> actix_web::Result::build().bad_request(msg) }) - .and_then(|mut form: forms::project::ProjectForm| { + .and_then(|form: forms::project::ProjectForm| { if !form.validate().is_ok() { let errors = form.validate().unwrap_err().to_string(); let err_msg = format!("Invalid data received {:?}", &errors); diff --git a/src/helpers/dockerhub.rs b/src/helpers/dockerhub.rs index d5e4ad6..d7771c7 100644 --- a/src/helpers/dockerhub.rs +++ b/src/helpers/dockerhub.rs @@ -111,7 +111,7 @@ impl<'a> DockerHub<'a> { let client = reqwest::Client::new() .get(url) .header("Accept", "application/json"); - let mut client = self.set_token(client).await?; + let client = self.set_token(client).await?; client .send() .await @@ -146,7 +146,7 @@ impl<'a> DockerHub<'a> { let client = reqwest::Client::new() .get(url) .header("Accept", "application/json"); - let mut client = self.set_token(client).await?; + let client = self.set_token(client).await?; client .send() .await @@ -186,7 +186,7 @@ impl<'a> DockerHub<'a> { } } - pub async fn set_token(&self, mut client: RequestBuilder) -> Result { + pub async fn set_token(&self, client: RequestBuilder) -> Result { if self.creds.password.is_empty() { tracing::debug!("Password is empty. Image should be public"); return Ok(client); diff --git a/src/routes/project/deploy.rs b/src/routes/project/deploy.rs index 9844035..f3d242a 100644 --- a/src/routes/project/deploy.rs +++ b/src/routes/project/deploy.rs @@ -156,7 +156,7 @@ pub async fn saved_item( } } } - Err(e) => { + Err(_e) => { return Err(JsonResponse::::build().not_found("No cloud configured")); } }; @@ -169,7 +169,7 @@ pub async fn saved_item( // } server.into_iter().nth(0).unwrap() // @todo refactoring is required } - Err(err) => { + Err(_e) => { return Err(JsonResponse::::build().not_found("No servers configured")); } }; diff --git a/src/routes/server/update.rs b/src/routes/server/update.rs index 02c4b0e..4f83d1e 100644 --- a/src/routes/server/update.rs +++ b/src/routes/server/update.rs @@ -19,7 +19,7 @@ pub async fn item( ) -> Result { let id = path.0; - let mut server_row = db::server::fetch(pg_pool.get_ref(), id) + let server_row = db::server::fetch(pg_pool.get_ref(), id) .await .map_err(|err| JsonResponse::::build().internal_server_error(err)) .and_then(|server| match server { diff --git a/src/startup.rs b/src/startup.rs index a95acfb..2c7c4ba 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -7,6 +7,7 @@ use actix_web::{ App, HttpServer, }; use crate::middleware; +use actix_web_httpauth::middleware::HttpAuthentication; use sqlx::{Pool, Postgres}; use std::net::TcpListener; use tracing_actix_web::TracingLogger; From 3e367ee3dfdc4259039c3ec8a6695141ed784eec Mon Sep 17 00:00:00 2001 From: vsilent Date: Wed, 20 Mar 2024 15:51:32 +0200 Subject: [PATCH 4/7] fix restart policy for service --- src/forms/project/app.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/forms/project/app.rs b/src/forms/project/app.rs index 64598e6..56a5da9 100644 --- a/src/forms/project/app.rs +++ b/src/forms/project/app.rs @@ -143,7 +143,7 @@ impl App { service.ports = dctypes::Ports::Long(ports); - service.restart = Some("always".to_owned()); + service.restart = Some(self.restart.clone()); service.volumes = volumes; service.environment = dctypes::Environment::KvPair(envs); From 858ab0e890f8073e4856135f7314d9a3d1c6d4d8 Mon Sep 17 00:00:00 2001 From: vsilent Date: Thu, 21 Mar 2024 11:52:25 +0200 Subject: [PATCH 5/7] import fixes --- src/console/commands/mq/listener.rs | 25 ++++++++++++++----------- src/db/project.rs | 2 +- src/forms/cloud.rs | 1 - src/forms/project/compose_networks.rs | 1 - src/forms/project/form.rs | 1 - src/forms/project/network.rs | 2 -- src/forms/project/payload.rs | 1 - src/forms/project/port.rs | 2 +- src/forms/user.rs | 1 - src/helpers/dockerhub.rs | 10 ++++++---- src/helpers/mq_manager.rs | 5 +---- src/helpers/project/builder.rs | 7 +++---- src/middleware/access_manager.rs | 2 +- src/middleware/trydirect.rs | 1 - src/models/deployment.rs | 7 ++++++- src/models/project.rs | 7 ++++++- src/routes/client/add.rs | 1 - src/routes/client/disable.rs | 1 - src/routes/client/enable.rs | 1 - src/routes/client/update.rs | 1 - src/routes/cloud/add.rs | 1 - src/routes/cloud/delete.rs | 4 +--- src/routes/cloud/get.rs | 1 - src/routes/cloud/mod.rs | 8 ++++---- src/routes/cloud/update.rs | 3 +-- src/routes/project/compose.rs | 2 +- src/routes/project/delete.rs | 4 +--- src/routes/project/get.rs | 9 ++++----- src/routes/project/mod.rs | 8 ++++---- src/routes/project/update.rs | 1 - src/routes/rating/add.rs | 1 - src/routes/rating/get.rs | 1 - src/routes/server/delete.rs | 3 +-- src/routes/server/get.rs | 2 +- src/routes/server/mod.rs | 8 ++++---- src/routes/server/update.rs | 3 +-- 36 files changed, 62 insertions(+), 76 deletions(-) diff --git a/src/console/commands/mq/listener.rs b/src/console/commands/mq/listener.rs index 501d199..a9e6c04 100644 --- a/src/console/commands/mq/listener.rs +++ b/src/console/commands/mq/listener.rs @@ -2,25 +2,24 @@ use crate::configuration::get_configuration; use actix_web::rt; use actix_web::web; use chrono::Utc; -use lapin::{Channel, Queue}; use lapin::options::{BasicAckOptions, BasicConsumeOptions}; use lapin::types::FieldTable; use sqlx::PgPool; use db::deployment; -use crate::{db, forms, helpers}; -use crate::helpers::{JsonResponse, mq_manager}; +use crate::db; use crate::helpers::mq_manager::MqManager; use futures_lite::stream::StreamExt; use serde_derive::{Deserialize, Serialize}; -use crate::forms::project::ProjectForm; +// use crate::forms::project::ProjectForm; pub struct ListenCommand { } #[derive(Serialize, Deserialize, Debug)] struct ProgressMessage { - alert: i32, id: String, + deploy_id: Option, + alert: i32, message: String, status: String, progress: String @@ -74,17 +73,19 @@ impl crate::console::commands::CallableTrait for ListenCommand { Err(e) => panic!("Invalid UTF-8 sequence: {}", e), }; + println!("incoming data {:?}", s); + let statuses = vec!["complete", "paused", "failed"]; match serde_json::from_str::(&s) { Ok(msg) => { - println!("message {:?}", msg); + println!("message {:?}", s); // println!("id {:?}", msg.id); // println!("status {:?}", msg.status); - delivery.ack(BasicAckOptions::default()).await.expect("ack"); - if msg.status == "complete" { - let id = msg.id + if statuses.contains(&(msg.status.as_str())) { + println!("Process on complete status"); + let id = msg.deploy_id.unwrap() .parse::() - .map_err(|err| "Could not parse deployment id".to_string() )?; + .map_err(|_err| "Could not parse deployment id".to_string() )?; match crate::db::deployment::fetch( db_pool.get_ref(), id @@ -101,8 +102,10 @@ impl crate::console::commands::CallableTrait for ListenCommand { } } } - Err(err) => { tracing::debug!("Invalid message format")} + Err(_err) => { tracing::debug!("Invalid message format {:?}", _err)} } + + delivery.ack(BasicAckOptions::default()).await.expect("ack"); } Ok(()) diff --git a/src/db/project.rs b/src/db/project.rs index b7978e6..0e8e24c 100644 --- a/src/db/project.rs +++ b/src/db/project.rs @@ -173,7 +173,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result { }); Ok(true) } - Err(err) => { + Err(_err) => { let _ = tx.rollback().await.map_err(|err| println!("{:?}", err)); Ok(false) } diff --git a/src/forms/cloud.rs b/src/forms/cloud.rs index 472bdb7..ab040c3 100644 --- a/src/forms/cloud.rs +++ b/src/forms/cloud.rs @@ -1,7 +1,6 @@ use crate::models; use serde::{Deserialize, Serialize}; use serde_valid::Validate; -use chrono::Utc; #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)] pub struct Cloud { diff --git a/src/forms/project/compose_networks.rs b/src/forms/project/compose_networks.rs index bb45a36..b38eb8f 100644 --- a/src/forms/project/compose_networks.rs +++ b/src/forms/project/compose_networks.rs @@ -1,7 +1,6 @@ use serde::{Deserialize, Serialize}; use docker_compose_types as dctypes; use indexmap::IndexMap; -use crate::forms::project; use crate::forms::project::network::Network; #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/src/forms/project/form.rs b/src/forms/project/form.rs index c122ffc..3924c6c 100644 --- a/src/forms/project/form.rs +++ b/src/forms/project/form.rs @@ -1,5 +1,4 @@ use serde::{Deserialize, Serialize}; -use serde_json::Value; use serde_valid::Validate; use actix_web::Error; use actix_web::web::Bytes; diff --git a/src/forms/project/network.rs b/src/forms/project/network.rs index 9b848ff..2e0e183 100644 --- a/src/forms/project/network.rs +++ b/src/forms/project/network.rs @@ -1,8 +1,6 @@ use serde::{Deserialize, Serialize}; use serde_valid::Validate; -use crate::forms::project; use docker_compose_types as dctypes; -use indexmap::IndexMap; use crate::forms::project::NetworkDriver; diff --git a/src/forms/project/payload.rs b/src/forms/project/payload.rs index 7108a1f..bc8e8fb 100644 --- a/src/forms/project/payload.rs +++ b/src/forms/project/payload.rs @@ -1,7 +1,6 @@ use std::convert::TryFrom; use crate::models; use crate::forms; -use serde_json::Value; use serde::{Deserialize, Serialize}; use serde_valid::Validate; diff --git a/src/forms/project/port.rs b/src/forms/project/port.rs index 4da0c49..06c3020 100644 --- a/src/forms/project/port.rs +++ b/src/forms/project/port.rs @@ -53,7 +53,7 @@ impl TryInto for &Port { let cp = self.container_port .clone() .parse::() - .map_err(|err| "Could not parse container port".to_string() )?; + .map_err(|_err| "Could not parse container port".to_string() )?; let hp = match self.host_port.clone() { Some(hp) => { diff --git a/src/forms/user.rs b/src/forms/user.rs index 0fd0024..5a7fa48 100644 --- a/src/forms/user.rs +++ b/src/forms/user.rs @@ -1,7 +1,6 @@ use serde_derive::{Serialize, Deserialize}; use serde_json::Value; use serde_valid::{Validate}; -use tracing_subscriber::fmt::format; use crate::models::user::User as UserModel; #[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)] diff --git a/src/helpers/dockerhub.rs b/src/helpers/dockerhub.rs index d7771c7..fa41df7 100644 --- a/src/helpers/dockerhub.rs +++ b/src/helpers/dockerhub.rs @@ -116,12 +116,12 @@ impl<'a> DockerHub<'a> { .send() .await .map_err(|err| { - tracing::debug!("Error response {:?}", err); + tracing::debug!("🟥Error response {:?}", err); format!("{}", err) })? .json::() .await - .map_err(|err| format!("Error on getting results:: {}", err)) + .map_err(|err| format!("🟥Error on getting results:: {}", err)) .map(|repositories| { tracing::debug!("Get public image repositories response {:?}", repositories); if repositories.count.unwrap_or(0) > 0 { @@ -130,6 +130,7 @@ impl<'a> DockerHub<'a> { .results .into_iter() .any(|repo| repo.status == 1); + tracing::debug!("✅ Image is active"); active } else { false @@ -150,11 +151,11 @@ impl<'a> DockerHub<'a> { client .send() .await - .map_err(|err| format!("{}", err))? + .map_err(|err| format!("🟥{}", err))? .json::() .await .map_err(|err| { - tracing::debug!("Error response {:?}", err); + tracing::debug!("🟥Error response {:?}", err); format!("{}", err) }) .map(|tags| { @@ -165,6 +166,7 @@ impl<'a> DockerHub<'a> { .results .into_iter() .any(|tag| tag.tag_status.contains("active")); + tracing::debug!("✅ Image is active"); active } else { false diff --git a/src/helpers/mq_manager.rs b/src/helpers/mq_manager.rs index f30d238..f205f1a 100644 --- a/src/helpers/mq_manager.rs +++ b/src/helpers/mq_manager.rs @@ -1,10 +1,7 @@ -use actix_web::web; use deadpool_lapin::{Config, CreatePoolError, Object, Pool, Runtime}; -use lapin::{options::*, publisher_confirm::{Confirmation, PublisherConfirm}, BasicProperties, Channel, ExchangeKind, Queue}; -use lapin::types::AMQPType::ShortString; +use lapin::{options::*, publisher_confirm::{Confirmation, PublisherConfirm}, BasicProperties, Channel, ExchangeKind}; use lapin::types::{AMQPValue, FieldTable}; use serde::ser::Serialize; -use serde_valid::validation::error::Format::Default; #[derive(Debug)] pub struct MqManager { diff --git a/src/helpers/project/builder.rs b/src/helpers/project/builder.rs index 7112fe3..9c2a33a 100644 --- a/src/helpers/project/builder.rs +++ b/src/helpers/project/builder.rs @@ -2,14 +2,13 @@ use crate::forms; use docker_compose_types as dctypes; use crate::models; use serde_yaml; -use crate::helpers::project::*; -use tracing::Value; +// use crate::helpers::project::*; /// A builder for constructing docker compose. #[derive(Clone, Debug)] pub struct DcBuilder { - config: Config, + // config: Config, pub(crate) project: models::Project, } @@ -17,7 +16,7 @@ pub struct DcBuilder { impl DcBuilder { pub fn new(project: models::Project) -> Self { DcBuilder { - config: Config::default(), + // config: Config::default(), project, } } diff --git a/src/middleware/access_manager.rs b/src/middleware/access_manager.rs index 4760114..f251e9d 100644 --- a/src/middleware/access_manager.rs +++ b/src/middleware/access_manager.rs @@ -17,7 +17,7 @@ pub async fn try_new(db_connection_address: String) -> Result Self { Deployment { + id: 0, + project_id: 0, + deleted: None, status: "pending".to_string(), - ..Default::default() + body: Default::default(), + created_at: Default::default(), + updated_at: Default::default(), } } } diff --git a/src/models/project.rs b/src/models/project.rs index 7742267..29b260b 100644 --- a/src/models/project.rs +++ b/src/models/project.rs @@ -34,9 +34,14 @@ impl Project { impl Default for Project { fn default() -> Self { Project { + id: 0, + stack_id: Default::default(), user_id: "".to_string(), name: "".to_string(), - ..Default::default() + body: Default::default(), + request_json: Default::default(), + created_at: Default::default(), + updated_at: Default::default(), } } } diff --git a/src/routes/client/add.rs b/src/routes/client/add.rs index 0b526d1..bddbb74 100644 --- a/src/routes/client/add.rs +++ b/src/routes/client/add.rs @@ -6,7 +6,6 @@ use crate::models; use actix_web::{post, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; #[tracing::instrument(name = "Add client.")] #[post("")] diff --git a/src/routes/client/disable.rs b/src/routes/client/disable.rs index 70d2a1c..eb4f43a 100644 --- a/src/routes/client/disable.rs +++ b/src/routes/client/disable.rs @@ -5,7 +5,6 @@ use crate::models; use actix_web::{put, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; #[tracing::instrument(name = "Disable client.")] #[put("/{id}/disable")] diff --git a/src/routes/client/enable.rs b/src/routes/client/enable.rs index c87fc44..200f080 100644 --- a/src/routes/client/enable.rs +++ b/src/routes/client/enable.rs @@ -6,7 +6,6 @@ use crate::models; use actix_web::{put, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; #[tracing::instrument(name = "Enable client.")] #[put("/{id}/enable")] diff --git a/src/routes/client/update.rs b/src/routes/client/update.rs index 5f19de5..243cfa8 100644 --- a/src/routes/client/update.rs +++ b/src/routes/client/update.rs @@ -5,7 +5,6 @@ use crate::{configuration::Settings, helpers::JsonResponse}; use actix_web::{put, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; #[tracing::instrument(name = "Update client.")] #[put("/{id}")] diff --git a/src/routes/cloud/add.rs b/src/routes/cloud/add.rs index 6178ff1..ca8e7b8 100644 --- a/src/routes/cloud/add.rs +++ b/src/routes/cloud/add.rs @@ -5,7 +5,6 @@ use crate::models; use crate::db; use actix_web::{post, web, Responder, Result}; use sqlx::PgPool; -use tracing::Instrument; use std::sync::Arc; use serde_valid::Validate; diff --git a/src/routes/cloud/delete.rs b/src/routes/cloud/delete.rs index b69aec4..94188c7 100644 --- a/src/routes/cloud/delete.rs +++ b/src/routes/cloud/delete.rs @@ -3,8 +3,6 @@ use crate::models; use actix_web::{delete, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use futures_util::FutureExt; -use tracing::Instrument; use crate::db; use crate::models::Cloud; @@ -15,7 +13,7 @@ pub async fn item( path: web::Path<(i32,)>, pg_pool: web::Data, ) -> Result { - /// Get cloud apps of logged user only + // Get cloud apps of logged user only let (id,) = path.into_inner(); let cloud = db::cloud::fetch(pg_pool.get_ref(), id) diff --git a/src/routes/cloud/get.rs b/src/routes/cloud/get.rs index 62f2a47..23af867 100644 --- a/src/routes/cloud/get.rs +++ b/src/routes/cloud/get.rs @@ -4,7 +4,6 @@ use crate::helpers::JsonResponse; use crate::models; use actix_web::{get, web, Responder, Result}; use sqlx::PgPool; -use tracing::Instrument; // workflow // add, update, list, get(user_id), ACL, diff --git a/src/routes/cloud/mod.rs b/src/routes/cloud/mod.rs index bc45c2a..e4ea6c1 100644 --- a/src/routes/cloud/mod.rs +++ b/src/routes/cloud/mod.rs @@ -3,7 +3,7 @@ pub mod get; pub mod update; pub(crate) mod delete; -pub use add::*; -pub use get::*; -pub use update::*; -pub use delete::*; +// pub use add::*; +// pub use get::*; +// pub use update::*; +// pub use delete::*; diff --git a/src/routes/cloud/update.rs b/src/routes/cloud/update.rs index b48fd8b..bdc0a15 100644 --- a/src/routes/cloud/update.rs +++ b/src/routes/cloud/update.rs @@ -2,11 +2,10 @@ use crate::forms; use crate::helpers::JsonResponse; use crate::models; use crate::db; -use actix_web::{web, web::Data, Responder, Result, post, put}; +use actix_web::{web, web::Data, Responder, Result, put}; use serde_valid::Validate; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; use std::ops::Deref; #[tracing::instrument(name = "Update cloud.")] diff --git a/src/routes/project/compose.rs b/src/routes/project/compose.rs index d0ea432..de52ca0 100644 --- a/src/routes/project/compose.rs +++ b/src/routes/project/compose.rs @@ -40,7 +40,7 @@ pub async fn admin( path: web::Path<(i32,)>, pg_pool: Data, ) -> Result { - /// Admin function for generating compose file for specified user + // Admin function for generating compose file for specified user let id = path.0; let project = db::project::fetch(pg_pool.get_ref(), id) .await diff --git a/src/routes/project/delete.rs b/src/routes/project/delete.rs index c0e1b26..92c6d98 100644 --- a/src/routes/project/delete.rs +++ b/src/routes/project/delete.rs @@ -3,8 +3,6 @@ use crate::models; use actix_web::{delete, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use futures_util::FutureExt; -use tracing::Instrument; use crate::db; use crate::models::Project; @@ -15,7 +13,7 @@ pub async fn item( path: web::Path<(i32,)>, pg_pool: web::Data, ) -> Result { - /// Get project apps of logged user only + // Get project apps of logged user only let (id,) = path.into_inner(); let project = db::project::fetch(pg_pool.get_ref(), id) diff --git a/src/routes/project/get.rs b/src/routes/project/get.rs index 3cd7fc3..e5d5a16 100644 --- a/src/routes/project/get.rs +++ b/src/routes/project/get.rs @@ -4,7 +4,6 @@ use crate::models; use actix_web::{get, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; #[tracing::instrument(name = "Get logged user project.")] #[get("/{id}")] @@ -13,7 +12,7 @@ pub async fn item( path: web::Path<(i32,)>, pg_pool: web::Data, ) -> Result { - /// Get project apps of logged user only + // Get project apps of logged user only let (id,) = path.into_inner(); db::project::fetch(pg_pool.get_ref(), id) @@ -35,9 +34,9 @@ pub async fn list( path: web::Path<(String,)>, pg_pool: web::Data, ) -> Result { - /// This is admin endpoint, used by a client app, client app is confidential - /// it should return projects by user id - /// in order to pass validation at external deployment service + // This is admin endpoint, used by a client app, client app is confidential + // it should return projects by user id + // in order to pass validation at external deployment service let user_id = path.into_inner().0; db::project::fetch_by_user(pg_pool.get_ref(), &user_id) diff --git a/src/routes/project/mod.rs b/src/routes/project/mod.rs index 6d66205..05f7de8 100644 --- a/src/routes/project/mod.rs +++ b/src/routes/project/mod.rs @@ -5,7 +5,7 @@ pub mod update; pub(crate) mod compose; pub(crate) mod delete; -pub use add::*; -pub use update::*; -pub use deploy::*; -pub use get::*; +pub use add::item; +// pub use update::*; +// pub use deploy::*; +// pub use get::*; diff --git a/src/routes/project/update.rs b/src/routes/project/update.rs index ecd72b1..4fc8568 100644 --- a/src/routes/project/update.rs +++ b/src/routes/project/update.rs @@ -9,7 +9,6 @@ use serde_valid::Validate; use sqlx::PgPool; use std::sync::Arc; use actix_web::web::Bytes; -use tracing::Instrument; use std::str; #[tracing::instrument(name = "Update project.")] diff --git a/src/routes/rating/add.rs b/src/routes/rating/add.rs index 74bc725..e91dbe6 100644 --- a/src/routes/rating/add.rs +++ b/src/routes/rating/add.rs @@ -4,7 +4,6 @@ use crate::models; use crate::db; use actix_web::{post, web, Responder, Result}; use sqlx::PgPool; -use tracing::Instrument; use std::sync::Arc; // workflow diff --git a/src/routes/rating/get.rs b/src/routes/rating/get.rs index 34ef6af..c34677d 100644 --- a/src/routes/rating/get.rs +++ b/src/routes/rating/get.rs @@ -3,7 +3,6 @@ use crate::helpers::JsonResponse; use crate::models; use actix_web::{get, web, Responder, Result}; use sqlx::PgPool; -use tracing::Instrument; // workflow // add, update, list, get(user_id), ACL, diff --git a/src/routes/server/delete.rs b/src/routes/server/delete.rs index cd9a824..35440ec 100644 --- a/src/routes/server/delete.rs +++ b/src/routes/server/delete.rs @@ -3,7 +3,6 @@ use crate::models; use actix_web::{delete, web, Responder, Result}; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; use crate::db; use crate::models::Server; @@ -14,7 +13,7 @@ pub async fn item( path: web::Path<(i32,)>, pg_pool: web::Data, ) -> Result { - /// Get server apps of logged user only + // Get server apps of logged user only let (id,) = path.into_inner(); let server = db::server::fetch(pg_pool.get_ref(), id) diff --git a/src/routes/server/get.rs b/src/routes/server/get.rs index 23bac5b..de33e8f 100644 --- a/src/routes/server/get.rs +++ b/src/routes/server/get.rs @@ -4,7 +4,7 @@ use crate::helpers::JsonResponse; use crate::models; use actix_web::{get, web, Responder, Result}; use sqlx::PgPool; -use tracing::Instrument; +// use tracing::Instrument; // workflow // add, update, list, get(user_id), ACL, diff --git a/src/routes/server/mod.rs b/src/routes/server/mod.rs index af796d2..8ef07d3 100644 --- a/src/routes/server/mod.rs +++ b/src/routes/server/mod.rs @@ -3,7 +3,7 @@ pub(crate) mod get; pub(crate) mod delete; pub(crate) mod update; -pub use get::*; -pub use add::*; -pub use update::*; -pub use delete::*; +// pub use get::*; +// pub use add::*; +// pub use update::*; +// pub use delete::*; diff --git a/src/routes/server/update.rs b/src/routes/server/update.rs index 4f83d1e..2be6aca 100644 --- a/src/routes/server/update.rs +++ b/src/routes/server/update.rs @@ -2,11 +2,10 @@ use crate::forms; use crate::helpers::JsonResponse; use crate::models; use crate::db; -use actix_web::{web, web::Data, Responder, Result, post, put}; +use actix_web::{web, web::Data, Responder, Result, put}; use serde_valid::Validate; use sqlx::PgPool; use std::sync::Arc; -use tracing::Instrument; use std::ops::Deref; #[tracing::instrument(name = "Update server.")] From 2f0507df0d1ad4032bb175c2f72d06af09322262 Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 22 Mar 2024 14:24:01 +0200 Subject: [PATCH 6/7] reconfigure rabbitmq queue --- src/console/commands/mq/listener.rs | 37 ++++++++++++++++++----------- src/forms/project/payload.rs | 3 ++- src/helpers/compressor.rs | 7 ------ src/helpers/dockerhub.rs | 2 ++ src/helpers/mq_manager.rs | 14 ++++------- src/routes/project/deploy.rs | 11 +++++++-- src/startup.rs | 1 + 7 files changed, 42 insertions(+), 33 deletions(-) diff --git a/src/console/commands/mq/listener.rs b/src/console/commands/mq/listener.rs index a9e6c04..3e67a9c 100644 --- a/src/console/commands/mq/listener.rs +++ b/src/console/commands/mq/listener.rs @@ -10,7 +10,6 @@ use crate::db; use crate::helpers::mq_manager::MqManager; use futures_lite::stream::StreamExt; use serde_derive::{Deserialize, Serialize}; -// use crate::forms::project::ProjectForm; pub struct ListenCommand { } @@ -44,10 +43,13 @@ impl crate::console::commands::CallableTrait for ListenCommand { println!("Declare exchange"); let mq_manager = MqManager::try_new(settings.amqp.connection_string())?; + // let queue_name = "stacker_listener"; + let queue_name = "install_progress_m383emvfP9zQKs8lkgSU_Q"; let consumer_channel= mq_manager .consume( "install_progress", - "install.progress.#" + queue_name, + "install.progress.*.*.*" ) .await?; @@ -55,7 +57,7 @@ impl crate::console::commands::CallableTrait for ListenCommand { println!("Declare queue"); let mut consumer = consumer_channel .basic_consume( - "#", + queue_name, "console_listener", BasicConsumeOptions::default(), FieldTable::default(), @@ -73,32 +75,39 @@ impl crate::console::commands::CallableTrait for ListenCommand { Err(e) => panic!("Invalid UTF-8 sequence: {}", e), }; - println!("incoming data {:?}", s); - let statuses = vec!["complete", "paused", "failed"]; + let statuses = vec![ + "completed", + "paused", + "failed", + "in_progress", + "error", + "wait_resume", + "wait_start", + "confirmed" + ]; match serde_json::from_str::(&s) { Ok(msg) => { println!("message {:?}", s); - // println!("id {:?}", msg.id); - // println!("status {:?}", msg.status); - if statuses.contains(&(msg.status.as_str())) { - println!("Process on complete status"); + if statuses.contains(&(msg.status.as_ref())) && msg.deploy_id.is_some() { + println!("Update DB on status change .."); let id = msg.deploy_id.unwrap() .parse::() - .map_err(|_err| "Could not parse deployment id".to_string() )?; + .map_err(|_err| "Could not parse deployment id".to_string())?; - match crate::db::deployment::fetch( + match deployment::fetch( db_pool.get_ref(), id ) .await? { - Some(mut row) => { row.status = msg.status; row.updated_at = Utc::now(); + println!("Deployment {} updated with status {}", + &id, &row.status + ); deployment::update(db_pool.get_ref(), row).await?; - println!("deployment {} completed successfully", id); } - None => println!("Deployment record not found in db") + None => println!("Deployment record was not found in db") } } } diff --git a/src/forms/project/payload.rs b/src/forms/project/payload.rs index bc8e8fb..d7e84d4 100644 --- a/src/forms/project/payload.rs +++ b/src/forms/project/payload.rs @@ -8,6 +8,7 @@ use serde_valid::Validate; #[serde(rename_all = "snake_case")] pub struct Payload { pub(crate) id: Option, + pub(crate) project_id: Option, pub(crate) user_token: Option, pub(crate) user_email: Option, #[serde(flatten)] @@ -30,7 +31,7 @@ impl TryFrom<&models::Project> for Payload { format!("{:?}", err) })?; - project_data.id = Some(project.id.clone()); + project_data.project_id = Some(project.id); Ok(project_data) } diff --git a/src/helpers/compressor.rs b/src/helpers/compressor.rs index 624747d..c6e1258 100644 --- a/src/helpers/compressor.rs +++ b/src/helpers/compressor.rs @@ -10,11 +10,4 @@ pub fn compress(input: &str) -> Vec { compressor.flush().unwrap(); drop(compressor); compressed -} - -pub fn decompress(input: &[u8]) -> String { - let mut decompressed = String::new(); - let mut decompressor = Decompressor::new(input, 4096); - decompressor.read_to_string(&mut decompressed).unwrap(); - decompressed } \ No newline at end of file diff --git a/src/helpers/dockerhub.rs b/src/helpers/dockerhub.rs index fa41df7..81f4feb 100644 --- a/src/helpers/dockerhub.rs +++ b/src/helpers/dockerhub.rs @@ -133,6 +133,7 @@ impl<'a> DockerHub<'a> { tracing::debug!("✅ Image is active"); active } else { + tracing::debug!("🟥 Image tag is not active"); false } }) @@ -169,6 +170,7 @@ impl<'a> DockerHub<'a> { tracing::debug!("✅ Image is active"); active } else { + tracing::debug!("🟥 Image tag is not active"); false } }) diff --git a/src/helpers/mq_manager.rs b/src/helpers/mq_manager.rs index f205f1a..dfcda23 100644 --- a/src/helpers/mq_manager.rs +++ b/src/helpers/mq_manager.rs @@ -102,11 +102,10 @@ impl MqManager { pub async fn consume( &self, exchange_name: &str, + queue_name: &str, routing_key: &str, ) -> Result { - let mut args = FieldTable::default(); - args.insert("x-expires".into(), AMQPValue::LongUInt(180000)); let channel = self.create_channel().await?; channel @@ -120,18 +119,16 @@ impl MqManager { internal: false, nowait: false, }, - args + FieldTable::default() ) .await .expect("Exchange declare failed"); let mut args = FieldTable::default(); - args.insert("x-expires".into(), AMQPValue::LongUInt(180000)); + args.insert("x-expires".into(), AMQPValue::LongUInt(3600000)); let queue = channel.queue_declare( - // "install_progress_all", - "#", - // "install_progress_hy181TZa4DaabUZWklsrxw", + queue_name, QueueDeclareOptions { passive: false, durable: false, @@ -139,7 +136,6 @@ impl MqManager { auto_delete: true, nowait: false, }, - // FieldTable::default(), args, ) .await @@ -147,7 +143,7 @@ impl MqManager { let _ = channel .queue_bind( - queue.name().as_str(), + queue_name, exchange_name, routing_key, QueueBindOptions::default(), diff --git a/src/routes/project/deploy.rs b/src/routes/project/deploy.rs index f3d242a..0fa7030 100644 --- a/src/routes/project/deploy.rs +++ b/src/routes/project/deploy.rs @@ -95,7 +95,11 @@ pub async fn item( let result = db::deployment::insert(pg_pool.get_ref(), deployment) .await - .map(|deployment| deployment) + .map(|deployment| { + payload.id = Some(deployment.id); + deployment + } + ) .map_err(|_| { JsonResponse::::build().internal_server_error("Internal Server Error") }); @@ -194,7 +198,10 @@ pub async fn saved_item( let result = db::deployment::insert(pg_pool.get_ref(), deployment) .await - .map(|deployment| deployment) + .map(|deployment| { + payload.id = Some(deployment.id); + deployment + }) .map_err(|_| { JsonResponse::::build().internal_server_error("Internal Server Error") }); diff --git a/src/startup.rs b/src/startup.rs index 2c7c4ba..29d0c9b 100644 --- a/src/startup.rs +++ b/src/startup.rs @@ -72,6 +72,7 @@ pub async fn run( )) .wrap(Cors::permissive()) .service(crate::routes::project::deploy::item) + .service(crate::routes::project::deploy::saved_item) .service(crate::routes::project::compose::add) .service(crate::routes::project::compose::admin) .service(crate::routes::project::get::item) From b1295b8df088bf5d52d895674e65fab6302fbbab Mon Sep 17 00:00:00 2001 From: vsilent Date: Fri, 22 Mar 2024 21:45:09 +0200 Subject: [PATCH 7/7] docker image tag validation fix --- src/console/commands/mq/listener.rs | 5 +++-- src/forms/project/form.rs | 12 ++++-------- src/routes/cloud/delete.rs | 2 +- src/routes/project/deploy.rs | 8 ++++++-- src/routes/project/update.rs | 2 +- 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/console/commands/mq/listener.rs b/src/console/commands/mq/listener.rs index 3e67a9c..5d4b0c7 100644 --- a/src/console/commands/mq/listener.rs +++ b/src/console/commands/mq/listener.rs @@ -43,8 +43,9 @@ impl crate::console::commands::CallableTrait for ListenCommand { println!("Declare exchange"); let mq_manager = MqManager::try_new(settings.amqp.connection_string())?; - // let queue_name = "stacker_listener"; - let queue_name = "install_progress_m383emvfP9zQKs8lkgSU_Q"; + let queue_name = "stacker_listener"; + // let queue_name = "install_progress_m383emvfP9zQKs8lkgSU_Q"; + // let queue_name = "install_progress_hy181TZa4DaabUZWklsrxw"; let consumer_channel= mq_manager .consume( "install_progress", diff --git a/src/forms/project/form.rs b/src/forms/project/form.rs index 3924c6c..840bfaa 100644 --- a/src/forms/project/form.rs +++ b/src/forms/project/form.rs @@ -23,19 +23,16 @@ impl TryFrom<&models::Project> for ProjectForm { impl ProjectForm { pub async fn is_readable_docker_image(&self) -> Result { - let mut is_active = true; for app in &self.custom.web { if !app.app.docker_image.is_active().await? { - is_active = false; - break; + return Ok(false); } } if let Some(service) = &self.custom.service { for app in service { if !app.app.docker_image.is_active().await? { - is_active = false; - break; + return Ok(false); } } } @@ -43,12 +40,11 @@ impl ProjectForm { if let Some(features) = &self.custom.feature { for app in features { if !app.app.docker_image.is_active().await? { - is_active = false; - break; + return Ok(false); } } } - Ok(is_active) + Ok(true) } } diff --git a/src/routes/cloud/delete.rs b/src/routes/cloud/delete.rs index 94188c7..2654bde 100644 --- a/src/routes/cloud/delete.rs +++ b/src/routes/cloud/delete.rs @@ -27,7 +27,7 @@ pub async fn item( Some(cloud) => { Ok(cloud) }, - None => Err(JsonResponse::::build().not_found("")) + None => Err(JsonResponse::::build().not_found("not found")) } })?; diff --git a/src/routes/project/deploy.rs b/src/routes/project/deploy.rs index 0fa7030..7cdbb43 100644 --- a/src/routes/project/deploy.rs +++ b/src/routes/project/deploy.rs @@ -137,6 +137,7 @@ pub async fn saved_item( //let cloud_id = Some(1); tracing::debug!("User {:?} is deploying project: {} to cloud: {} ", user, id, cloud_id); + // Validate project let project = db::project::fetch(pg_pool.get_ref(), id) .await .map_err(|err| JsonResponse::::build().internal_server_error(err)) @@ -145,7 +146,8 @@ pub async fn saved_item( None => Err(JsonResponse::::build().not_found("Project not found")), })?; - let id = project.id.clone(); + // Build compose + let id = project.id; let dc = DcBuilder::new(project); let fc = dc.build().map_err(|err| { JsonResponse::::build().internal_server_error(err) @@ -181,6 +183,7 @@ pub async fn saved_item( // let mut payload = forms::project::Payload::default(); let mut payload = forms::project::Payload::try_from(&dc.project) .map_err(|err| JsonResponse::::build().bad_request(err))?; + payload.server = Some(server.into()); payload.cloud = Some(cloud.into()); payload.user_token = Some(user.id.clone()); @@ -188,7 +191,7 @@ pub async fn saved_item( // let compressed = fc.unwrap_or("".to_string()); payload.docker_compose = Some(compress(fc.as_str())); - + // Store deployment attempts into deployment table in db let json_request = dc.project.body.clone(); let deployment = models::Deployment::new( dc.project.id, @@ -209,6 +212,7 @@ pub async fn saved_item( tracing::debug!("Save deployment result: {:?}", result); tracing::debug!("Send project data <<<<<<<<<<<>>>>>>>>>>>>>>>>{:?}", payload); + // Send Payload mq_manager .publish( "install".to_string(), diff --git a/src/routes/project/update.rs b/src/routes/project/update.rs index 4fc8568..9bddfb9 100644 --- a/src/routes/project/update.rs +++ b/src/routes/project/update.rs @@ -47,7 +47,7 @@ pub async fn item( let project_name = form.custom.custom_stack_code.clone(); - if !form.is_readable_docker_image().await.is_ok() { + if Ok(false) == form.is_readable_docker_image().await { return Err(JsonResponse::::build().bad_request("Can not access docker image")); }