Skip to content

Commit

Permalink
New ingester optimization attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
fogodev committed Oct 22, 2024
1 parent 3eba4bb commit 114d0df
Showing 1 changed file with 62 additions and 37 deletions.
99 changes: 62 additions & 37 deletions core/crates/sync/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sd_utils::timestamp_to_datetime;

use std::{
collections::{hash_map::Entry, BTreeMap, HashMap},
fmt,
fmt, mem,
num::NonZeroU128,
sync::{
atomic::{self, AtomicBool},
Expand All @@ -19,11 +19,11 @@ use std::{
};

use async_stream::stream;
use futures::Stream;
use futures_concurrency::future::{Join, TryJoin};
use futures::{stream::FuturesUnordered, Stream, TryStreamExt};
use futures_concurrency::future::TryJoin;
use tokio::{
spawn,
sync::{broadcast, Mutex, Notify, RwLock, Semaphore},
sync::{broadcast, Mutex, Notify, RwLock},
};
use tracing::{debug, instrument, warn};
use uhlc::{HLCBuilder, HLC};
Expand All @@ -36,6 +36,8 @@ use super::{
Error, SyncEvent, TimestampPerDevice, NTP64,
};

const INGESTION_BATCH_SIZE: i64 = 10_000;

/// Wrapper that spawns the ingest actor and provides utilities for reading and writing sync operations.
#[derive(Clone)]
pub struct Manager {
Expand Down Expand Up @@ -165,14 +167,22 @@ impl Manager {
async fn ingest_by_model(&self, model_id: ModelId) -> Result<(), Error> {
let mut total_count = 0;

let mut buckets = (0..self.available_parallelism)
.map(|_| FuturesUnordered::new())
.collect::<Vec<_>>();

loop {
let (ops_ids, ops) = self.fetch_cloud_crdt_ops(model_id, 10_000).await?;
let (ops_ids, ops) = self
.fetch_cloud_crdt_ops(model_id, INGESTION_BATCH_SIZE)
.await?;
if ops_ids.is_empty() {
break;
}

let messages_count = ops.len();

debug!(
messages_count = ops.len(),
messages_count,
first_message = ?ops
.first()
.map_or_else(|| SystemTime::UNIX_EPOCH.into(), |op| timestamp_to_datetime(op.timestamp)),
Expand Down Expand Up @@ -216,45 +226,60 @@ impl Manager {

let _lock_guard = self.sync_lock.lock().await;

let semaphore = &Arc::new(Semaphore::new(self.available_parallelism));

let handles = compressed_map
compressed_map
.into_iter()
.flat_map(|(device_pub_id, records)| {
records.into_values().map(move |(record_id, ops)| {
// We can process each record in parallel as they are independent
spawn({
let clock = Arc::clone(&self.clock);
let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
let db = Arc::clone(&self.db);
let device_pub_id = device_pub_id.into();
let semaphore = Arc::clone(semaphore);

async move {
let _permit =
semaphore.acquire().await.expect("semaphore never closes");

let count = ops.len();

process_crdt_operations(
&clock,
&timestamp_per_device,
&db,
device_pub_id,
model_id,
record_id,
ops,
)
.await
.map(|()| count)
}
})

let clock = Arc::clone(&self.clock);
let timestamp_per_device = Arc::clone(&self.timestamp_per_device);
let db = Arc::clone(&self.db);
let device_pub_id = device_pub_id.into();

async move {
let count = ops.len();

process_crdt_operations(
&clock,
&timestamp_per_device,
&db,
device_pub_id,
model_id,
record_id,
ops,
)
.await
.map(|()| count)
}
})
})
.enumerate()
.for_each(|(idx, fut)| buckets[idx % self.available_parallelism].push(fut));

let handles = buckets
.iter_mut()
.enumerate()
.filter(|(_idx, bucket)| !bucket.is_empty())
.map(|(idx, bucket)| {
let mut bucket = mem::take(bucket);

spawn(async move {
let mut ops_count = 0;
while let Some(count) = bucket.try_next().await? {
ops_count += count;
}

Ok::<_, Error>((ops_count, idx, bucket))
})
})
.collect::<Vec<_>>();

for res in handles.join().await {
let count = res.map_err(Error::ProcessCrdtPanic)??;
for res in handles.try_join().await.map_err(Error::ProcessCrdtPanic)? {
let (count, idx, bucket) = res?;

buckets[idx] = bucket;

debug!(count, "Ingested operations of model");
total_count += count;
}
Expand Down

0 comments on commit 114d0df

Please sign in to comment.