Skip to content

Commit

Permalink
Assistant: Decodes streaming data
Browse files Browse the repository at this point in the history
This patch implements the decoding service for streaming data. The
underlining decoding method has been changed, therefore the decoded
output are in interleaved data format, not as AudioBuffer.

Bug: b/113131759, b/80315134
Test: manual.
Change-Id: I5b382caa33a1bd60a8b262847b76498280c3ed90
Reviewed-on: https://chromium-review.googlesource.com/1192685
Reviewed-by: Sam McNally <[email protected]>
Reviewed-by: Xiaohui Chen <[email protected]>
Commit-Queue: Tao Wu <[email protected]>
Cr-Commit-Position: refs/heads/master@{#587447}
  • Loading branch information
wutao authored and Commit Bot committed Aug 30, 2018
1 parent 247fb8a commit b7a9ec9
Show file tree
Hide file tree
Showing 11 changed files with 293 additions and 137 deletions.
2 changes: 2 additions & 0 deletions chromeos/services/assistant/audio_decoder/BUILD.gn
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ source_set("lib") {
"assistant_audio_decoder_factory.h",
"assistant_audio_decoder_service.cc",
"assistant_audio_decoder_service.h",
"ipc_data_source.cc",
"ipc_data_source.h",
]

deps = [
Expand Down
3 changes: 3 additions & 0 deletions chromeos/services/assistant/audio_decoder/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This directory contains audio decoder service for the Chrome OS native Assistant
to decode the audio output by Libassistant, before connecting to AudioService.
We cannot use the standard media service, which does not have the demuxer.
167 changes: 85 additions & 82 deletions chromeos/services/assistant/audio_decoder/assistant_audio_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,115 +4,118 @@

#include "chromeos/services/assistant/audio_decoder/assistant_audio_decoder.h"

#include "base/bind_helpers.h"
#include "media/base/audio_buffer.h"
#include "media/base/media_log.h"
#include "media/base/media_tracks.h"
#include "media/base/stream_parser_buffer.h"
#include "media/base/text_track_config.h"
#include "media/filters/ffmpeg_audio_decoder.h"
#include "media/formats/mpeg/mpeg1_audio_stream_parser.h"
#include "media/mojo/common/media_type_converters.h"
#include "media/mojo/interfaces/media_types.mojom.h"
#include "base/threading/thread.h"
#include "chromeos/services/assistant/audio_decoder/ipc_data_source.h"
#include "media/base/audio_bus.h"
#include "media/base/data_source.h"
#include "media/filters/audio_file_reader.h"
#include "media/filters/blocking_url_protocol.h"
#include "services/service_manager/public/cpp/service_context_ref.h"

namespace chromeos {
namespace assistant {

namespace {

// Preferred bytes per sample when get interleaved data from AudioBus.
constexpr int kBytesPerSample = 2;

void OnError(bool* succeeded) {
*succeeded = false;
}

} // namespace

AssistantAudioDecoder::AssistantAudioDecoder(
std::unique_ptr<service_manager::ServiceContextRef> service_ref,
mojom::AssistantAudioDecoderClientPtr client)
mojom::AssistantAudioDecoderClientPtr client,
mojom::AssistantMediaDataSourcePtr data_source)
: service_ref_(std::move(service_ref)),
client_(std::move(client)),
task_runner_(base::ThreadTaskRunnerHandle::Get()),
media_log_(std::make_unique<media::MediaLog>()),
parser_(std::make_unique<media::MPEG1AudioStreamParser>()),
decoder_(std::make_unique<media::FFmpegAudioDecoder>(task_runner_,
media_log_.get())) {
parser_->Init(
/*init_cb=*/base::DoNothing(),
base::BindRepeating(&AssistantAudioDecoder::OnNewConfigs,
base::Unretained(this)),
base::BindRepeating(&AssistantAudioDecoder::OnNewBuffers,
base::Unretained(this)),
/*ignore_text_tracks=*/true,
/*encrypted_media_init_data_cb=*/base::DoNothing(),
/*new_segment_cb=*/base::DoNothing(),
/*end_of_segment_cb=*/base::DoNothing(), media_log_.get());
media_thread_(std::make_unique<base::Thread>("media_thread")),
data_source_(std::make_unique<IPCDataSource>(std::move(data_source))) {
CHECK(media_thread_->Start());
}

AssistantAudioDecoder::~AssistantAudioDecoder() = default;

void AssistantAudioDecoder::Decode(const std::vector<uint8_t>& data) {
if (!parser_->Parse(data.data(), data.size()))
client_->OnDecoderError();
void AssistantAudioDecoder::Decode() {
media_thread_->task_runner()->PostTask(
FROM_HERE, base::BindOnce(&AssistantAudioDecoder::DecodeOnMediaThread,
base::Unretained(this)));
}

bool AssistantAudioDecoder::OnNewConfigs(
std::unique_ptr<media::MediaTracks> tracks,
const media::StreamParser::TextTrackConfigMap& text_configs) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
const media::StreamParser::TrackId track_id =
tracks->tracks().back()->bytestream_track_id();

// Initialize decoder.
const media::AudioDecoderConfig& config = tracks->getAudioConfig(track_id);
decoder_->Initialize(
config, /*cdm_context=*/nullptr,
base::BindRepeating(&AssistantAudioDecoder::OnDecoderInitialized,
base::Unretained(this), config),
base::BindRepeating(&AssistantAudioDecoder::OnDecodeOutput,
base::Unretained(this)),
/*waiting_for_decryption_key_cb=*/base::NullCallback());
return true;
void AssistantAudioDecoder::OpenDecoder(OpenDecoderCallback callback) {
media_thread_->task_runner()->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OpenDecoderOnMediaThread,
base::Unretained(this), std::move(callback)));
}

bool AssistantAudioDecoder::OnNewBuffers(
const media::StreamParser::BufferQueueMap& buffer_queue_map) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
int new_buffer_size = 0;
for (const auto& queue_map : buffer_queue_map) {
new_buffer_size += queue_map.second.size();
for (const auto& buffer : queue_map.second)
parsed_buffers_.push_back(buffer);
void AssistantAudioDecoder::OpenDecoderOnMediaThread(
OpenDecoderCallback callback) {
bool read_ok = true;
protocol_ = std::make_unique<media::BlockingUrlProtocol>(
data_source_.get(), base::BindRepeating(&OnError, &read_ok));
decoder_ = std::make_unique<media::AudioFileReader>(protocol_.get());

if (!decoder_->Open() || !read_ok) {
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnDecoderErrorOnThread,
base::Unretained(this), std::move(callback)));
return;
}
client_->OnNewBuffers(new_buffer_size);
DecodeNow();
return true;
}

void AssistantAudioDecoder::OnDecoderInitialized(
media::AudioDecoderConfig config,
bool success) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
client_->OnDecoderInitialized(success, config.samples_per_second(),
config.channels());
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnDecoderInitializedOnThread,
base::Unretained(this), std::move(callback),
decoder_->sample_rate(), decoder_->channels()));
}

void AssistantAudioDecoder::OnDecodeOutput(
const scoped_refptr<media::AudioBuffer>& decoded) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
client_->OnBufferDecoded(media::mojom::AudioBuffer::From(decoded));
DecodeNow();
}
void AssistantAudioDecoder::DecodeOnMediaThread() {
std::vector<std::unique_ptr<media::AudioBus>> decoded_audio_packets;
// Experimental number of decoded packets before sending to |client_|.
constexpr int kPacketsToRead = 128;
decoder_->Read(&decoded_audio_packets, kPacketsToRead);

void AssistantAudioDecoder::OnDecodeStatus(media::DecodeStatus status) {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (status != media::DecodeStatus::OK)
client_->OnDecoderError();
task_runner_->PostTask(
FROM_HERE,
base::BindOnce(&AssistantAudioDecoder::OnBufferDecodedOnThread,
base::Unretained(this), std::move(decoded_audio_packets)));
}

void AssistantAudioDecoder::DecodeNow() {
DCHECK(task_runner_->RunsTasksInCurrentSequence());
if (!parsed_buffers_.empty()) {
const auto& data = parsed_buffers_.front();
DCHECK_NE(data->timestamp(), media::kNoTimestamp);
void AssistantAudioDecoder::OnDecoderInitializedOnThread(
OpenDecoderCallback callback,
int sample_rate,
int channels) {
std::move(callback).Run(/*success=*/true, kBytesPerSample, sample_rate,
channels);
}

decoder_->Decode(data,
base::BindRepeating(&AssistantAudioDecoder::OnDecodeStatus,
base::Unretained(this)));
parsed_buffers_.pop_front();
void AssistantAudioDecoder::OnBufferDecodedOnThread(
const std::vector<std::unique_ptr<media::AudioBus>>&
decoded_audio_packets) {
std::vector<std::vector<uint8_t>> buffers;
for (const auto& audio_bus : decoded_audio_packets) {
const int bytes_to_alloc =
audio_bus->frames() * kBytesPerSample * audio_bus->channels();
std::vector<uint8_t> buffer(bytes_to_alloc);
audio_bus->ToInterleaved(audio_bus->frames(), kBytesPerSample,
buffer.data());
buffers.emplace_back(buffer);
}
client_->OnNewBuffers(buffers);
}

void AssistantAudioDecoder::OnDecoderErrorOnThread(
OpenDecoderCallback callback) {
std::move(callback).Run(/*success=*/false,
/*bytes_per_sample=*/0,
/*samples_per_second=*/0,
/*channels=*/0);
}

} // namespace assistant
Expand Down
52 changes: 24 additions & 28 deletions chromeos/services/assistant/audio_decoder/assistant_audio_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,14 @@

#include <memory>

#include "base/containers/circular_deque.h"
#include "base/macros.h"
#include "base/synchronization/lock.h"
#include "chromeos/services/assistant/public/mojom/assistant_audio_decoder.mojom.h"
#include "media/base/stream_parser.h"

namespace media {
class AudioBuffer;
class MediaLog;
class MediaTracks;
class MPEG1AudioStreamParser;
class FFmpegAudioDecoder;
class StreamParserBuffer;
class AudioFileReader;
class AudioBus;
class DataSource;
} // namespace media

namespace service_manager {
Expand All @@ -32,36 +28,36 @@ class AssistantAudioDecoder : public mojom::AssistantAudioDecoder {
public:
AssistantAudioDecoder(
std::unique_ptr<service_manager::ServiceContextRef> service_ref,
mojom::AssistantAudioDecoderClientPtr client);
mojom::AssistantAudioDecoderClientPtr client,
mojom::AssistantMediaDataSourcePtr data_source);
~AssistantAudioDecoder() override;

// Called by |client_| on main thread.
void Decode(const std::vector<uint8_t>& data) override;
void OpenDecoder(OpenDecoderCallback callback) override;
void Decode() override;

private:
// Called by |parser_| on main thread.
bool OnNewConfigs(
std::unique_ptr<media::MediaTracks> tracks,
const media::StreamParser::TextTrackConfigMap& text_configs);
bool OnNewBuffers(
const media::StreamParser::BufferQueueMap& buffer_queue_map);
// Calls |decoder_| to decode on media thread.
void OpenDecoderOnMediaThread(OpenDecoderCallback callback);
void DecodeOnMediaThread();

// Called by |decoder_| on main thread.
void OnDecoderInitialized(media::AudioDecoderConfig config, bool success);
void OnDecodeOutput(const scoped_refptr<media::AudioBuffer>& decoded);
void OnDecodeStatus(media::DecodeStatus status);

// Called by |OnNewBuffers()| and |OnDecodeOutput()| on main thread.
void DecodeNow();
// Calls |client_| methods on main thread.
void OnDecoderInitializedOnThread(OpenDecoderCallback callback,
int sample_rate,
int channels);
void OnDecoderErrorOnThread(OpenDecoderCallback callback);
void OnBufferDecodedOnThread(
const std::vector<std::unique_ptr<media::AudioBus>>&
decoded_audio_buffers);

const std::unique_ptr<service_manager::ServiceContextRef> service_ref_;
mojom::AssistantAudioDecoderClientPtr client_;
scoped_refptr<base::SingleThreadTaskRunner> task_runner_;
std::unique_ptr<media::MediaLog> media_log_;
std::unique_ptr<media::MPEG1AudioStreamParser> parser_;
std::unique_ptr<media::FFmpegAudioDecoder> decoder_;
base::circular_deque<scoped_refptr<media::StreamParserBuffer>>
parsed_buffers_;

std::unique_ptr<media::DataSource> data_source_;
std::unique_ptr<media::BlockingUrlProtocol> protocol_;
std::unique_ptr<media::AudioFileReader> decoder_;
std::unique_ptr<base::Thread> media_thread_;

DISALLOW_COPY_AND_ASSIGN(AssistantAudioDecoder);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ AssistantAudioDecoderFactory::~AssistantAudioDecoderFactory() = default;

void AssistantAudioDecoderFactory::CreateAssistantAudioDecoder(
mojom::AssistantAudioDecoderRequest request,
mojom::AssistantAudioDecoderClientPtr client) {
mojo::MakeStrongBinding(std::make_unique<AssistantAudioDecoder>(
service_ref_->Clone(), std::move(client)),
std::move(request));
mojom::AssistantAudioDecoderClientPtr client,
mojom::AssistantMediaDataSourcePtr data_source) {
mojo::MakeStrongBinding(
std::make_unique<AssistantAudioDecoder>(
service_ref_->Clone(), std::move(client), std::move(data_source)),
std::move(request));
}

} // namespace assistant
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ class AssistantAudioDecoderFactory
// mojom::AssistantAudioDecoderFactory:
void CreateAssistantAudioDecoder(
mojom::AssistantAudioDecoderRequest request,
mojom::AssistantAudioDecoderClientPtr client) override;
mojom::AssistantAudioDecoderClientPtr client,
mojom::AssistantMediaDataSourcePtr data_source) override;

const std::unique_ptr<service_manager::ServiceContextRef> service_ref_;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ void AssistantAudioDecoderService::OnStart() {
ref_factory_ = std::make_unique<service_manager::ServiceContextRefFactory>(
context()->CreateQuitClosure());
registry_.AddInterface(
base::BindRepeating(&OnAudioDecoderFactoryRequest, ref_factory_.get()));
base::BindRepeating(&OnAudioDecoderFactoryRequest, ref_factory_.get()),
base::ThreadTaskRunnerHandle::Get());
}

void AssistantAudioDecoderService::OnBindInterface(
Expand Down
Loading

0 comments on commit b7a9ec9

Please sign in to comment.