Optimize and fix attempt at cloud ingester
fogodev committed Oct 22, 2024
1 parent 2fa90d6 commit f2f9f91
Showing 6 changed files with 194 additions and 108 deletions.
73 changes: 21 additions & 52 deletions core/crates/cloud-services/src/sync/ingest.rs
@@ -1,18 +1,15 @@
 use crate::Error;

-use sd_core_sync::{from_cloud_crdt_ops, CompressedCRDTOperationsPerModelPerDevice, SyncManager};
+use sd_core_sync::SyncManager;

 use sd_actors::{Actor, Stopper};
-use sd_prisma::prisma::{cloud_crdt_operation, SortOrder};
-use sd_utils::timestamp_to_datetime;

 use std::{
     future::IntoFuture,
     sync::{
         atomic::{AtomicBool, Ordering},
         Arc,
     },
-    time::SystemTime,
 };

 use futures::FutureExt;
@@ -22,8 +19,6 @@ use tracing::{debug, error};

 use super::{ReceiveAndIngestNotifiers, SyncActors, ONE_MINUTE};

-const BATCH_SIZE: i64 = 1000;
-
 /// Responsible for taking sync operations received from the cloud,
 /// and applying them to the local database via the sync system's ingest actor.
@@ -43,20 +38,14 @@ impl Actor<SyncActors> for Ingester {
             Stopped,
         }

-        'outer: loop {
+        loop {
             self.active.store(true, Ordering::Relaxed);
             self.active_notify.notify_waiters();

-            loop {
-                match self.run_loop_iteration().await {
-                    Ok(IngestStatus::Completed) => break,
-                    Ok(IngestStatus::InProgress) => {}
-                    Err(e) => {
-                        error!(?e, "Error during cloud sync ingester actor iteration");
-                        sleep(ONE_MINUTE).await;
-                        continue 'outer;
-                    }
-                }
+            if let Err(e) = self.run_loop_iteration().await {
+                error!(?e, "Error during cloud sync ingester actor iteration");
+                sleep(ONE_MINUTE).await;
+                continue;
             }

             self.active.store(false, Ordering::Relaxed);
@@ -79,11 +68,6 @@ impl Actor<SyncActors> for Ingester {
     }
 }

-enum IngestStatus {
-    Completed,
-    InProgress,
-}
-
 impl Ingester {
     pub const fn new(
         sync: SyncManager,
@@ -99,48 +83,33 @@ impl Ingester {
         }
     }

-    async fn run_loop_iteration(&self) -> Result<IngestStatus, Error> {
-        let (ops_ids, ops) = self
+    async fn run_loop_iteration(&self) -> Result<(), Error> {
+        let operations_to_ingest_count = self
             .sync
             .db
             .cloud_crdt_operation()
-            .find_many(vec![])
-            .take(BATCH_SIZE)
-            .order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
+            .count(vec![])
             .exec()
             .await
-            .map_err(sd_core_sync::Error::from)?
-            .into_iter()
-            .map(from_cloud_crdt_ops)
-            .collect::<Result<(Vec<_>, Vec<_>), _>>()?;
+            .map_err(sd_core_sync::Error::from)?;

-        if ops_ids.is_empty() {
-            return Ok(IngestStatus::Completed);
+        if operations_to_ingest_count == 0 {
+            debug!("Nothing to ingest, early finishing ingester loop");
+            return Ok(());
         }

         debug!(
-            messages_count = ops.len(),
-            first_message = ?ops
-                .first()
-                .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
-            last_message = ?ops
-                .last()
-                .map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
-            "Messages to ingest",
+            operations_to_ingest_count,
+            "Starting sync messages cloud ingestion loop"
         );

-        self.sync
-            .ingest_ops(CompressedCRDTOperationsPerModelPerDevice::new(ops))
-            .await?;
+        self.sync.ingest_ops().await?;

-        self.sync
-            .db
-            .cloud_crdt_operation()
-            .delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
-            .exec()
-            .await
-            .map_err(sd_core_sync::Error::from)?;
+        debug!(
+            operations_to_ingest_count,
+            "Finished sync messages cloud ingestion loop"
+        );

-        Ok(IngestStatus::InProgress)
+        Ok(())
     }
 }
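
The batch fetching, timestamp ordering, and deletion of consumed cloud_crdt_operation rows removed above do not reappear in the files shown here, and the new ingest_ops call takes no arguments, which suggests that work now happens inside the sync crate itself. A rough sketch of such a self-draining loop, assembled from the removed building blocks; the method body, the apply_ops helper name, and the error conversions are assumptions, not part of this commit:

// Hypothetical sketch only: SyncManager draining cloud CRDT operations itself,
// batch by batch, instead of receiving a pre-fetched batch from the actor.
pub async fn ingest_ops(&self) -> Result<(), Error> {
    loop {
        // Oldest operations first; the removed actor constant BATCH_SIZE was 1000.
        let (ops_ids, ops) = self
            .db
            .cloud_crdt_operation()
            .find_many(vec![])
            .take(1000)
            .order_by(cloud_crdt_operation::timestamp::order(SortOrder::Asc))
            .exec()
            .await?
            .into_iter()
            .map(from_cloud_crdt_ops)
            .collect::<Result<(Vec<_>, Vec<_>), _>>()?;

        if ops_ids.is_empty() {
            return Ok(());
        }

        // Apply the batch (assumed internal helper), then drop the consumed rows.
        self.apply_ops(CompressedCRDTOperationsPerModelPerDevice::new(ops))
            .await?;

        self.db
            .cloud_crdt_operation()
            .delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
            .exec()
            .await?;
    }
}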
19 changes: 18 additions & 1 deletion core/crates/sync/src/ingest_utils.rs
@@ -298,7 +298,24 @@ async fn handle_crdt_deletion(
     record_id: rmpv::Value,
     delete_op: &CompressedCRDTOperation,
 ) -> Result<(), Error> {
-    // deletes are the be all and end all, no need to check anything
+    // deletes are the be all and end all, except if we never created the object to begin with;
+    // in that case there is nothing to delete
+
+    if db
+        .crdt_operation()
+        .count(vec![
+            crdt_operation::model::equals(i32::from(model)),
+            crdt_operation::record_id::equals(rmp_serde::to_vec(&record_id)?),
+        ])
+        .exec()
+        .await?
+        == 0
+    {
+        // This means that on the other device this entry was created and then deleted before
+        // this device could even take notice of it, so there is nothing to do here.
+        return Ok(());
+    }
+
     let op = CRDTOperation {
         device_pub_id: device_pub_id.into(),
         model_id: model,
4 changes: 3 additions & 1 deletion core/crates/sync/src/lib.rs
@@ -35,7 +35,7 @@ use sd_utils::uuid_to_bytes;

 use std::{collections::HashMap, sync::Arc};

-use tokio::sync::RwLock;
+use tokio::{sync::RwLock, task::JoinError};

 pub mod backfill;
 mod db_operation;
@@ -77,6 +77,8 @@ pub enum Error {
     EmptyOperations,
     #[error("device not found: {0}")]
     DeviceNotFound(DevicePubId),
+    #[error("processes crdt task panicked")]
+    ProcessCrdtPanic(JoinError),
 }

 impl From<Error> for rspc::Error {
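
The new ProcessCrdtPanic variant wraps tokio's JoinError without a #[from] conversion, so whichever call site uses it (not among the files shown here) presumably maps the join result by hand. A hypothetical usage sketch, where the spawned process_crdt_operations helper is an assumption rather than something this diff introduces:

// Hypothetical sketch: surfacing a panic (or cancellation) of a spawned
// CRDT-processing task as Error::ProcessCrdtPanic instead of unwrapping it.
async fn ingest_in_background(ops: Vec<CRDTOperation>) -> Result<(), Error> {
    // The Ok value of the join is the task's own Result<(), Error>, returned as-is.
    tokio::spawn(process_crdt_operations(ops))
        .await
        .map_err(Error::ProcessCrdtPanic)?
}

Keeping the conversion explicit (rather than deriving From<JoinError>) avoids silently turning every JoinError in the crate into this one variant.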