Skip to content
This repository has been archived by the owner on Dec 18, 2020. It is now read-only.

Commit

Permalink
Use a connection pool
Browse files Browse the repository at this point in the history
  • Loading branch information
foldu committed Dec 21, 2019
1 parent d070381 commit 68bb3dd
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 26 deletions.
78 changes: 78 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ directories = "2.0.2"
url = { version = "2.1.0", features = ["serde"] }
toml = "0.5.5"
tonic = "0.1.0-beta.1"
deadpool-postgres = "0.4.0"

[build-dependencies]
tonic-build = "0.1.0-beta.1"
Expand Down
49 changes: 23 additions & 26 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
use std::{convert::TryFrom, sync::Arc};

use futures_util::{
future,
future::{FutureExt, TryFutureExt},
};
use deadpool_postgres::{Manager, Pool};
use futures_util::{future, future::TryFutureExt};
use serde::Deserialize;
use tgcd::raw::{
tgcd_server,
Expand All @@ -15,7 +13,6 @@ use tgcd::raw::{
Tags,
};
use thiserror::Error;
use tokio::sync::Mutex;
use tokio_postgres as postgres;
use tonic::{transport::Server, Request, Response, Status};

Expand All @@ -24,31 +21,35 @@ use tgcd::{Blake2bHash, HashError, Tag, TagError};
#[derive(Deserialize)]
struct Config {
postgres_url: String,

#[serde(default = "port")]
port: u16,

#[serde(default = "pool_size")]
pool_size: usize,
}

fn port() -> u16 {
8080
}

fn pool_size() -> usize {
16
}

#[derive(Clone)]
struct Tgcd {
inner: Arc<TgcdInner>,
}

impl Tgcd {
async fn new(cfg: &Config) -> Result<Self, SetupError> {
let (mut client, connection) = postgres::connect(&cfg.postgres_url, postgres::NoTls)
.map_err(SetupError::PostgresConnect)
.await?;

tokio::spawn(connection.map(|r| {
if let Err(e) = r {
log::error!("{}", e);
}
}));
// TODO: remove unwrap
let postgres_config = cfg.postgres_url.parse().unwrap();
let mngr = Manager::new(postgres_config, postgres::NoTls);
let pool = Pool::new(mngr, cfg.pool_size);

let mut client = pool.get().await.unwrap();
let txn = client.transaction().await.unwrap();
let schema = include_str!("../../sql/schema.sql");
let _ = txn
Expand All @@ -58,15 +59,13 @@ impl Tgcd {
txn.commit().await.unwrap();

Ok(Self {
inner: Arc::new(TgcdInner {
client: Mutex::new(client),
}),
inner: Arc::new(TgcdInner { pool }),
})
}
}

struct TgcdInner {
client: Mutex<postgres::Client>,
pool: Pool,
}

#[derive(Error, Debug)]
Expand Down Expand Up @@ -196,15 +195,15 @@ async fn add_tags_to_hash(
#[tonic::async_trait]
impl tgcd_server::Tgcd for Tgcd {
async fn get_tags(&self, req: Request<Hash>) -> Result<Response<Tags>, Status> {
let client = self.inner.client.lock().await;
let client = self.inner.pool.get().await.unwrap();
let hash = Blake2bHash::try_from(&*req.into_inner().hash).map_err(Error::ArgHash)?;
let tags = get_tags(&client, &hash).await?;

Ok(Response::new(Tags { tags }))
}

async fn add_tags_to_hash(&self, req: Request<AddTags>) -> Result<Response<()>, Status> {
let mut client = self.inner.client.lock().await;
let mut client = self.inner.pool.get().await.unwrap();
let AddTags { hash, tags } = req.into_inner();
let hash = Blake2bHash::try_from(&*hash).map_err(Error::ArgHash)?;
let tags = tags
Expand All @@ -225,7 +224,7 @@ impl tgcd_server::Tgcd for Tgcd {
&self,
req: Request<GetMultipleTagsReq>,
) -> Result<Response<GetMultipleTagsResp>, Status> {
let client = self.inner.client.lock().await;
let client = self.inner.pool.get().await.unwrap();
let hashes = req.into_inner().hashes;
let hashes = hashes
.into_iter()
Expand All @@ -236,8 +235,7 @@ impl tgcd_server::Tgcd for Tgcd {
let tags = future::try_join_all(
hashes
.iter()
.map(|hash| get_tags(&client, &hash).map_ok(|tags| Tags { tags }))
.collect::<Vec<_>>(),
.map(|hash| get_tags(&client, &hash).map_ok(|tags| Tags { tags })),
)
.await?;

Expand All @@ -249,7 +247,7 @@ impl tgcd_server::Tgcd for Tgcd {
src_hash,
dest_hash,
} = req.into_inner();
let mut client = self.inner.client.lock().await;
let mut client = self.inner.pool.get().await.unwrap();

let src_hash = Blake2bHash::try_from(&*src_hash).map_err(Error::ArgHash)?;
let dest_hash = Blake2bHash::try_from(&*dest_hash).map_err(Error::ArgHash)?;
Expand All @@ -270,12 +268,11 @@ impl tgcd_server::Tgcd for Tgcd {

async fn run() -> Result<(), SetupError> {
let config: Config = envy::from_env()?;
let addr = format!("0.0.0.0:{}", config.port).parse().unwrap();
let tgcd = Tgcd::new(&config).await?;

Server::builder()
.add_service(tgcd_server::TgcdServer::new(tgcd))
.serve(addr)
.serve(([0, 0, 0, 0], config.port).into())
.await?;

Ok(())
Expand Down

0 comments on commit 68bb3dd

Please sign in to comment.