Skip to content

Commit

Permalink
[Performance] Refactor ThreadManager, and use it in some places. (apt…
Browse files Browse the repository at this point in the history
  • Loading branch information
grao1991 authored Aug 31, 2023
1 parent f68a6e2 commit b38cf1a
Show file tree
Hide file tree
Showing 23 changed files with 279 additions and 291 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ aptos-executor-service = { path = "execution/executor-service" }
aptos-executor-test-helpers = { path = "execution/executor-test-helpers" }
aptos-executor-types = { path = "execution/executor-types" }
aptos-experimental-ptx-executor = { path = "experimental/execution/ptx-executor" }
aptos-experimental-runtimes = { path = "experimental/runtimes" }
aptos-faucet-cli = { path = "crates/aptos-faucet/cli" }
aptos-faucet-core = { path = "crates/aptos-faucet/core" }
aptos-faucet-service = { path = "crates/aptos-faucet/service" }
Expand Down
1 change: 1 addition & 0 deletions execution/executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ aptos-block-partitioner = { workspace = true }
aptos-consensus-types = { workspace = true }
aptos-crypto = { workspace = true }
aptos-executor-types = { workspace = true }
aptos-experimental-runtimes = { workspace = true }
aptos-infallible = { workspace = true }
aptos-logger = { workspace = true }
aptos-metrics-core = { workspace = true }
Expand Down
21 changes: 13 additions & 8 deletions execution/executor/src/block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use aptos_executor_types::{
execution_output::ExecutionOutput, state_checkpoint_output::StateCheckpointOutput,
BlockExecutorTrait, Error, StateComputeResult,
};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::RwLock;
use aptos_logger::prelude::*;
use aptos_scratchpad::SparseMerkleTree;
Expand Down Expand Up @@ -252,10 +253,12 @@ where
.with_label_values(&["state_checkpoint"])
.start_timer();

chunk_output.into_state_checkpoint_output(
parent_output.state(),
maybe_block_gas_limit.map(|_| block_id),
)?
THREAD_MANAGER.get_exe_cpu_pool().install(|| {
chunk_output.into_state_checkpoint_output(
parent_output.state(),
maybe_block_gas_limit.map(|_| block_id),
)
})?
};

let _ = self.block_tree.add_block(
Expand Down Expand Up @@ -306,10 +309,12 @@ where
);
parent_output.reconfig_suffix()
} else {
let (output, _, _) = ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)?;
let (output, _, _) = THREAD_MANAGER.get_non_exe_cpu_pool().install(|| {
ApplyChunkOutput::calculate_ledger_update(
state_checkpoint_output,
parent_accumulator.clone(),
)
})?;
output
};

Expand Down
3 changes: 2 additions & 1 deletion execution/executor/src/components/block_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use anyhow::{anyhow, ensure, Result};
use aptos_consensus_types::block::Block as ConsensusBlock;
use aptos_crypto::HashValue;
use aptos_executor_types::{execution_output::ExecutionOutput, Error, LedgerUpdateOutput};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_infallible::Mutex;
use aptos_logger::{debug, info};
use aptos_storage_interface::DbReader;
Expand Down Expand Up @@ -285,7 +286,7 @@ impl BlockTree {
// This should be the last reference to old root, spawning a drop to a different thread
// guarantees that the drop will not happen in the current thread
let (tx, rx) = channel::<()>();
rayon::spawn(move || {
THREAD_MANAGER.get_non_exe_cpu_pool().spawn(move || {
let _timeer = APTOS_EXECUTOR_OTHER_TIMERS_SECONDS
.with_label_values(&["drop_old_root"])
.start_timer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::metrics::APTOS_EXECUTOR_OTHER_TIMERS_SECONDS;
use anyhow::{anyhow, ensure, Result};
use aptos_crypto::{hash::CryptoHash, HashValue};
use aptos_executor_types::{ParsedTransactionOutput, ProofReader};
use aptos_experimental_runtimes::thread_manager::THREAD_MANAGER;
use aptos_scratchpad::SparseMerkleTree;
use aptos_storage_interface::{
cached_state_view::{ShardedStateCache, StateCache},
Expand Down Expand Up @@ -102,7 +103,7 @@ impl InMemoryStateCalculatorV2 {
let StateCache {
// This makes sure all in-mem nodes seen while proofs were fetched stays in mem during the
// calculation
frozen_base: _,
frozen_base,
sharded_state_cache,
proofs,
} = state_cache;
Expand Down Expand Up @@ -135,6 +136,10 @@ impl InMemoryStateCalculatorV2 {
(new_checkpoint, new_checkpoint_version)
};

THREAD_MANAGER.get_non_exe_cpu_pool().spawn(move || {
drop(frozen_base);
});

let state_checkpoint_hashes = std::iter::repeat(None)
.take(num_txns - 1)
.chain([Some(new_checkpoint.root_hash())])
Expand Down
23 changes: 23 additions & 0 deletions experimental/runtimes/src/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

#[cfg(target_os = "linux")]
use libc::{cpu_set_t, sched_setaffinity};

#[cfg(target_os = "linux")]
pub(crate) fn new_cpu_set() -> cpu_set_t {
unsafe { std::mem::zeroed::<cpu_set_t>() }
}

#[cfg(target_os = "linux")]
pub(crate) fn pin_cpu_set(cpu_set: cpu_set_t) -> impl Fn() + Send + Sync + 'static {
move || {
unsafe {
sched_setaffinity(
0, // Defaults to current thread
std::mem::size_of::<cpu_set_t>(),
&cpu_set,
);
};
}
}
2 changes: 2 additions & 0 deletions experimental/runtimes/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

mod common;
pub(crate) mod strategies;
pub mod thread_manager;
39 changes: 39 additions & 0 deletions experimental/runtimes/src/strategies/default.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::thread_manager::ThreadManager;
use aptos_runtimes::spawn_rayon_thread_pool;
use rayon::ThreadPool;

pub struct DefaultThreadManager {
exe_threads: ThreadPool,
non_exe_threads: ThreadPool,
io_threads: ThreadPool,
}

impl DefaultThreadManager {
pub(crate) fn new() -> DefaultThreadManager {
let exe_threads = spawn_rayon_thread_pool("exe".into(), Some(num_cpus::get()));
let non_exe_threads = spawn_rayon_thread_pool("non_exe".into(), Some(num_cpus::get()));
let io_threads = spawn_rayon_thread_pool("io".into(), Some(64));
Self {
exe_threads,
non_exe_threads,
io_threads,
}
}
}

impl<'a> ThreadManager<'a> for DefaultThreadManager {
fn get_exe_cpu_pool(&'a self) -> &'a ThreadPool {
&self.exe_threads
}

fn get_non_exe_cpu_pool(&'a self) -> &'a ThreadPool {
&self.non_exe_threads
}

fn get_io_pool(&'a self) -> &'a ThreadPool {
&self.io_threads
}
}
6 changes: 6 additions & 0 deletions experimental/runtimes/src/strategies/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

pub(crate) mod default;
#[cfg(target_os = "linux")]
pub(crate) mod pin_exe_threads_to_cores;
70 changes: 70 additions & 0 deletions experimental/runtimes/src/strategies/pin_exe_threads_to_cores.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use crate::{
common::{new_cpu_set, pin_cpu_set},
thread_manager::ThreadManager,
};
use aptos_runtimes::spawn_rayon_thread_pool_with_start_hook;
use libc::CPU_SET;
use rayon::ThreadPool;

pub(crate) struct PinExeThreadsToCoresThreadManager {
exe_threads: ThreadPool,
non_exe_threads: ThreadPool,
io_threads: ThreadPool,
}

impl PinExeThreadsToCoresThreadManager {
pub(crate) fn new(num_exe_cpu: usize) -> Self {
let core_ids = core_affinity::get_core_ids().unwrap();
assert!(core_ids.len() > num_exe_cpu);

let mut exe_cpu_set = new_cpu_set();
let mut non_exe_cpu_set = new_cpu_set();
for core_id in core_ids.iter().take(num_exe_cpu) {
unsafe { CPU_SET(core_id.id, &mut exe_cpu_set) };
}
for core_id in core_ids.iter().skip(num_exe_cpu) {
unsafe { CPU_SET(core_id.id, &mut non_exe_cpu_set) };
}

let exe_threads = spawn_rayon_thread_pool_with_start_hook(
"exe".into(),
Some(num_exe_cpu),
pin_cpu_set(exe_cpu_set),
);

let non_exe_threads = spawn_rayon_thread_pool_with_start_hook(
"non_exe".into(),
Some(core_ids.len() - num_exe_cpu),
pin_cpu_set(non_exe_cpu_set),
);

let io_threads = spawn_rayon_thread_pool_with_start_hook(
"io".into(),
Some(64),
pin_cpu_set(non_exe_cpu_set),
);

Self {
exe_threads,
non_exe_threads,
io_threads,
}
}
}

impl<'a> ThreadManager<'a> for PinExeThreadsToCoresThreadManager {
fn get_exe_cpu_pool(&'a self) -> &'a ThreadPool {
&self.exe_threads
}

fn get_non_exe_cpu_pool(&'a self) -> &'a ThreadPool {
&self.non_exe_threads
}

fn get_io_pool(&'a self) -> &'a ThreadPool {
&self.io_threads
}
}
Loading

0 comments on commit b38cf1a

Please sign in to comment.