Skip to content

Commit

Permalink
Merge pull request dani-garcia#1579 from jjlin/job-scheduler
Browse files Browse the repository at this point in the history
Add support for auto-deleting trashed items
  • Loading branch information
dani-garcia authored Apr 6, 2021
2 parents a2955da + 90e0b7f commit 4e64dbd
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 33 deletions.
17 changes: 17 additions & 0 deletions .env.template
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@
# WEBSOCKET_ADDRESS=0.0.0.0
# WEBSOCKET_PORT=3012

## Job scheduler settings
##
## Job schedules use a cron-like syntax (as parsed by https://crates.io/crates/cron),
## and are always in terms of UTC time (regardless of your local time zone settings).
##
## How often (in ms) the job scheduler thread checks for jobs that need running.
## Set to 0 to globally disable scheduled jobs.
# JOB_POLL_INTERVAL_MS=30000
##
## Cron schedule of the job that checks for Sends past their deletion date.
## Defaults to hourly (5 minutes after the hour). Set blank to disable this job.
# SEND_PURGE_SCHEDULE="0 5 * * * *"
##
## Cron schedule of the job that checks for trashed items to delete permanently.
## Defaults to daily (5 minutes after midnight). Set blank to disable this job.
# TRASH_PURGE_SCHEDULE="0 5 0 * * *"

## Enable extended logging, which shows timestamps and targets in the logs
# EXTENDED_LOGGING=true

Expand Down
33 changes: 32 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ chrono = { version = "0.4.19", features = ["serde"] }
chrono-tz = "0.5.3"
time = "0.2.26"

# Job scheduler
job_scheduler = "1.2.1"

# TOTP library
oath = "0.10.2"

Expand Down Expand Up @@ -136,3 +139,10 @@ rocket_contrib = { git = 'https://github.com/SergioBenitez/Rocket', rev = '263e3

# For favicon extraction from main website
data-url = { git = 'https://github.com/servo/rust-url', package="data-url", rev = '540ede02d0771824c0c80ff9f57fe8eff38b1291' }

# The maintainer of the `job_scheduler` crate doesn't seem to have responded
# to any issues or PRs for almost a year (as of April 2021). This hopefully
# temporary fork updates Cargo.toml to use more up-to-date dependencies.
# In particular, `cron` has since implemented parsing of some common syntax
# that wasn't previously supported (https://github.com/zslayton/cron/pull/64).
job_scheduler = { git = 'https://github.com/jjlin/job_scheduler', rev = 'ee023418dbba2bfe1e30a5fd7d937f9e33739806' }
11 changes: 10 additions & 1 deletion src/api/core/ciphers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
api::{self, EmptyResult, JsonResult, JsonUpcase, Notify, PasswordData, UpdateType},
auth::Headers,
crypto,
db::{models::*, DbConn},
db::{models::*, DbConn, DbPool},
CONFIG,
};

Expand Down Expand Up @@ -77,6 +77,15 @@ pub fn routes() -> Vec<Route> {
]
}

pub fn purge_trashed_ciphers(pool: DbPool) {
debug!("Purging trashed ciphers");
if let Ok(conn) = pool.get() {
Cipher::purge_trash(&conn);
} else {
error!("Failed to get DB connection while purging trashed ciphers")
}
}

#[derive(FromForm, Default)]
struct SyncData {
#[form(field = "excludeDomains")]
Expand Down
3 changes: 2 additions & 1 deletion src/api/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ mod organizations;
pub mod two_factor;
mod sends;

pub use sends::start_send_deletion_scheduler;
pub use ciphers::purge_trashed_ciphers;
pub use sends::purge_sends;

pub fn routes() -> Vec<Route> {
let mut mod_routes = routes![
Expand Down
24 changes: 8 additions & 16 deletions src/api/core/sends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use serde_json::Value;
use crate::{
api::{ApiResult, EmptyResult, JsonResult, JsonUpcase, Notify, UpdateType},
auth::{Headers, Host},
db::{models::*, DbConn},
db::{models::*, DbConn, DbPool},
CONFIG,
};

Expand All @@ -27,21 +27,13 @@ pub fn routes() -> Vec<rocket::Route> {
]
}

pub fn start_send_deletion_scheduler(pool: crate::db::DbPool) {
std::thread::spawn(move || {
loop {
if let Ok(conn) = pool.get() {
info!("Initiating send deletion");
for send in Send::find_all(&conn) {
if chrono::Utc::now().naive_utc() >= send.deletion_date {
send.delete(&conn).ok();
}
}
}

std::thread::sleep(std::time::Duration::from_secs(3600));
}
});
pub fn purge_sends(pool: DbPool) {
debug!("Purging sends");
if let Ok(conn) = pool.get() {
Send::purge(&conn);
} else {
error!("Failed to get DB connection while purging sends")
}
}

#[derive(Deserialize)]
Expand Down
3 changes: 2 additions & 1 deletion src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ use serde_json::Value;

pub use crate::api::{
admin::routes as admin_routes,
core::purge_sends,
core::purge_trashed_ciphers,
core::routes as core_routes,
core::start_send_deletion_scheduler,
icons::routes as icons_routes,
identity::routes as identity_routes,
notifications::routes as notifications_routes,
Expand Down
16 changes: 16 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,17 @@ make_config! {
/// Websocket port
websocket_port: u16, false, def, 3012;
},
jobs {
/// Job scheduler poll interval |> How often the job scheduler thread checks for jobs to run.
/// Set to 0 to globally disable scheduled jobs.
job_poll_interval_ms: u64, false, def, 30_000;
/// Send purge schedule |> Cron schedule of the job that checks for Sends past their deletion date.
/// Defaults to hourly. Set blank to disable this job.
send_purge_schedule: String, false, def, "0 5 * * * *".to_string();
/// Trash purge schedule |> Cron schedule of the job that checks for trashed items to delete permanently.
/// Defaults to daily. Set blank to disable this job.
trash_purge_schedule: String, false, def, "0 5 0 * * *".to_string();
},

/// General settings
settings {
Expand All @@ -339,6 +350,11 @@ make_config! {
/// Per-organization attachment limit (KB) |> Limit in kilobytes for an organization attachments, once the limit is exceeded it won't be possible to upload more
org_attachment_limit: i64, true, option;

/// Trash auto-delete days |> Number of days to wait before auto-deleting a trashed item.
/// If unset, trashed items are not auto-deleted. This setting applies globally, so make
/// sure to inform all users of any changes to this setting.
trash_auto_delete_days: i64, true, option;

/// Disable icon downloads |> Set to true to disable icon downloading, this would still serve icons from
/// $ICON_CACHE_FOLDER, but it won't produce any external network request. Needs to set $ICON_CACHE_TTL to 0,
/// otherwise it will delete them and they won't be downloaded again.
Expand Down
24 changes: 23 additions & 1 deletion src/db/models/cipher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use chrono::{NaiveDateTime, Utc};
use chrono::{Duration, NaiveDateTime, Utc};
use serde_json::Value;

use crate::CONFIG;

use super::{
Attachment,
CollectionCipher,
Expand Down Expand Up @@ -271,6 +273,17 @@ impl Cipher {
Ok(())
}

/// Purge all ciphers that are old enough to be auto-deleted.
pub fn purge_trash(conn: &DbConn) {
if let Some(auto_delete_days) = CONFIG.trash_auto_delete_days() {
let now = Utc::now().naive_utc();
let dt = now - Duration::days(auto_delete_days);
for cipher in Self::find_deleted_before(&dt, conn) {
cipher.delete(&conn).ok();
}
}
}

pub fn move_to_folder(&self, folder_uuid: Option<String>, user_uuid: &str, conn: &DbConn) -> EmptyResult {
User::update_uuid_revision(user_uuid, conn);

Expand Down Expand Up @@ -511,6 +524,15 @@ impl Cipher {
}}
}

/// Find all ciphers that were deleted before the specified datetime.
pub fn find_deleted_before(dt: &NaiveDateTime, conn: &DbConn) -> Vec<Self> {
db_run! {conn: {
ciphers::table
.filter(ciphers::deleted_at.lt(dt))
.load::<CipherDb>(conn).expect("Error loading ciphers").from_db()
}}
}

pub fn get_collections(&self, user_id: &str, conn: &DbConn) -> Vec<String> {
db_run! {conn: {
ciphers_collections::table
Expand Down
22 changes: 16 additions & 6 deletions src/db/models/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,13 @@ impl Send {
}}
}

/// Purge all sends that are past their deletion date.
pub fn purge(conn: &DbConn) {
for send in Self::find_by_past_deletion_date(&conn) {
send.delete(&conn).ok();
}
}

pub fn update_users_revision(&self, conn: &DbConn) {
match &self.user_uuid {
Some(user_uuid) => {
Expand All @@ -223,12 +230,6 @@ impl Send {
Ok(())
}

pub fn find_all(conn: &DbConn) -> Vec<Self> {
db_run! {conn: {
sends::table.load::<SendDb>(conn).expect("Error loading sends").from_db()
}}
}

pub fn find_by_access_id(access_id: &str, conn: &DbConn) -> Option<Self> {
use data_encoding::BASE64URL_NOPAD;
use uuid::Uuid;
Expand Down Expand Up @@ -271,4 +272,13 @@ impl Send {
.load::<SendDb>(conn).expect("Error loading sends").from_db()
}}
}

pub fn find_by_past_deletion_date(conn: &DbConn) -> Vec<Self> {
let now = Utc::now().naive_utc();
db_run! {conn: {
sends::table
.filter(sends::deletion_date.lt(now))
.load::<SendDb>(conn).expect("Error loading sends").from_db()
}}
}
}
50 changes: 44 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@ extern crate diesel;
#[macro_use]
extern crate diesel_migrations;

use job_scheduler::{JobScheduler, Job};
use std::{
fs::create_dir_all,
panic,
path::Path,
process::{exit, Command},
str::FromStr,
thread,
time::Duration,
};

#[macro_use]
Expand Down Expand Up @@ -56,7 +58,9 @@ fn main() {

create_icon_cache_folder();

launch_rocket(extra_debug);
let pool = create_db_pool();
schedule_jobs(pool.clone());
launch_rocket(pool, extra_debug); // Blocks until program termination.
}

const HELP: &str = "\
Expand Down Expand Up @@ -301,17 +305,17 @@ fn check_web_vault() {
}
}

fn launch_rocket(extra_debug: bool) {
let pool = match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
fn create_db_pool() -> db::DbPool {
match util::retry_db(db::DbPool::from_config, CONFIG.db_connection_retries()) {
Ok(p) => p,
Err(e) => {
error!("Error creating database pool: {:?}", e);
exit(1);
}
};

api::start_send_deletion_scheduler(pool.clone());
}
}

fn launch_rocket(pool: db::DbPool, extra_debug: bool) {
let basepath = &CONFIG.domain_path();

// If adding more paths here, consider also adding them to
Expand All @@ -334,3 +338,37 @@ fn launch_rocket(extra_debug: bool) {
// The launch will restore the original logging level
error!("Launch error {:#?}", result);
}

fn schedule_jobs(pool: db::DbPool) {
if CONFIG.job_poll_interval_ms() == 0 {
info!("Job scheduler disabled.");
return;
}
thread::Builder::new().name("job-scheduler".to_string()).spawn(move || {
let mut sched = JobScheduler::new();

// Purge sends that are past their deletion date.
if !CONFIG.send_purge_schedule().is_empty() {
sched.add(Job::new(CONFIG.send_purge_schedule().parse().unwrap(), || {
api::purge_sends(pool.clone());
}));
}

// Purge trashed items that are old enough to be auto-deleted.
if !CONFIG.trash_purge_schedule().is_empty() {
sched.add(Job::new(CONFIG.trash_purge_schedule().parse().unwrap(), || {
api::purge_trashed_ciphers(pool.clone());
}));
}

// Periodically check for jobs to run. We probably won't need any
// jobs that run more often than once a minute, so a default poll
// interval of 30 seconds should be sufficient. Users who want to
// schedule jobs to run more frequently for some reason can reduce
// the poll interval accordingly.
loop {
sched.tick();
thread::sleep(Duration::from_millis(CONFIG.job_poll_interval_ms()));
}
}).expect("Error spawning job scheduler thread");
}

0 comments on commit 4e64dbd

Please sign in to comment.