
Commit 3eba4bb

bruh
fogodev committed Oct 22, 2024
1 parent f2f9f91 commit 3eba4bb
Showing 2 changed files with 100 additions and 85 deletions.
3 changes: 3 additions & 0 deletions core/crates/sync/src/ingest_utils.rs
@@ -159,6 +159,7 @@ pub async fn process_crdt_operations(
Ok(())
}

#[instrument(skip_all, err)]
async fn handle_crdt_updates(
db: &PrismaClient,
device_pub_id: &DevicePubId,
@@ -213,6 +214,7 @@ async fn handle_crdt_updates(
.await
}

#[instrument(skip_all, err)]
async fn handle_crdt_create_and_updates(
db: &PrismaClient,
device_pub_id: &DevicePubId,
@@ -291,6 +293,7 @@ async fn handle_crdt_create_and_updates(
.await
}

#[instrument(skip_all, err)]
async fn handle_crdt_deletion(
db: &PrismaClient,
device_pub_id: &DevicePubId,
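
The three hunks above only add #[instrument(skip_all, err)] to the async CRDT handlers. A minimal, self-contained sketch of what that tracing attribute does, using a placeholder handler name and std::io::Error instead of the crate's own error type: skip_all keeps the arguments out of the span's fields, and err records a returned Err on the span.

use std::io::{Error as IoError, ErrorKind};
use tracing::instrument;

// `skip_all` keeps the (potentially large) arguments out of the span's fields, and
// `err` records any returned `Err` as an event on that span. The name and body
// below are placeholders, not the real handle_crdt_updates.
#[instrument(skip_all, err)]
async fn handle_crdt_updates_sketch(ops: Vec<u8>) -> Result<(), IoError> {
    if ops.is_empty() {
        // Because of `err`, this error is attached to the handler's span automatically.
        return Err(IoError::new(ErrorKind::InvalidInput, "no operations to apply"));
    }
    // ... the real handlers write the operations to the database here ...
    Ok(())
}

#[tokio::main]
async fn main() {
    // A subscriber must be installed for the span and the error event to be visible.
    tracing_subscriber::fmt().init();
    let _ = handle_crdt_updates_sketch(Vec::new()).await;
}

Running this with an empty vector should emit an error event carrying the handler's span, which is the extra observability the commit is after.
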
182 changes: 97 additions & 85 deletions core/crates/sync/src/manager.rs
@@ -25,7 +25,7 @@ use tokio::{
spawn,
sync::{broadcast, Mutex, Notify, RwLock, Semaphore},
};
use tracing::{debug, warn};
use tracing::{debug, instrument, warn};
use uhlc::{HLCBuilder, HLC};
use uuid::Uuid;

@@ -161,100 +161,112 @@ impl Manager {
.collect::<Result<(Vec<_>, Vec<_>), _>>()
}

#[instrument(skip(self))]
async fn ingest_by_model(&self, model_id: ModelId) -> Result<(), Error> {
let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?;
if ops_ids.is_empty() {
return Ok(());
}
let mut total_count = 0;

debug!(
messages_count = ops.len(),
first_message = ?ops
.first()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
last_message = ?ops
.last()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
model_id,
"Messages by model to ingest",
);

let mut compressed_map =
BTreeMap::<Uuid, HashMap<Vec<u8>, (RecordId, Vec<CompressedCRDTOperation>)>>::new();

for CRDTOperation {
device_pub_id,
timestamp,
model_id: _, // Ignoring model_id as we know it already
record_id,
data,
} in ops
{
let records = compressed_map.entry(device_pub_id).or_default();

// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
// So we use its serialized bytes as a key.
let record_id_bytes =
rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");

match records.entry(record_id_bytes) {
Entry::Occupied(mut entry) => {
entry
.get_mut()
.1
.push(CompressedCRDTOperation { timestamp, data });
}
Entry::Vacant(entry) => {
entry.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
}
loop {
let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?;
if ops_ids.is_empty() {
break;
}
}

let _lock_guard = self.sync_lock.lock().await;

let semaphore = &Arc::new(Semaphore::new(self.available_parallelism));
debug!(
messages_count = ops.len(),
first_message = ?ops
.first()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
last_message = ?ops
.last()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
"Messages by model to ingest",
);

let mut compressed_map =
BTreeMap::<Uuid, HashMap<Vec<u8>, (RecordId, Vec<CompressedCRDTOperation>)>>::new();

for CRDTOperation {
device_pub_id,
timestamp,
model_id: _, // Ignoring model_id as we know it already
record_id,
data,
} in ops
{
let records = compressed_map.entry(device_pub_id).or_default();

// Can't use RecordId as a key because rmpv::Value doesn't implement Hash + Eq.
// So we use its serialized bytes as a key.
let record_id_bytes =
rmp_serde::to_vec_named(&record_id).expect("already serialized to Value");

match records.entry(record_id_bytes) {
Entry::Occupied(mut entry) => {
entry
.get_mut()
.1
.push(CompressedCRDTOperation { timestamp, data });
}
Entry::Vacant(entry) => {
entry
.insert((record_id, vec![CompressedCRDTOperation { timestamp, data }]));
}
}
}

let handles = compressed_map
.into_iter()
.flat_map(|(device_pub_id, records)| {
records.into_values().map(move |(record_id, ops)| {
// We can process each record in parallel as they are independent
spawn({
let clock = Arc::clone(&self.clock);
let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
let db = Arc::clone(&self.db);
let device_pub_id = device_pub_id.into();
let semaphore = Arc::clone(semaphore);

async move {
let _permit =
semaphore.acquire().await.expect("semaphore never closes");

process_crdt_operations(
&clock,
&timestamp_per_device,
&db,
device_pub_id,
model_id,
record_id,
ops,
)
.await
}
let _lock_guard = self.sync_lock.lock().await;

let semaphore = &Arc::new(Semaphore::new(self.available_parallelism));

let handles = compressed_map
.into_iter()
.flat_map(|(device_pub_id, records)| {
records.into_values().map(move |(record_id, ops)| {
// We can process each record in parallel as they are independent
spawn({
let clock = Arc::clone(&self.clock);
let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
let db = Arc::clone(&self.db);
let device_pub_id = device_pub_id.into();
let semaphore = Arc::clone(semaphore);

async move {
let _permit =
semaphore.acquire().await.expect("semaphore never closes");

let count = ops.len();

process_crdt_operations(
&clock,
&timestamp_per_device,
&db,
device_pub_id,
model_id,
record_id,
ops,
)
.await
.map(|()| count)
}
})
})
})
})
.collect::<Vec<_>>();
.collect::<Vec<_>>();

for res in handles.join().await {
let count = res.map_err(Error::ProcessCrdtPanic)??;
debug!(count, "Ingested operations of model");
total_count += count;
}

for res in handles.join().await {
res.map_err(Error::ProcessCrdtPanic)??;
self.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.exec()
.await?;
}

self.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.exec()
.await?;
debug!(total_count, "Ingested all operations of this model");

Ok(())
}
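
The reshaped ingest_by_model above no longer processes a single page of up to 10,000 operations and returns; it loops, draining the cloud queue batch by batch, deleting each batch only after it has been processed, and carrying a running total_count into the final log line. A rough, synchronous sketch of that drain loop, with a hypothetical in-memory queue standing in for the cloud_crdt_operation table and plain counting standing in for the CRDT processing:

// Hypothetical in-memory queue standing in for the cloud_crdt_operation table.
struct Queue {
    pending: Vec<(u64, String)>, // (operation id, payload)
}

impl Queue {
    // Fetch up to `limit` operations, mirroring fetch_cloud_crdt_ops(model_id, 10_000).
    fn fetch_batch(&self, limit: usize) -> (Vec<u64>, Vec<String>) {
        self.pending.iter().take(limit).cloned().unzip()
    }

    // Remove the operations that were just ingested, mirroring the delete_many call.
    fn delete_batch(&mut self, ids: &[u64]) {
        self.pending.retain(|(id, _)| !ids.contains(id));
    }
}

fn ingest_all(queue: &mut Queue, batch_size: usize) -> usize {
    let mut total_count = 0;

    loop {
        let (ids, ops) = queue.fetch_batch(batch_size);
        if ids.is_empty() {
            break; // queue drained: nothing left to ingest for this model
        }

        // The real code compresses and applies the CRDT operations here.
        total_count += ops.len();

        // Only delete what was successfully processed, then fetch the next batch.
        queue.delete_batch(&ids);
    }

    total_count
}

fn main() {
    let mut queue = Queue {
        pending: (0..25u64).map(|i| (i, format!("op-{i}"))).collect(),
    };
    // With a batch size of 10 the loop runs three times before the queue is empty.
    assert_eq!(ingest_all(&mut queue, 10), 25);
}

Because the delete only runs after every task in a batch has succeeded, a failure leaves the remaining operations in the cloud table to be retried on the next ingest.
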
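
Within a batch, records are still grouped per device and record and processed independently: one tokio task per record group, with concurrency capped by a semaphore sized to the available parallelism, and each task now reports how many operations it applied so the manager can log per-batch and total counts. A simplified sketch of that bounded fan-out, with a placeholder process_record in place of the real process_crdt_operations:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Placeholder for per-record work; the real code calls process_crdt_operations here.
async fn process_record(record_id: u32, ops: Vec<u32>) -> Result<usize, String> {
    let _ = record_id;
    Ok(ops.len())
}

#[tokio::main]
async fn main() -> Result<(), String> {
    // One permit per worker, mirroring Semaphore::new(self.available_parallelism).
    let semaphore = Arc::new(Semaphore::new(4));

    // Pretend these are the (record, operations) groups built from a batch.
    let grouped: Vec<(u32, Vec<u32>)> = (0..16).map(|r| (r, vec![r, r + 1])).collect();

    let handles: Vec<_> = grouped
        .into_iter()
        .map(|(record_id, ops)| {
            let semaphore = Arc::clone(&semaphore);
            tokio::spawn(async move {
                // Acquire a permit so at most 4 records are processed at once.
                let _permit = semaphore.acquire().await.expect("semaphore never closes");
                let count = ops.len();
                process_record(record_id, ops).await.map(|_| count)
            })
        })
        .collect();

    let mut total_count = 0;
    for handle in handles {
        // The first `?` surfaces a join error (task panic), the second the processing error.
        total_count += handle.await.map_err(|e| e.to_string())??;
    }

    println!("ingested {total_count} operations");
    Ok(())
}

The semaphore matters because every record group spawns its own task; without it a large batch could start thousands of concurrent database writes at once.
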
