Skip to content

Commit

Permalink
Merge pull request #41 from trydirect/issue-33
Browse files Browse the repository at this point in the history
Issue 33
  • Loading branch information
vsilent authored Mar 24, 2024
2 parents 0bc8e23 + f53197a commit 5522fa8
Show file tree
Hide file tree
Showing 46 changed files with 236 additions and 141 deletions.
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ COPY ./src ./src
# cargo build --release

RUN apt-get update && apt-get install --no-install-recommends -y libssl-dev; \
cargo build --release
cargo build --bin=console && cargo build --release

#RUN ls -la /app/target/release/ >&2

Expand All @@ -51,6 +51,7 @@ RUN mkdir ./files && chmod 0777 ./files

# copy binary and configuration files
COPY --from=builder /app/target/release/server .
COPY --from=builder /app/target/release/console .
COPY --from=builder /app/.env .
COPY --from=builder /app/configuration.yaml .
COPY --from=builder /usr/local/cargo/bin/sqlx sqlx
Expand Down
16 changes: 16 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,22 @@ services:
# stackerdb:
# condition: service_healthy

# stacker_queue:
# image: trydirect/stacker:0.0.7
# container_name: stacker_queue
# restart: always
# volumes:
# - ./configuration.yaml:/app/configuration.yaml
# - ./.env:/app/.env
# environment:
# - RUST_LOG=debug
# - RUST_BACKTRACE=1
# env_file:
# - ./.env
# depends_on:
# stackerdb:
# condition: service_healthy
# entrypoint: /app/console mq listen

# stackerdb:
# container_name: stackerdb
Expand Down
20 changes: 20 additions & 0 deletions docker/dev/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,26 @@ services:
- backend


stacker_queue:
image: trydirect/stacker:0.0.7
container_name: stacker_queue
restart: always
volumes:
- ./configuration.yaml:/app/configuration.yaml
- ./.env:/app/.env
environment:
- RUST_LOG=debug
- RUST_BACKTRACE=1
env_file:
- ./.env
depends_on:
stackerdb:
condition: service_healthy
entrypoint: /app/console mq listen
networks:
- backend


stackerdb:
container_name: stackerdb
healthcheck:
Expand Down
101 changes: 68 additions & 33 deletions src/console/commands/mq/listener.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,29 @@
use crate::configuration::get_configuration;
use actix_web::rt;
use actix_web::web;
use lapin::{Channel, Queue};
use chrono::Utc;
use lapin::options::{BasicAckOptions, BasicConsumeOptions};
use lapin::types::FieldTable;
use sqlx::PgPool;
use db::deployment;
use crate::{db, helpers};
use crate::helpers::mq_manager;
use crate::db;
use crate::helpers::mq_manager::MqManager;
use futures_lite::stream::StreamExt;
use serde_derive::{Deserialize, Serialize};

pub struct ListenCommand {
}

#[derive(Serialize, Deserialize, Debug)]
struct ProgressMessage {
id: String,
deploy_id: Option<String>,
alert: i32,
message: String,
status: String,
progress: String
}

impl ListenCommand {
pub fn new() -> Self {
Self {}
Expand All @@ -31,57 +41,82 @@ impl crate::console::commands::CallableTrait for ListenCommand {

let db_pool = web::Data::new(db_pool);

println!("Declare exchange");
let mq_manager = MqManager::try_new(settings.amqp.connection_string())?;
let queue_name = "stacker_listener";
// let queue_name = "install_progress_m383emvfP9zQKs8lkgSU_Q";
// let queue_name = "install_progress_hy181TZa4DaabUZWklsrxw";
let consumer_channel= mq_manager
.consume(
"install_progress",
"install_progress_*******"
queue_name,
"install.progress.*.*.*"
)
.await?;


println!("Declare queue");
let mut consumer = consumer_channel
.basic_consume(
"install_progress",
queue_name,
"console_listener",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.expect("Basic consume");

// .map_err(|err| format!("Error {:?}", err));
println!("Waiting for messages ..");
while let Some(delivery) = consumer.next().await {
// println!("checking messages delivery {:?}", delivery);
let delivery = delivery.expect("error in consumer");
let s:String = match String::from_utf8(delivery.data.to_owned()) {
//delivery.data is of type Vec<u8>
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};

tracing::info!("will consume");
// if let Ok(consumer) = consumer {
while let Some(delivery) = consumer.next().await {
let delivery = delivery.expect("error in consumer");
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}
// }
let statuses = vec![
"completed",
"paused",
"failed",
"in_progress",
"error",
"wait_resume",
"wait_start",
"confirmed"
];
match serde_json::from_str::<ProgressMessage>(&s) {
Ok(msg) => {
println!("message {:?}", s);

// while let Some(delivery) = consumer.next().await {
// tracing::debug!(message=?delivery, "received message");
// if let Ok(delivery) = delivery {
// delivery
// .ack(BasicAckOptions::default())
// .await
// .expect("basic_ack");
// }
// }
if statuses.contains(&(msg.status.as_ref())) && msg.deploy_id.is_some() {
println!("Update DB on status change ..");
let id = msg.deploy_id.unwrap()
.parse::<i32>()
.map_err(|_err| "Could not parse deployment id".to_string())?;

match deployment::fetch(
db_pool.get_ref(), id
)
.await? {
Some(mut row) => {
row.status = msg.status;
row.updated_at = Utc::now();
println!("Deployment {} updated with status {}",
&id, &row.status
);
deployment::update(db_pool.get_ref(), row).await?;
}
None => println!("Deployment record was not found in db")
}
}
}
Err(_err) => { tracing::debug!("Invalid message format {:?}", _err)}
}

// on_complete()
// let deployment = crate::models::deployment::Deployment {
// id: 0,
// project_id: 0,
// deleted: false,
// status: "".to_string(),
// body: Default::default(),
// created_at: Default::default(),
// updated_at: Default::default(),
// };
// deployment::update(db_pool.get_ref(), deployment).await?;
delivery.ack(BasicAckOptions::default()).await.expect("ack");
}

Ok(())
})
Expand Down
3 changes: 1 addition & 2 deletions src/console/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ fn get_command(cli: Cli) -> Result<Box<dyn stacker::console::commands::CallableT
AppMqCommands::Listen {} => Ok(Box::new(
stacker::console::commands::mq::ListenCommand::new(),
)),
},
_ => Err("command does not match".to_string()),
}
}
}
2 changes: 1 addition & 1 deletion src/db/cloud.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
});
Ok(true)
}
Err(err) => {
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
Expand Down
26 changes: 26 additions & 0 deletions src/db/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@ use crate::models;
use sqlx::PgPool;
use tracing::Instrument;


pub async fn fetch(pool: &PgPool, id: i32) -> Result<Option<models::Deployment>, String> {
tracing::info!("Fetch deployment {}", id);
sqlx::query_as!(
models::Deployment,
r#"
SELECT
*
FROM deployment
WHERE id=$1
LIMIT 1
"#,
id
)
.fetch_one(pool)
.await
.map(|deployment| Some(deployment))
.or_else(|err| match err {
sqlx::Error::RowNotFound => Ok(None),
e => {
tracing::error!("Failed to fetch deployment, error: {:?}", e);
Err("Could not fetch data".to_string())
}
})
}

pub async fn insert(pool: &PgPool, mut deployment: models::Deployment) -> Result<models::Deployment, String> {
let query_span = tracing::info_span!("Saving new deployment into the database");
sqlx::query!(
Expand Down
6 changes: 3 additions & 3 deletions src/db/project.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,14 +167,14 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
})
{
Ok(_) => {
tx.commit().await.map_err(|err| {
let _ = tx.commit().await.map_err(|err| {
tracing::error!("Failed to commit transaction: {:?}", err);
false
});
Ok(true)
}
Err(err) => {
tx.rollback().await.map_err(|err| println!("{:?}", err));
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
// todo, when empty commit()
Expand Down
2 changes: 1 addition & 1 deletion src/db/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ pub async fn delete(pool: &PgPool, id: i32) -> Result<bool, String> {
});
Ok(true)
}
Err(err) => {
Err(_err) => {
let _ = tx.rollback().await.map_err(|err| println!("{:?}", err));
Ok(false)
}
Expand Down
1 change: 0 additions & 1 deletion src/forms/cloud.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::models;
use serde::{Deserialize, Serialize};
use serde_valid::Validate;
use chrono::Utc;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)]
pub struct Cloud {
Expand Down
2 changes: 1 addition & 1 deletion src/forms/project/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl App {


service.ports = dctypes::Ports::Long(ports);
service.restart = Some("always".to_owned());
service.restart = Some(self.restart.clone());
service.volumes = volumes;
service.environment = dctypes::Environment::KvPair(envs);

Expand Down
1 change: 0 additions & 1 deletion src/forms/project/compose_networks.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use serde::{Deserialize, Serialize};
use docker_compose_types as dctypes;
use indexmap::IndexMap;
use crate::forms::project;
use crate::forms::project::network::Network;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
Expand Down
18 changes: 5 additions & 13 deletions src/forms/project/form.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
use serde::{Deserialize, Serialize};
use serde_json::Value;
use serde_valid::Validate;
use actix_web::Error;
use actix_web::web::Bytes;
use crate::models;
use crate::forms;
use crate::helpers::JsonResponse;
use std::str;


Expand All @@ -24,31 +20,27 @@ impl TryFrom<&models::Project> for ProjectForm {

impl ProjectForm {
pub async fn is_readable_docker_image(&self) -> Result<bool, String> {
let mut is_active = true;
for app in &self.custom.web {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}

if let Some(service) = &self.custom.service {
for app in service {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}
}

if let Some(features) = &self.custom.feature {
for app in features {
if !app.app.docker_image.is_active().await? {
is_active = false;
break;
return Ok(false);
}
}
}
Ok(is_active)
Ok(true)
}
}
}
2 changes: 0 additions & 2 deletions src/forms/project/network.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use serde::{Deserialize, Serialize};
use serde_valid::Validate;
use crate::forms::project;
use docker_compose_types as dctypes;
use indexmap::IndexMap;
use crate::forms::project::NetworkDriver;


Expand Down
4 changes: 2 additions & 2 deletions src/forms/project/payload.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::convert::TryFrom;
use crate::models;
use crate::forms;
use serde_json::Value;
use serde::{Deserialize, Serialize};
use serde_valid::Validate;

#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize, Validate)]
#[serde(rename_all = "snake_case")]
pub struct Payload {
pub(crate) id: Option<i32>,
pub(crate) project_id: Option<i32>,
pub(crate) user_token: Option<String>,
pub(crate) user_email: Option<String>,
#[serde(flatten)]
Expand All @@ -31,7 +31,7 @@ impl TryFrom<&models::Project> for Payload {
format!("{:?}", err)
})?;

project_data.id = Some(project.id.clone());
project_data.project_id = Some(project.id);

Ok(project_data)
}
Expand Down
Loading

0 comments on commit 5522fa8

Please sign in to comment.