diff --git a/lib/wasi/src/runners/wcgi.rs b/lib/wasi/src/runners/wcgi.rs deleted file mode 100644 index 6bb2c10f219..00000000000 --- a/lib/wasi/src/runners/wcgi.rs +++ /dev/null @@ -1,101 +0,0 @@ -use anyhow::{Context, Error}; -use wasmer::{Engine, Module, Store}; -use wasmer_vfs::FileSystem; -use webc::metadata::{Command, Manifest}; - -use crate::runners::WapmContainer; - -pub struct WcgiRunner {} - -// TODO(Michael-F-Bryan): When we rewrite the existing runner infrastructure, -// make the "Runner" trait contain just these two methods. -impl WcgiRunner { - fn supports(cmd: &Command) -> Result { - Ok(cmd.runner.starts_with("https://webc.org/runner/wcgi")) - } - - #[tracing::instrument(skip(self, ctx))] - fn run_(&self, command_name: &str, ctx: &RunnerContext<'_>) -> Result<(), Error> { - let wasi: webc::metadata::annotations::Wasi = ctx - .command() - .annotations - .get("wasi") - .cloned() - .and_then(|v| serde_cbor::value::from_value(v).ok()) - .context("Unable to retrieve the WASI metadata")?; - - let atom_name = &wasi.atom; - let atom = ctx - .get_atom(&atom_name) - .with_context(|| format!("Unable to retrieve the \"{atom_name}\" atom"))?; - - let module = ctx.compile(atom).context("Unable to compile the atom")?; - todo!(); - } -} - -// TODO(Michael-F-Bryan): Turn this into an object-safe trait when we rewrite -// the "Runner" trait. -struct RunnerContext<'a> { - container: &'a WapmContainer, - command: &'a Command, - engine: Engine, - store: Store, -} - -#[allow(dead_code)] -impl RunnerContext<'_> { - fn command(&self) -> &Command { - self.command - } - - fn manifest(&self) -> &Manifest { - self.container.manifest() - } - - fn engine(&self) -> &Engine { - &self.engine - } - - fn store(&self) -> &Store { - &self.store - } - - fn volume(&self, _name: &str) -> Option> { - todo!(); - } - - fn get_atom(&self, name: &str) -> Option<&[u8]> { - self.container.get_atom(name) - } - - fn compile(&self, wasm: &[u8]) -> Result { - // TODO: wire this up to wasmer-cache - Module::new(&self.engine, wasm).map_err(Error::from) - } -} - -impl crate::runners::Runner for WcgiRunner { - type Output = (); - - fn can_run_command(&self, _: &str, command: &Command) -> Result { - WcgiRunner::supports(command) - } - - fn run_command( - &mut self, - command_name: &str, - command: &Command, - container: &WapmContainer, - ) -> Result { - let store = Store::default(); - let ctx = RunnerContext { - container, - command, - engine: store.engine().clone(), - store, - }; - - self.run_(command_name, &ctx) - } -} diff --git a/lib/wasi/src/runners/wcgi/handler.rs b/lib/wasi/src/runners/wcgi/handler.rs new file mode 100644 index 00000000000..8144e0e61f1 --- /dev/null +++ b/lib/wasi/src/runners/wcgi/handler.rs @@ -0,0 +1,244 @@ +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + pin::Pin, + sync::Arc, + task::Poll, +}; + +use anyhow::Error; +use futures::{Future, FutureExt, StreamExt, TryFutureExt}; +use http::{Request, Response}; +use hyper::{service::Service, Body}; +use tokio::{ + io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt}, + runtime::Handle, +}; +use wasmer::Module; +use wasmer_vfs::{FileSystem, PassthruFileSystem, RootFileSystemBuilder, TmpFileSystem}; +use wcgi_host::CgiDialect; + +use crate::{ + http::HttpClientCapabilityV1, runners::wcgi::MappedDirectory, Capabilities, Pipe, + VirtualTaskManager, WasiEnv, +}; + +/// The shared object that manages the instantiaion of WASI executables and +/// communicating with them via the CGI protocol. +#[derive(Debug, Clone)] +pub(crate) struct Handler { + pub(crate) program: Arc, + pub(crate) env: Arc>, + pub(crate) args: Arc<[String]>, + pub(crate) mapped_dirs: Arc<[MappedDirectory]>, + pub(crate) task_manager: Arc, + pub(crate) module: Module, + pub(crate) dialect: CgiDialect, +} + +impl Handler { + pub(crate) async fn handle(&self, req: Request) -> Result, Error> { + let (parts, body) = req.into_parts(); + + let (req_body_sender, req_body_receiver) = Pipe::channel(); + let (res_body_sender, res_body_receiver) = Pipe::channel(); + let (stderr_sender, stderr_receiver) = Pipe::channel(); + + let builder = WasiEnv::builder(self.program.to_string()); + + let mut request_specific_env = HashMap::new(); + self.dialect + .prepare_environment_variables(parts, &mut request_specific_env); + + let builder = builder + .envs(self.env.iter()) + .envs(request_specific_env) + .args(self.args.iter()) + .stdin(Box::new(req_body_receiver)) + .stdout(Box::new(res_body_sender)) + .stderr(Box::new(stderr_sender)) + .capabilities(Capabilities { + insecure_allow_all: true, + http_client: HttpClientCapabilityV1::new_allow_all(), + }) + .sandbox_fs(self.fs()?) + .preopen_dir(Path::new("/"))?; + + let module = self.module.clone(); + + let done = self + .task_manager + .runtime() + .spawn_blocking(move || builder.run(module)) + .map_err(Error::from) + .and_then(|r| async { r.map_err(Error::from) }); + + let handle = self.task_manager.runtime().clone(); + self.task_manager.runtime().spawn(async move { + if let Err(e) = + drive_request_to_completion(&handle, done, body, req_body_sender, stderr_receiver) + .await + { + tracing::error!( + error = &*e as &dyn std::error::Error, + "Unable to drive the request to completion" + ); + } + }); + + let mut res_body_receiver = tokio::io::BufReader::new(res_body_receiver); + + let parts = self + .dialect + .extract_response_header(&mut res_body_receiver) + .await?; + let chunks = futures::stream::try_unfold(res_body_receiver, |mut r| async move { + match r.fill_buf().await { + Ok(chunk) if chunk.is_empty() => Ok(None), + Ok(chunk) => { + let chunk = chunk.to_vec(); + r.consume(chunk.len()); + Ok(Some((chunk, r))) + } + Err(e) => Err(e), + } + }); + let body = hyper::Body::wrap_stream(chunks); + + let response = hyper::Response::from_parts(parts, body); + + Ok(response) + } + + fn fs(&self) -> Result { + let root_fs = RootFileSystemBuilder::new().build(); + + if !self.mapped_dirs.is_empty() { + let fs_backing: Arc = + Arc::new(PassthruFileSystem::new(crate::default_fs_backing())); + + for MappedDirectory { host, guest } in self.mapped_dirs.iter() { + let guest = match guest.starts_with('/') { + true => PathBuf::from(guest), + false => Path::new("/").join(guest), + }; + tracing::trace!( + host=%host.display(), + guest=%guest.display(), + "mounting directory to instance fs", + ); + + root_fs + .mount(host.clone(), &fs_backing, guest.clone()) + .map_err(|error| { + anyhow::anyhow!( + "Unable to mount \"{}\" to \"{}\": {error}", + host.display(), + guest.display() + ) + })?; + } + } + Ok(root_fs) + } +} + +/// Drive the request to completion by streaming the request body to the +/// instance and waiting for it to exit. +async fn drive_request_to_completion( + handle: &Handle, + done: impl Future>, + mut request_body: hyper::Body, + mut instance_stdin: impl AsyncWrite + Send + Unpin + 'static, + instance_stderr: impl AsyncRead + Send + Unpin + 'static, +) -> Result<(), Error> { + let request_body_send = handle + .spawn(async move { + // Copy the request into our instance, chunk-by-chunk. If the instance + // dies before we finish writing the body, the instance's side of the + // pipe will be automatically closed and we'll error out. + while let Some(res) = request_body.next().await { + // FIXME(theduke): figure out how to propagate a body error to the + // CGI instance. + let chunk = res?; + instance_stdin.write_all(chunk.as_ref()).await?; + } + + instance_stdin.shutdown().await?; + + Ok::<(), Error>(()) + }) + .map_err(Error::from) + .and_then(|r| async { r }); + + handle.spawn(async move { + consume_stderr(instance_stderr).await; + }); + + futures::try_join!(done, request_body_send)?; + + Ok(()) +} + +/// Read the instance's stderr, taking care to preserve output even when WASI +/// pipe errors occur so users still have *something* they use for +/// troubleshooting. +async fn consume_stderr(stderr: impl AsyncRead + Send + Unpin + 'static) { + let mut stderr = tokio::io::BufReader::new(stderr); + + // FIXME: this could lead to unbound memory usage + let mut buffer = Vec::new(); + + // Note: we don't want to just read_to_end() because a reading error + // would cause us to lose all of stderr. At least this way we'll be + // able to show users the partial result. + loop { + match stderr.fill_buf().await { + Ok(chunk) if chunk.is_empty() => { + // EOF - the instance's side of the pipe was closed. + break; + } + Ok(chunk) => { + buffer.extend(chunk); + let bytes_read = chunk.len(); + stderr.consume(bytes_read); + } + Err(e) => { + tracing::error!( + error = &e as &dyn std::error::Error, + bytes_read = buffer.len(), + "Unable to read the complete stderr", + ); + break; + } + } + } + + let stderr = String::from_utf8(buffer).unwrap_or_else(|e| { + tracing::warn!( + error = &e as &dyn std::error::Error, + "Stdout wasn't valid UTF-8", + ); + String::from_utf8_lossy(e.as_bytes()).into_owned() + }); + + tracing::info!(%stderr); +} + +impl Service> for Handler { + type Response = Response; + type Error = Error; + type Future = Pin, Error>> + Send>>; + + fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { + // TODO: We probably should implement some sort of backpressure here... + Poll::Ready(Ok(())) + } + + fn call(&mut self, request: Request) -> Self::Future { + // Note: all fields are reference-counted so cloning is pretty cheap + let handler = self.clone(); + let fut = async move { handler.handle(request).await }; + fut.boxed() + } +} diff --git a/lib/wasi/src/runners/wcgi/mod.rs b/lib/wasi/src/runners/wcgi/mod.rs new file mode 100644 index 00000000000..4b3df172778 --- /dev/null +++ b/lib/wasi/src/runners/wcgi/mod.rs @@ -0,0 +1,12 @@ +mod handler; +mod runner; + +use std::path::PathBuf; + +pub use self::runner::WcgiRunner; + +#[derive(Debug, Clone, PartialEq)] +pub(crate) struct MappedDirectory { + pub host: PathBuf, + pub guest: String, +} diff --git a/lib/wasi/src/runners/wcgi/runner.rs b/lib/wasi/src/runners/wcgi/runner.rs new file mode 100644 index 00000000000..599709376b2 --- /dev/null +++ b/lib/wasi/src/runners/wcgi/runner.rs @@ -0,0 +1,275 @@ +use std::{collections::HashMap, convert::Infallible, net::SocketAddr, path::PathBuf, sync::Arc}; + +use anyhow::{Context, Error}; +use wasmer::{Engine, Module, Store}; +use wasmer_vfs::FileSystem; +use wcgi_host::CgiDialect; +use webc::metadata::{Command, Manifest}; + +use crate::{ + runners::{ + wcgi::{handler::Handler, MappedDirectory}, + WapmContainer, + }, + runtime::task_manager::tokio::TokioTaskManager, + VirtualTaskManager, +}; + +pub struct WcgiRunner { + program_name: Arc, + config: Config, +} + +// TODO(Michael-F-Bryan): When we rewrite the existing runner infrastructure, +// make the "Runner" trait contain just these two methods. +impl WcgiRunner { + fn supports(cmd: &Command) -> Result { + Ok(cmd.runner.starts_with("https://webc.org/runner/wcgi")) + } + + #[tracing::instrument(skip(self, ctx))] + fn run_(&mut self, command_name: &str, ctx: &RunnerContext<'_>) -> Result<(), Error> { + let module = self.load_module(ctx).context("Couldn't load the module")?; + + let handler = self.create_handler(module, ctx)?; + let task_manager = Arc::clone(&handler.task_manager); + + let make_service = hyper::service::make_service_fn(move |_| { + let handler = handler.clone(); + async { Ok::<_, Infallible>(handler) } + }); + + let address = self.config.addr; + tracing::info!(%address, "Starting the server"); + + task_manager + .block_on(async { hyper::Server::bind(&address).serve(make_service).await }) + .context("Unable to start the server")?; + + todo!(); + } +} + +impl WcgiRunner { + pub fn new(program_name: impl Into>) -> Self { + WcgiRunner { + program_name: program_name.into(), + config: Config::default(), + } + } + + pub fn config(&mut self) -> &mut Config { + &mut self.config + } + + fn load_module(&self, ctx: &RunnerContext<'_>) -> Result { + let wasi: webc::metadata::annotations::Wasi = ctx + .command() + .annotations + .get("wasi") + .cloned() + .and_then(|v| serde_cbor::value::from_value(v).ok()) + .context("Unable to retrieve the WASI metadata")?; + + let atom_name = &wasi.atom; + let atom = ctx + .get_atom(&atom_name) + .with_context(|| format!("Unable to retrieve the \"{atom_name}\" atom"))?; + + let module = ctx.compile(atom).context("Unable to compile the atom")?; + + Ok(module) + } + + fn create_handler(&self, module: Module, ctx: &RunnerContext<'_>) -> Result { + let mut env = HashMap::new(); + + if self.config.forward_host_env { + env.extend(std::env::vars()); + } + + env.extend(self.config.env.clone()); + + let webc::metadata::annotations::Wcgi { dialect, .. } = ctx + .command() + .annotations + .get("wcgi") + .cloned() + .and_then(|v| serde_cbor::value::from_value(v).ok()) + .context("No \"wcgi\" annotations associated with this command")?; + + let dialect = match dialect { + Some(d) => d.parse().context("Unable to parse the CGI dialect")?, + None => CgiDialect::Wcgi, + }; + + let handler = Handler { + program: Arc::clone(&self.program_name), + env: Arc::new(env), + args: self.config.args.clone().into(), + mapped_dirs: self.config.mapped_dirs.clone().into(), + task_manager: self + .config + .task_manager + .clone() + .unwrap_or_else(|| Arc::new(TokioTaskManager::default())), + module, + dialect, + }; + + Ok(handler) + } +} + +// TODO(Michael-F-Bryan): Pass this to Runner::run() as "&dyn RunnerContext" +// when we rewrite the "Runner" trait. +struct RunnerContext<'a> { + container: &'a WapmContainer, + command: &'a Command, + engine: Engine, + store: Store, +} + +#[allow(dead_code)] +impl RunnerContext<'_> { + fn command(&self) -> &Command { + self.command + } + + fn manifest(&self) -> &Manifest { + self.container.manifest() + } + + fn engine(&self) -> &Engine { + &self.engine + } + + fn store(&self) -> &Store { + &self.store + } + + fn volume(&self, _name: &str) -> Option> { + todo!("Implement a read-only filesystem backed by a volume"); + } + + fn get_atom(&self, name: &str) -> Option<&[u8]> { + self.container.get_atom(name) + } + + fn compile(&self, wasm: &[u8]) -> Result { + // TODO(Michael-F-Bryan): wire this up to wasmer-cache + Module::new(&self.engine, wasm).map_err(Error::from) + } +} + +impl crate::runners::Runner for WcgiRunner { + type Output = (); + + fn can_run_command(&self, _: &str, command: &Command) -> Result { + WcgiRunner::supports(command) + } + + fn run_command( + &mut self, + command_name: &str, + command: &Command, + container: &WapmContainer, + ) -> Result { + let store = Store::default(); + let ctx = RunnerContext { + container, + command, + engine: store.engine().clone(), + store, + }; + + self.run_(command_name, &ctx) + } +} + +#[derive(Debug)] +pub struct Config { + task_manager: Option>, + addr: SocketAddr, + args: Vec, + env: HashMap, + forward_host_env: bool, + mapped_dirs: Vec, +} + +impl Config { + pub fn task_manager(&mut self, task_manager: impl VirtualTaskManager) -> &mut Self { + self.task_manager = Some(Arc::new(task_manager)); + self + } + + pub fn addr(&mut self, addr: SocketAddr) -> &mut Self { + self.addr = addr; + self + } + + /// Add an argument to the WASI executable's command-line arguments. + pub fn arg(&mut self, arg: impl Into) -> &mut Self { + self.args.push(arg.into()); + self + } + + /// Add multiple arguments to the WASI executable's command-line arguments. + pub fn args(&mut self, args: A) -> &mut Self + where + A: IntoIterator, + S: Into, + { + self.args.extend(args.into_iter().map(|s| s.into())); + self + } + + /// Expose an environment variable to the guest. + pub fn env(&mut self, name: impl Into, value: impl Into) -> &mut Self { + self.env.insert(name.into(), value.into()); + self + } + + /// Expose multiple environment variables to the guest. + pub fn envs(&mut self, variables: I) -> &mut Self + where + I: IntoIterator, + K: Into, + V: Into, + { + self.env + .extend(variables.into_iter().map(|(k, v)| (k.into(), v.into()))); + self + } + + /// Forward all of the host's environment variables to the guest. + pub fn forward_host_env(&mut self) -> &mut Self { + self.forward_host_env = true; + self + } + + pub fn map_directory( + &mut self, + host: impl Into, + guest: impl Into, + ) -> &mut Self { + self.mapped_dirs.push(MappedDirectory { + host: host.into(), + guest: guest.into(), + }); + self + } +} + +impl Default for Config { + fn default() -> Self { + Self { + task_manager: None, + addr: ([127, 0, 0, 1], 8000).into(), + env: HashMap::new(), + forward_host_env: false, + mapped_dirs: Vec::new(), + args: Vec::new(), + } + } +}