Skip to content

Commit

Permalink
Bug 1724141 - Update audioipc to 8bb1a227. r=chunmin
Browse files Browse the repository at this point in the history
  • Loading branch information
kinetiknz committed Aug 10, 2021
1 parent 83fb89d commit 683c531
Show file tree
Hide file tree
Showing 21 changed files with 687 additions and 860 deletions.
2 changes: 1 addition & 1 deletion .cargo/config.in
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ rev = "8ae48c16b637d6d9a841cf4d01de06c16b1318b0"
[source."https://github.com/mozilla/audioipc-2"]
git = "https://github.com/mozilla/audioipc-2"
replace-with = "vendored-sources"
rev = "7537bfadad2e981577eb75e4f13662fc517e1a09"
rev = "8bb1a227fbaa5677458bcd876162b65307df38c2"

[source."https://github.com/mozilla/application-services"]
git = "https://github.com/mozilla/application-services"
Expand Down
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 23 additions & 6 deletions dom/media/CubebUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@

#define AUDIOIPC_POOL_SIZE_DEFAULT 1
#define AUDIOIPC_STACK_SIZE_DEFAULT (64 * 4096)
// See also: https://github.com/mozilla/audioipc-2/issues/124
#define AUDIOIPC_SHM_AREA_SIZE_DEFAULT (512 * 4096)

#define PREF_VOLUME_SCALE "media.volume_scale"
#define PREF_CUBEB_BACKEND "media.cubeb.backend"
Expand All @@ -59,6 +61,7 @@
#define PREF_CUBEB_SANDBOX "media.cubeb.sandbox"
#define PREF_AUDIOIPC_POOL_SIZE "media.audioipc.pool_size"
#define PREF_AUDIOIPC_STACK_SIZE "media.audioipc.stack_size"
#define PREF_AUDIOIPC_SHM_AREA_SIZE "media.audioipc.shm_area_size"

#if (defined(XP_LINUX) && !defined(MOZ_WIDGET_ANDROID)) || \
defined(XP_MACOSX) || (defined(XP_WIN) && !defined(_ARM64_))
Expand Down Expand Up @@ -108,6 +111,7 @@ bool sRouteOutputAsVoice = false;
bool sCubebSandbox = false;
size_t sAudioIPCPoolSize;
size_t sAudioIPCStackSize;
size_t sAudioIPCShmAreaSize;
#endif
StaticAutoPtr<char> sBrandName;
StaticAutoPtr<char> sCubebBackendName;
Expand Down Expand Up @@ -269,6 +273,10 @@ void PrefChanged(const char* aPref, void* aClosure) {
StaticMutexAutoLock lock(sMutex);
sAudioIPCStackSize = Preferences::GetUint(PREF_AUDIOIPC_STACK_SIZE,
AUDIOIPC_STACK_SIZE_DEFAULT);
} else if (strcmp(aPref, PREF_AUDIOIPC_SHM_AREA_SIZE) == 0) {
StaticMutexAutoLock lock(sMutex);
sAudioIPCShmAreaSize = Preferences::GetUint(PREF_AUDIOIPC_SHM_AREA_SIZE,
AUDIOIPC_SHM_AREA_SIZE_DEFAULT);
}
#endif
else if (strcmp(aPref, PREF_CUBEB_OUTPUT_VOICE_ROUTING) == 0) {
Expand Down Expand Up @@ -409,6 +417,7 @@ void InitAudioIPCConnection() {

ipc::FileDescriptor CreateAudioIPCConnection() {
#ifdef MOZ_CUBEB_REMOTING
StaticMutexAutoLock lock(sMutex);
MOZ_ASSERT(sCubebSandbox && XRE_IsParentProcess());
if (!sServerHandle) {
MOZ_LOG(gCubebLog, LogLevel::Debug, ("Starting cubeb server..."));
Expand All @@ -417,9 +426,11 @@ ipc::FileDescriptor CreateAudioIPCConnection() {
return ipc::FileDescriptor();
}
}
MOZ_LOG(gCubebLog, LogLevel::Debug,
("%s: %d", PREF_AUDIOIPC_SHM_AREA_SIZE, (int)sAudioIPCShmAreaSize));
MOZ_ASSERT(sServerHandle);
ipc::FileDescriptor::PlatformHandleType rawFD =
audioipc::audioipc_server_new_client(sServerHandle);
audioipc::audioipc_server_new_client(sServerHandle, sAudioIPCShmAreaSize);
ipc::FileDescriptor fd(rawFD);
if (!fd.IsValid()) {
MOZ_LOG(gCubebLog, LogLevel::Error, ("audioipc_server_new_client failed"));
Expand Down Expand Up @@ -582,11 +593,17 @@ uint32_t GetCubebMTGLatencyInFrames(cubeb_stream_params* params) {
}

static const char* gInitCallbackPrefs[] = {
PREF_VOLUME_SCALE, PREF_CUBEB_OUTPUT_DEVICE,
PREF_CUBEB_LATENCY_PLAYBACK, PREF_CUBEB_LATENCY_MTG,
PREF_CUBEB_BACKEND, PREF_CUBEB_FORCE_NULL_CONTEXT,
PREF_CUBEB_SANDBOX, PREF_AUDIOIPC_POOL_SIZE,
PREF_AUDIOIPC_STACK_SIZE, nullptr,
PREF_VOLUME_SCALE,
PREF_CUBEB_OUTPUT_DEVICE,
PREF_CUBEB_LATENCY_PLAYBACK,
PREF_CUBEB_LATENCY_MTG,
PREF_CUBEB_BACKEND,
PREF_CUBEB_FORCE_NULL_CONTEXT,
PREF_CUBEB_SANDBOX,
PREF_AUDIOIPC_POOL_SIZE,
PREF_AUDIOIPC_STACK_SIZE,
PREF_AUDIOIPC_SHM_AREA_SIZE,
nullptr,
};
static const char* gCallbackPrefs[] = {
PREF_CUBEB_FORCE_SAMPLE_RATE,
Expand Down
2 changes: 1 addition & 1 deletion third_party/rust/audioipc-client/.cargo-checksum.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"files":{"Cargo.toml":"4020e8c4119327dac49b47391c902eb69bb927c9e7d05f5882ad9e84cff4ec5e","cbindgen.toml":"bd89c5a9f52395b1c703ff04d1c0019dc3c92b691d571ae503c4b85753a44a39","src/context.rs":"7388ee487bc25bbdf5cc6475dfa78674bdd9f49c92968c1b0a8047cdd023476c","src/lib.rs":"7755001c8caf6899ca5ed00a517d7bf1b6425fe17157a97037dae619af567fc9","src/send_recv.rs":"450bdb1d8a346634c0237f2081b424d11e2c19ad81670009303f8a03b3bfb196","src/stream.rs":"a7b32be48f67ef6b7fbcf61b25e02cab1e961ef3146c3ace2f16ced9dab2953e"},"package":null}
{"files":{"Cargo.toml":"4020e8c4119327dac49b47391c902eb69bb927c9e7d05f5882ad9e84cff4ec5e","cbindgen.toml":"bd89c5a9f52395b1c703ff04d1c0019dc3c92b691d571ae503c4b85753a44a39","src/context.rs":"89e2929aa2ba4bddcec6dd4a4511f528c681b7717adbaa7b7df4378c8e3c4d1c","src/lib.rs":"7755001c8caf6899ca5ed00a517d7bf1b6425fe17157a97037dae619af567fc9","src/send_recv.rs":"450bdb1d8a346634c0237f2081b424d11e2c19ad81670009303f8a03b3bfb196","src/stream.rs":"1fb24d5cf51305408068eebee0e367b4f9786f27665c7dba47128defdfadbf5c"},"package":null}
25 changes: 11 additions & 14 deletions third_party/rust/audioipc-client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ use audio_thread_priority::get_current_thread_info;
#[cfg(not(target_os = "linux"))]
use audio_thread_priority::promote_current_thread_to_real_time;
use audioipc::codec::LengthDelimitedCodec;
use audioipc::frame::{framed, Framed};
use audioipc::platformhandle_passing::{framed_with_platformhandles, FramedWithPlatformHandles};
use audioipc::framing::{framed, Framed};
use audioipc::{core, rpc};
use audioipc::{
messages, messages::DeviceCollectionReq, messages::DeviceCollectionResp, ClientMessage,
Expand All @@ -38,10 +37,8 @@ struct CubebClient;
impl rpc::Client for CubebClient {
type Request = ServerMessage;
type Response = ClientMessage;
type Transport = FramedWithPlatformHandles<
audioipc::AsyncMessageStream,
LengthDelimitedCodec<Self::Request, Self::Response>,
>;
type Transport =
Framed<audioipc::AsyncMessageStream, LengthDelimitedCodec<Self::Request, Self::Response>>;
}

pub const CLIENT_OPS: Ops = capi_new!(ClientContext, ClientStream);
Expand Down Expand Up @@ -190,7 +187,7 @@ impl ContextOps for ClientContext {
stream: audioipc::AsyncMessageStream,
tx_rpc: &mpsc::Sender<rpc::ClientProxy<ServerMessage, ClientMessage>>,
) {
let transport = framed_with_platformhandles(stream, Default::default());
let transport = framed(stream, Default::default());
let rpc = rpc::bind_client::<CubebClient>(transport);
// If send fails then the rx end has closed
// which is unlikely here.
Expand All @@ -206,7 +203,7 @@ impl ContextOps for ClientContext {
let thread_destroy_callback = params.thread_destroy_callback;

let server_stream =
unsafe { audioipc::MessageStream::from_raw_fd(params.server_connection) };
unsafe { audioipc::MessageStream::from_raw_handle(params.server_connection) };

let core = core::spawn_thread(
"AudioIPC Client RPC",
Expand Down Expand Up @@ -368,15 +365,15 @@ impl ContextOps for ClientContext {
assert_not_in_callback();

if !self.device_collection_rpc {
let fds = send_recv!(self.rpc(),
let mut fd = send_recv!(self.rpc(),
ContextSetupDeviceCollectionCallback =>
ContextSetupDeviceCollectionCallback())?;

// TODO: The lowest comms layer expects exactly 3 PlatformHandles, but we only
// need one here. The server sent two dummy valid handles, ignore those (closed on drop)
// and use the one we need.
let stream =
unsafe { audioipc::MessageStream::from_raw_fd(fds.platform_handles[0].into_raw()) };
let stream = unsafe {
audioipc::MessageStream::from_raw_handle(
fd.platform_handle.take_handle().into_raw(),
)
};

let server = DeviceCollectionServer {
input_device_callback: self.input_device_callback.clone(),
Expand Down
149 changes: 96 additions & 53 deletions third_party/rust/audioipc-client/src/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@

use crate::ClientContext;
use crate::{assert_not_in_callback, run_in_callback};
use audioipc::frame::{framed, Framed};
use audioipc::messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage};
use audioipc::rpc;
use audioipc::shm::SharedMem;
use audioipc::{codec::LengthDelimitedCodec, messages::StreamCreateParams};
use audioipc::{
framing::{framed, Framed},
messages::{self, CallbackReq, CallbackResp, ClientMessage, ServerMessage},
};
use cubeb_backend::{ffi, DeviceRef, Error, Result, Stream, StreamOps};
use futures::Future;
use futures_cpupool::{CpuFuture, CpuPool};
Expand Down Expand Up @@ -50,9 +52,17 @@ pub struct ClientStream<'ctx> {
shutdown_rx: mpsc::Receiver<()>,
}

#[derive(Copy, Clone, Debug, PartialEq)]
enum StreamDirection {
Input,
Output,
Duplex,
}

struct CallbackServer {
input_shm: Option<SharedMem>,
output_shm: Option<SharedMem>,
dir: StreamDirection,
shm: Option<SharedMem>,
duplex_input: Option<Vec<u8>>,
data_cb: ffi::cubeb_data_callback,
state_cb: ffi::cubeb_state_callback,
user_ptr: usize,
Expand Down Expand Up @@ -83,34 +93,68 @@ impl rpc::Server for CallbackServer {
output_frame_size,
);

let input_nbytes = nframes as usize * input_frame_size;
let output_nbytes = nframes as usize * output_frame_size;

// Clone values that need to be moved into the cpu pool thread.
let input_shm = unsafe { self.input_shm.as_ref().map(|shm| shm.unsafe_view()) };
let output_shm = unsafe { self.output_shm.as_ref().map(|shm| shm.unsafe_view()) };
let mut shm = unsafe { self.shm.as_ref().unwrap().unsafe_view() };

let duplex_copy_ptr = match &mut self.duplex_input {
Some(buf) => {
assert_eq!(self.dir, StreamDirection::Duplex);
assert!(input_frame_size > 0);
assert!(buf.capacity() >= input_nbytes);
buf.as_mut_ptr()
}
None => ptr::null_mut(),
} as usize;
let user_ptr = self.user_ptr;
let cb = self.data_cb.unwrap();
let dir = self.dir;

self.cpu_pool.spawn_fn(move || {
let input_ptr = match input_shm {
Some(shm) => unsafe {
shm.get_slice(nframes as usize * input_frame_size)
.unwrap()
.as_ptr()
// Input and output reuse the same shmem backing. Unfortunately, cubeb's data_callback isn't
// specified in such a way that would require the callee to consume all of the input before
// writing to the output (i.e., it is passed as two pointers that aren't expected to alias).
// That means we need to copy the input here.
let (input_ptr, output_ptr) = match dir {
StreamDirection::Duplex => unsafe {
assert!(input_frame_size > 0);
assert!(output_frame_size > 0);
assert_ne!(duplex_copy_ptr, 0);
let input = shm.get_slice(input_nbytes).unwrap();
ptr::copy_nonoverlapping(
input.as_ptr(),
duplex_copy_ptr as *mut _,
input.len(),
);
(
duplex_copy_ptr as _,
shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr(),
)
},
None => ptr::null(),
};
let output_ptr = match output_shm {
Some(mut shm) => unsafe {
shm.get_mut_slice(nframes as usize * output_frame_size)
.unwrap()
.as_mut_ptr()
StreamDirection::Input => unsafe {
assert!(input_frame_size > 0);
assert_eq!(output_frame_size, 0);
(
shm.get_slice(input_nbytes).unwrap().as_ptr(),
ptr::null_mut(),
)
},
StreamDirection::Output => unsafe {
assert!(output_frame_size > 0);
assert_eq!(input_frame_size, 0);
(
ptr::null(),
shm.get_mut_slice(output_nbytes).unwrap().as_mut_ptr(),
)
},
None => ptr::null(),
};

run_in_callback(|| {
let nframes = unsafe {
cb(
ptr::null_mut(),
ptr::null_mut(), // https://github.com/kinetiknz/cubeb/issues/518
user_ptr as *mut c_void,
input_ptr as *const _,
output_ptr as *mut _,
Expand Down Expand Up @@ -152,6 +196,20 @@ impl rpc::Server for CallbackServer {
Ok(CallbackResp::DeviceChange)
})
}
CallbackReq::SharedMem(mut handle, shm_area_size) => {
let shm = unsafe {
SharedMem::from(handle.take_handle(), shm_area_size)
.expect("Client failed to set up shmem")
};
self.shm = Some(shm);

self.duplex_input = if let StreamDirection::Duplex = self.dir {
Some(Vec::with_capacity(shm_area_size))
} else {
None
};
self.cpu_pool.spawn_fn(move || Ok(CallbackResp::SharedMem))
}
}
}
}
Expand All @@ -171,41 +229,15 @@ impl<'ctx> ClientStream<'ctx> {
input_stream_params: init_params.input_stream_params,
output_stream_params: init_params.output_stream_params,
};
let data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;
let mut data = send_recv!(rpc, StreamCreate(create_params) => StreamCreated())?;

debug!(
"token = {}, handles = {:?}",
data.token, data.platform_handles
"token = {}, handle = {:?}",
data.token, data.platform_handle
);

let has_input = init_params.input_stream_params.is_some();
let has_output = init_params.output_stream_params.is_some();

let stream =
unsafe { audioipc::MessageStream::from_raw_fd(data.platform_handles[0].into_raw()) };

let input_shm = if has_input {
match unsafe { SharedMem::from(&data.platform_handles[1], audioipc::SHM_AREA_SIZE) } {
Ok(shm) => Some(shm),
Err(e) => {
debug!("Client failed to set up input shmem: {}", e);
return Err(Error::error());
}
}
} else {
None
};

let output_shm = if has_output {
match unsafe { SharedMem::from(&data.platform_handles[2], audioipc::SHM_AREA_SIZE) } {
Ok(shm) => Some(shm),
Err(e) => {
debug!("Client failed to set up output shmem: {}", e);
return Err(Error::error());
}
}
} else {
None
let stream = unsafe {
audioipc::MessageStream::from_raw_handle(data.platform_handle.take_handle().into_raw())
};

let user_data = user_ptr as usize;
Expand All @@ -217,9 +249,20 @@ impl<'ctx> ClientStream<'ctx> {

let (_shutdown_tx, shutdown_rx) = mpsc::channel();

let dir = match (
init_params.input_stream_params,
init_params.output_stream_params,
) {
(Some(_), Some(_)) => StreamDirection::Duplex,
(Some(_), None) => StreamDirection::Input,
(None, Some(_)) => StreamDirection::Output,
(None, None) => unreachable!(),
};

let server = CallbackServer {
input_shm,
output_shm,
dir,
shm: None,
duplex_input: None,
data_cb: data_callback,
state_cb: state_callback,
user_ptr: user_data,
Expand Down
Loading

0 comments on commit 683c531

Please sign in to comment.