Skip to content

Commit 6255722

Browse files
committed
testing threading
1 parent 2b8ab28 commit 6255722

8 files changed

+390
-3
lines changed

Cargo.lock

+86
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

core/Cargo.toml

+6
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ default = ["proactor_nuclei", "telemetry_server_builtin", "prometheus_metrics",
3737
proactor_nuclei = ["nuclei/async-exec", "nuclei/iouring"]
3838
# Use the tokio async runtime (proactor_? exclusive one or the other)
3939
proactor_tokio = ["nuclei/tokio", "nuclei/iouring"]
40+
exec_embassy = ["embassy-executor"]
41+
#exec_smol = ["smol"]
42+
exec_async_std = ["async-std"]
4043

4144
proactor_windows = ["nuclei/async-exec"]
4245

@@ -80,6 +83,9 @@ cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"]
8083

8184
[dependencies]
8285
nuclei = { version = "0.4.4", optional = true } # foundational proactive runtime
86+
embassy-executor = { version = "0.5.0", optional = true }
87+
async-std = { version = "1.13.1", attributes="", optional = true}
88+
#smol = { version = "2.0.2", option = true}
8389
ringbuf = "0.4.7" # foundational bounded message passing
8490
async-ringbuf = "0.3.2" # foundational bounded message passing
8591
log = "0.4.25" # common logging traits
+85
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
2+
#[cfg(any(feature = "exec_async_std", feature = "exec_async_std"))]
3+
pub(crate) mod core_exec {
4+
//! This module provides an abstraction layer for threading solutions, allowing for easier
5+
//! swapping of threading implementations in the future.
6+
//!
7+
//! The module leverages the `async_std` library for asynchronous execution and provides
8+
//! utilities for initialization, spawning detached tasks, and blocking on futures.
9+
10+
// ## Imports
11+
use std::future::Future;
12+
use std::io::{self, Result};
13+
use std::thread;
14+
use std::time::Duration;
15+
use lazy_static::lazy_static;
16+
use log::{error, trace, warn};
17+
use parking_lot::Once;
18+
use crate::ProactorConfig;
19+
use async_std::task::{self, JoinHandle};
20+
use std::panic::{catch_unwind, AssertUnwindSafe};
21+
22+
/// Spawns a local task intended for lightweight operations.
23+
///
24+
/// Note: `async_std` uses a thread pool, so this spawns on the global executor rather than
25+
/// strictly on the current thread.
26+
pub fn spawn_local<F: Future<Output = T> + 'static, T: 'static>(f: F) -> JoinHandle<T> {
27+
task::spawn(f)
28+
}
29+
30+
/// Spawns a blocking task on a separate thread for CPU-bound or blocking operations.
31+
pub fn spawn_blocking<F: FnOnce() -> T + Send + 'static, T: Send + 'static>(f: F) -> JoinHandle<T> {
32+
task::spawn_blocking(f)
33+
}
34+
35+
/// Spawns a task that can run on any thread in the pool for parallel execution.
36+
pub fn spawn<F: Future<Output = T> + Send + 'static, T: Send + 'static>(future: F) -> JoinHandle<T> {
37+
task::spawn(future)
38+
}
39+
40+
/// Asynchronously spawns additional threads in the executor.
41+
///
42+
/// Note: `async_std` does not support dynamically adding threads at runtime, so this returns an error.
43+
pub async fn spawn_more_threads(_count: usize) -> io::Result<usize> {
44+
Err(io::Error::new(
45+
io::ErrorKind::Other,
46+
"async_std does not support dynamic thread spawning",
47+
))
48+
}
49+
50+
lazy_static! {
51+
/// Ensures initialization runs only once across all threads.
52+
static ref INIT: Once = Once::new();
53+
}
54+
55+
/// Initializes the `async_std` executor with the specified configuration.
56+
///
57+
/// If `enable_driver` is true, spawns a blocking task to keep the executor running indefinitely.
58+
/// The `proactor_config` and `queue_length` parameters are ignored as `async_std` does not use
59+
/// `io_uring` or similar configurable proactors.
60+
pub(crate) fn init(enable_driver: bool, _proactor_config: ProactorConfig, _queue_length: u32) {
61+
INIT.call_once(|| {
62+
if enable_driver {
63+
trace!("Starting async_std driver");
64+
task::spawn_blocking(|| {
65+
loop {
66+
let result = catch_unwind(AssertUnwindSafe(|| {
67+
task::block_on(std::future::pending::<()>());
68+
}));
69+
if let Err(e) = result {
70+
error!("async_std driver panicked: {:?}", e);
71+
thread::sleep(Duration::from_secs(1));
72+
warn!("Restarting async_std driver");
73+
}
74+
}
75+
}).detach();
76+
}
77+
// No specific executor configuration needed for async_std
78+
});
79+
}
80+
81+
/// Blocks the current thread until the given future completes.
82+
pub fn block_on<F: Future<Output = T>, T>(future: F) -> T {
83+
task::block_on(future)
84+
}
85+
}

core/src/abstract_executor_embassy.rs

+107
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
#[cfg(any(feature = "exec_embassy"))]
2+
pub(crate) mod core_exec {
3+
//! This module provides an abstraction layer for threading solutions, allowing for easier
4+
//! swapping of threading implementations in the future.
5+
//!
6+
//! The module leverages the `embassy_executor` library for asynchronous execution and provides
7+
//! utilities for initialization, spawning detached tasks, and blocking on futures.
8+
9+
use std::future::Future;
10+
use std::io::{self, Result};
11+
use lazy_static::lazy_static;
12+
use parking_lot::Once;
13+
use crate::ProactorConfig;
14+
use embassy_executor::{Executor, Spawner};
15+
16+
/// Spawns a local task intended for lightweight operations.
17+
///
18+
/// In Embassy, tasks are scheduled on the executor without distinguishing between
19+
/// local and pooled tasks. The task must be `'static` and produce a `'static` output.
20+
pub fn spawn_local<F, T>(f: F) -> embassy_executor::Task<T>
21+
where
22+
F: Future<Output = T> + 'static,
23+
T: 'static,
24+
{
25+
let spawner = embassy_executor::Spawner::for_current_executor();
26+
spawner.spawn(f)
27+
}
28+
29+
/// Spawns a blocking task on a separate thread for CPU-bound or blocking operations.
30+
///
31+
/// Embassy does not natively support blocking operations. This implementation simulates
32+
/// blocking by running the closure in an async task, which may not be ideal for CPU-bound work.
33+
pub fn spawn_blocking<F, T>(f: F) -> embassy_executor::Task<T>
34+
where
35+
F: FnOnce() -> T + Send + 'static,
36+
T: Send + 'static,
37+
{
38+
let spawner = embassy_executor::Spawner::for_current_executor();
39+
spawner.spawn(async move { f() })
40+
}
41+
42+
/// Spawns a task that can run on any thread in the pool for parallel execution.
43+
///
44+
/// In Embassy, this is identical to `spawn_local` as there’s no thread pool distinction.
45+
pub fn spawn<F, T>(future: F) -> embassy_executor::Task<T>
46+
where
47+
F: Future<Output = T> + Send + 'static,
48+
T: Send + 'static,
49+
{
50+
let spawner = embassy_executor::Spawner::for_current_executor();
51+
spawner.spawn(future)
52+
}
53+
54+
/// Asynchronously spawns additional threads in the executor.
55+
///
56+
/// Embassy does not support dynamic thread spawning, so this function returns an error.
57+
pub async fn spawn_more_threads(_count: usize) -> io::Result<usize> {
58+
Err(io::Error::new(
59+
io::ErrorKind::Other,
60+
"Dynamic thread spawning not supported in Embassy",
61+
))
62+
}
63+
64+
lazy_static! {
65+
/// Ensures initialization runs only once across all threads.
66+
static ref INIT: Once = Once::new();
67+
}
68+
69+
/// Initializes the Embassy executor with the specified configuration.
70+
///
71+
/// If `enable_driver` is true, starts the executor in a dedicated thread.
72+
/// The `proactor_config` and `queue_length` parameters are ignored as Embassy does not use
73+
/// `io_uring` or similar configurable proactors.
74+
pub(crate) fn init(enable_driver: bool, _proactor_config: ProactorConfig, _queue_length: u32) {
75+
INIT.call_once(|| {
76+
if enable_driver {
77+
trace!("Starting Embassy driver");
78+
std::thread::spawn(|| {
79+
let executor = Executor::new();
80+
executor.run(|_spawner| {
81+
// Embassy's executor runs indefinitely; no tasks are spawned here
82+
// as the spawner is typically used elsewhere in the application.
83+
});
84+
});
85+
}
86+
});
87+
}
88+
89+
/// Blocks the current thread until the given future completes.
90+
///
91+
/// Embassy is not designed for blocking on a single future. This implementation runs
92+
/// a new executor instance until the future completes, which may be inefficient.
93+
pub fn block_on<F, T>(future: F) -> T
94+
where
95+
F: Future<Output = T>,
96+
T: 'static,
97+
{
98+
let mut result = None;
99+
let executor = Executor::new();
100+
executor.run(|spawner| {
101+
spawner.spawn(async {
102+
result = Some(future.await);
103+
})
104+
});
105+
result.expect("Future did not complete")
106+
}
107+
}

core/src/abstract_executor.rs core/src/abstract_executor_nuclei.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11

2+
#[cfg(any(feature = "proactor_nuclei", feature = "proactor_tokio"))]
23
pub(crate) mod core_exec {
34
//! This module provides an abstraction layer for threading solutions, allowing for easier
45
//! swapping of threading implementations in the future.

0 commit comments

Comments
 (0)