Skip to content

Commit

Permalink
chore: add docstring (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
zk-steve authored Dec 18, 2024
1 parent e7a47da commit 109ab30
Show file tree
Hide file tree
Showing 25 changed files with 923 additions and 340 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/adapter/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ version = "0.0.1"
edition = "2021"

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
bincode = { workspace = true }
deadpool-diesel = { workspace = true, features = ["postgres", "serde"] }
Expand Down
10 changes: 9 additions & 1 deletion crates/adapter/src/http/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,17 @@ impl PeerPort for PeerClient {
.await
.map_err(|e| CoreError::InternalError(e.into()))?;

// Check the response status and handle any non-OK responses explicitly.
if !response.status().is_success() {
return Err(CoreError::UnexpectedResponse(format!(
"Received non-success status: {}",
response.status()
)));
}

response
.json()
.await
.map_err(|e| CoreError::ParseResponseError(e.into()))
.map_err(|e| CoreError::InternalError(e.into()))
}
}
46 changes: 21 additions & 25 deletions crates/adapter/src/http/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use frog_core::entities::session::{SessionEntity, SessionId};
use frog_core::errors::CoreError;
use frog_core::errors::CoreError::UnexpectedResponse;
use frog_core::ports::session_client::SessionClientPort;
use log::error;
use reqwest::{Client, StatusCode};

pub struct SessionClient {
Expand All @@ -18,6 +19,21 @@ impl SessionClient {
client,
}
}

/// Helper function to handle HTTP responses.
async fn handle_response(&self, response: reqwest::Response) -> Result<String, CoreError> {
let status = response.status();
let body = response
.text()
.await
.map_err(|e| CoreError::InternalError(e.into()))?;

if status != StatusCode::OK {
error!("Unexpected response with status {}: {}", status, body);
return Err(UnexpectedResponse(body));
}
Ok(body)
}
}

#[async_trait]
Expand All @@ -37,14 +53,8 @@ impl SessionClientPort for SessionClient {
.send()
.await
.map_err(|e| CoreError::InternalError(e.into()))?;
let status = response.status();
if status != StatusCode::OK {
let body = response
.text()
.await
.map_err(|e| CoreError::ParseResponseError(e.into()))?;
return Err(UnexpectedResponse(body));
}

self.handle_response(response).await?;
Ok(())
}

Expand All @@ -61,7 +71,7 @@ impl SessionClientPort for SessionClient {
response
.json::<SessionEntity>()
.await
.map_err(|e| CoreError::ParseResponseError(e.into()))
.map_err(|e| CoreError::InternalError(e.into()))
}

async fn bootstrap(
Expand All @@ -81,14 +91,7 @@ impl SessionClientPort for SessionClient {
.await
.map_err(|e| CoreError::InternalError(e.into()))?;

let status = response.status();
if status != StatusCode::OK {
let body = response
.text()
.await
.map_err(|e| CoreError::ParseResponseError(e.into()))?;
return Err(UnexpectedResponse(body));
}
self.handle_response(response).await?;
Ok(())
}

Expand All @@ -109,14 +112,7 @@ impl SessionClientPort for SessionClient {
.await
.map_err(|e| CoreError::InternalError(e.into()))?;

let status = response.status();
if status != StatusCode::OK {
let body = response
.text()
.await
.map_err(|e| CoreError::ParseResponseError(e.into()))?;
return Err(UnexpectedResponse(body));
}
self.handle_response(response).await?;
Ok(())
}
}
129 changes: 65 additions & 64 deletions crates/adapter/src/postgres/session_db.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use anyhow::Error;
use async_trait::async_trait;
use deadpool_diesel::postgres::Pool;
use diesel::{
Expand Down Expand Up @@ -25,93 +26,93 @@ impl SessionDBRepository {
}
}

fn map_diesel_error(err: diesel::result::Error) -> CoreError {
match err {
diesel::result::Error::NotFound => CoreError::NotFound,
_ => CoreError::InternalError(err.into()),
}
}

#[async_trait]
impl SessionPort for SessionDBRepository {
async fn create(&self, session_entity: SessionEntity) -> Result<SessionId, CoreError> {
self.db
let conn = self
.db
.get()
.await
.unwrap()
.interact(move |conn| {
let session = SessionModel::try_from(session_entity)
.map_err(|err| CoreError::InternalError(err.into()))?;
let response = insert_into(sessions)
.values(&session)
.get_result::<SessionModel>(conn)
.map_err(|err| match err {
diesel::result::Error::NotFound => CoreError::NotFound,
_ => CoreError::InternalError(err.into()),
})?;
Ok(SessionId(response.id))
})
.await
.unwrap()
.map_err(|e| CoreError::InternalError(e.into()))?;
conn.interact(move |conn| {
let session = SessionModel::try_from(session_entity)
.map_err(|err| CoreError::InternalError(err.into()))?;
let response = insert_into(sessions)
.values(&session)
.get_result::<SessionModel>(conn)
.map_err(map_diesel_error)?;
Ok(SessionId(response.id))
})
.await
.map_err(|e| CoreError::InternalError(Error::msg(e.to_string())))?
}

async fn get(&self, session_id: SessionId) -> Result<SessionEntity, CoreError> {
self.db
let conn = self
.db
.get()
.await
.unwrap()
.interact(move |conn| {
let response = sessions
.filter(id.eq(session_id.0))
.select(SessionModel::as_select())
.first(conn)
.map_err(|err| match err {
diesel::result::Error::NotFound => CoreError::NotFound,
_ => CoreError::InternalError(err.into()),
})?
.into();

Ok(response)
})
.await
.unwrap()
.map_err(|e| CoreError::InternalError(e.into()))?;
conn.interact(move |conn| {
let response = sessions
.filter(id.eq(session_id.0))
.select(SessionModel::as_select())
.first::<SessionModel>(conn)
.map_err(map_diesel_error)?
.into();
Ok(response)
})
.await
.map_err(|e| CoreError::InternalError(Error::msg(e.to_string())))?
}

async fn update(
&self,
session_id: SessionId,
session_entity: SessionEntity,
) -> Result<SessionId, CoreError> {
assert_eq!(session_id, session_entity.id);
self.db
if session_id != session_entity.id {
return Err(CoreError::ValidationFail("Session ID mismatch".to_string()));
}

let conn = self
.db
.get()
.await
.unwrap()
.interact(move |conn| {
let session = SessionModel::try_from(session_entity)
.map_err(|err| CoreError::InternalError(err.into()))?;
let response = update(sessions.filter(id.eq(session.id)))
.set(&session)
.get_result::<SessionModel>(conn)
.map_err(|err| match err {
diesel::result::Error::NotFound => CoreError::NotFound,
_ => CoreError::InternalError(err.into()),
})?;
Ok(SessionId(response.id))
})
.await
.unwrap()
.map_err(|e| CoreError::InternalError(e.into()))?;
conn.interact(move |conn| {
let session = SessionModel::try_from(session_entity)
.map_err(|err| CoreError::InternalError(err.into()))?;
let response = update(sessions.filter(id.eq(session.id)))
.set(&session)
.get_result::<SessionModel>(conn)
.map_err(map_diesel_error)?;
Ok(SessionId(response.id))
})
.await
.map_err(|e| CoreError::InternalError(Error::msg(e.to_string())))?
}

async fn delete(&self, session_id: SessionId) -> Result<(), CoreError> {
self.db
let conn = self
.db
.get()
.await
.unwrap()
.interact(move |conn| {
let _ = delete(sessions.filter(id.eq(session_id.0)))
.execute(conn)
.map_err(|err| match err {
diesel::result::Error::NotFound => CoreError::NotFound,
_ => CoreError::InternalError(err.into()),
})?;

Ok(())
})
.await
.unwrap()
.map_err(|e| CoreError::InternalError(e.into()))?;
conn.interact(move |conn| {
delete(sessions.filter(id.eq(session_id.0)))
.execute(conn)
.map_err(map_diesel_error)?;
Ok(())
})
.await
.map_err(|e| CoreError::InternalError(Error::msg(e.to_string())))?
}
}
Loading

0 comments on commit 109ab30

Please sign in to comment.