Skip to content

Commit

Permalink
lock-service: fix bug when closing rocksdb database (MystenLabs#2273)
Browse files Browse the repository at this point in the history
Frequently when running tests we'll see test failures with the following
error:

    libc++abi: Pure virtual function called!

This is due to the fact that the thread part of the lock-service runs on
may outlive the main thread leading to a race on closing the rocksdb
database vs cleaning up the C++ runtime, causing this segfault to occur
and preventing the database from being able to cleanly close.

This is fixed by joining the thread handle for the spawned lock-service
threads in the Drop impl of LockService.
  • Loading branch information
bmwill authored May 27, 2022
1 parent a13a63f commit 11733de
Showing 1 changed file with 67 additions and 11 deletions.
78 changes: 67 additions & 11 deletions crates/sui-storage/src/lock_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use futures::channel::oneshot;
use rocksdb::Options;
use std::path::Path;
use std::sync::Arc;
use std::thread::JoinHandle;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tracing::{debug, info, trace, warn};
use typed_store::rocks::DBMap;
Expand Down Expand Up @@ -271,8 +273,53 @@ const LOCKSERVICE_QUEUE_LEN: usize = 500;
/// Atomicity relies on single threaded loop and only one instance per authority.
#[derive(Clone)]
pub struct LockService {
sender: Sender<LockServiceCommands>,
query_sender: Sender<LockServiceQueries>,
inner: Arc<LockServiceInner>,
}

struct LockServiceInner {
sender: Option<Sender<LockServiceCommands>>,
query_sender: Option<Sender<LockServiceQueries>>,
run_command_loop: Option<JoinHandle<()>>,
run_queries_loop: Option<JoinHandle<()>>,
}

impl LockServiceInner {
#[inline]
fn sender(&self) -> &Sender<LockServiceCommands> {
self.sender
.as_ref()
.expect("LockServiceInner should not have been dropped yet")
}

#[inline]
fn query_sender(&self) -> &Sender<LockServiceQueries> {
self.query_sender
.as_ref()
.expect("LockServiceInner should not have been dropped yet")
}
}

impl Drop for LockServiceInner {
fn drop(&mut self) {
debug!("Begin Dropping LockService");

// Take the two Senders and immediately drop them. This will prompt the two threads
// "run_command_loop" and "run_queries_loop" to terminate so that we can join the threads.
self.sender.take();
self.query_sender.take();
self.run_command_loop
.take()
.expect("run_command_loop thread should not have already been joined")
.join()
.unwrap();
self.run_queries_loop
.take()
.expect("run_queries_loop thread should not have already been joined")
.join()
.unwrap();

debug!("End Dropping LockService");
}
}

impl LockService {
Expand All @@ -284,18 +331,22 @@ impl LockService {
// Now, create a sync channel and spawn a thread
let (sender, receiver) = channel(LOCKSERVICE_QUEUE_LEN);
let inner2 = inner_service.clone();
std::thread::spawn(move || {
let run_command_loop = std::thread::spawn(move || {
inner2.run_command_loop(receiver);
});

let (q_sender, q_receiver) = channel(LOCKSERVICE_QUEUE_LEN);
std::thread::spawn(move || {
let run_queries_loop = std::thread::spawn(move || {
inner_service.run_queries_loop(q_receiver);
});

Ok(Self {
sender,
query_sender: q_sender,
inner: Arc::new(LockServiceInner {
sender: Some(sender),
query_sender: Some(q_sender),
run_command_loop: Some(run_command_loop),
run_queries_loop: Some(run_queries_loop),
}),
})
}

Expand All @@ -311,7 +362,8 @@ impl LockService {
) -> SuiResult {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
// NOTE: below is blocking, switch to Tokio channels which are async?
self.sender
self.inner
.sender()
.send(LockServiceCommands::Acquire {
refs,
tx_digest,
Expand All @@ -330,7 +382,8 @@ impl LockService {
/// Only the gateway could set is_force_reset to true.
pub async fn initialize_locks(&self, refs: &[ObjectRef], is_force_reset: bool) -> SuiResult {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
self.sender
self.inner
.sender()
.send(LockServiceCommands::Initialize {
refs: Vec::from(refs),
is_force_reset,
Expand All @@ -346,7 +399,8 @@ impl LockService {
/// Removes locks for a given list of ObjectRefs.
pub async fn remove_locks(&self, refs: Vec<ObjectRef>) -> SuiResult {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
self.sender
self.inner
.sender()
.send(LockServiceCommands::RemoveLocks {
refs,
resp: os_sender,
Expand All @@ -364,7 +418,8 @@ impl LockService {
/// * Some(Some(tx_digest)) - lock exists and set to transaction
pub async fn get_lock(&self, object: ObjectRef) -> SuiLockResult {
let (os_sender, os_receiver) = oneshot::channel::<SuiLockResult>();
self.query_sender
self.inner
.query_sender()
.send(LockServiceQueries::GetLock {
object,
resp: os_sender,
Expand All @@ -380,7 +435,8 @@ impl LockService {
/// Returns Err(TransactionLockDoesNotExist) if at least one object lock is not initialized.
pub async fn locks_exist(&self, objects: Vec<ObjectRef>) -> SuiResult {
let (os_sender, os_receiver) = oneshot::channel::<SuiResult>();
self.query_sender
self.inner
.query_sender()
.send(LockServiceQueries::CheckLocksExist {
objects,
resp: os_sender,
Expand Down

0 comments on commit 11733de

Please sign in to comment.