Skip to content
This repository has been archived by the owner on Jul 31, 2023. It is now read-only.

Commit

Permalink
Make trace export interval and batch size configurable at runtime. (#421
Browse files Browse the repository at this point in the history
)

Fixes #420.
  • Loading branch information
g-easy authored Nov 29, 2019
1 parent 67c7c1a commit eabb06a
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 29 deletions.
4 changes: 4 additions & 0 deletions opencensus/trace/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ opencensus_lib(
absl::time
absl::span)

# Define NOMINMAX to fix build errors when compiling with MSVC.
target_compile_definitions(opencensus_trace PUBLIC
$<$<CXX_COMPILER_ID:MSVC>:NOMINMAX>)

opencensus_lib(
trace_b3
PUBLIC
Expand Down
18 changes: 18 additions & 0 deletions opencensus/trace/exporter/span_exporter.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <memory>
#include <vector>

#include "absl/time/time.h"
#include "opencensus/trace/exporter/span_data.h"

namespace opencensus {
Expand All @@ -27,6 +28,22 @@ namespace exporter {
// SpanExporter allows Exporters to register. Thread-safe.
class SpanExporter final {
public:
// Sets the batch size when exporting traces. Takes effect after the next
// batch starts. This is not a strict limit, the generated batch may be
// slightly larger. If the interval expires before the batch fills up, the
// batch will be smaller.
//
// Warning: this API may be removed in future, in favor of configuring this
// per-exporter.
static void SetBatchSize(int size);

// Sets the interval between exporting batches of traces. Takes effect after
// the next batch starts.
//
// Warning: this API may be removed in future, in favor of configuring this
// per-exporter.
static void SetInterval(absl::Duration interval);

// Handlers allow different tracing services to export recorded data for
// sampled spans in their own format. Every exporter must provide a static
// Register() method that takes any arguments needed by the exporter (e.g. a
Expand All @@ -41,6 +58,7 @@ class SpanExporter final {
static void RegisterHandler(std::unique_ptr<Handler> handler);

private:
SpanExporter() = delete;
friend class SpanExporterTestPeer;

// Forces an export, only for testing purposes.
Expand Down
11 changes: 11 additions & 0 deletions opencensus/trace/internal/span_exporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,23 @@
#include <memory>
#include <utility>

#include "absl/time/time.h"
#include "opencensus/trace/internal/span_exporter_impl.h"

namespace opencensus {
namespace trace {
namespace exporter {

// static
void SpanExporter::SetBatchSize(int size) {
SpanExporterImpl::Get()->SetBatchSize(size);
}

// static
void SpanExporter::SetInterval(absl::Duration interval) {
SpanExporterImpl::Get()->SetInterval(interval);
}

// static
void SpanExporter::RegisterHandler(std::unique_ptr<Handler> handler) {
SpanExporterImpl::Get()->RegisterHandler(std::move(handler));
Expand Down
51 changes: 31 additions & 20 deletions opencensus/trace/internal/span_exporter_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "opencensus/trace/internal/span_exporter_impl.h"

#include <algorithm>
#include <utility>

#include "absl/synchronization/mutex.h"
Expand All @@ -24,17 +25,20 @@ namespace opencensus {
namespace trace {
namespace exporter {

SpanExporterImpl* SpanExporterImpl::span_exporter_ = nullptr;

SpanExporterImpl* SpanExporterImpl::Get() {
static SpanExporterImpl* global_span_exporter_impl = new SpanExporterImpl(
kDefaultBufferSize, absl::Milliseconds(kIntervalWaitTimeInMillis));
static SpanExporterImpl* global_span_exporter_impl = new SpanExporterImpl;
return global_span_exporter_impl;
}

SpanExporterImpl::SpanExporterImpl(uint32_t buffer_size,
absl::Duration interval)
: buffer_size_(buffer_size), interval_(interval) {}
void SpanExporterImpl::SetBatchSize(int size) {
absl::MutexLock l(&handler_mu_);
batch_size_ = std::max(1, size);
}

void SpanExporterImpl::SetInterval(absl::Duration interval) {
absl::MutexLock l(&handler_mu_);
interval_ = std::max(absl::Seconds(1), interval);
}

void SpanExporterImpl::RegisterHandler(
std::unique_ptr<SpanExporter::Handler> handler) {
Expand All @@ -59,36 +63,43 @@ void SpanExporterImpl::StartExportThread() {
collect_spans_ = true;
}

bool SpanExporterImpl::IsBufferFull() const {
bool SpanExporterImpl::IsBatchFull() const {
span_mu_.AssertHeld();
return spans_.size() >= buffer_size_;
return spans_.size() >= cached_batch_size_;
}

void SpanExporterImpl::RunWorkerLoop() {
std::vector<opencensus::trace::exporter::SpanData> span_data_;
std::vector<std::shared_ptr<opencensus::trace::SpanImpl>> batch_;
std::vector<opencensus::trace::exporter::SpanData> span_data;
std::vector<std::shared_ptr<opencensus::trace::SpanImpl>> batch;
// Thread loops forever.
// TODO: Add in shutdown mechanism.
absl::Time next_forced_export_time = absl::Now() + interval_;
while (true) {
int size;
absl::Time next_forced_export_time;
{
// Start of loop, update batch size and interval.
absl::MutexLock l(&handler_mu_);
size = batch_size_;
next_forced_export_time = absl::Now() + interval_;
}
{
absl::MutexLock l(&span_mu_);
cached_batch_size_ = size;
// Wait until batch is full or interval time has been exceeded.
span_mu_.AwaitWithDeadline(
absl::Condition(this, &SpanExporterImpl::IsBufferFull),
absl::Condition(this, &SpanExporterImpl::IsBatchFull),
next_forced_export_time);
next_forced_export_time = absl::Now() + interval_;
if (spans_.empty()) {
continue;
}
std::swap(batch_, spans_);
std::swap(batch, spans_);
}
for (const auto& span : batch_) {
span_data_.emplace_back(span->ToSpanData());
for (const auto& span : batch) {
span_data.emplace_back(span->ToSpanData());
}
batch_.clear();
Export(span_data_);
span_data_.clear();
batch.clear();
Export(span_data);
span_data.clear();
}
}

Expand Down
19 changes: 10 additions & 9 deletions opencensus/trace/internal/span_exporter_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ class SpanExporterImpl {
// Returns the global instance of SpanExporterImpl.
static SpanExporterImpl* Get();

void SetBatchSize(int size);
void SetInterval(absl::Duration interval);

// A shared_ptr to the span is added to a list. The actual conversion to
// SpanData will take place at a later time via the background thread. This
// is intended to be called at the Span::End().
Expand All @@ -53,11 +56,8 @@ class SpanExporterImpl {
// initialization.
void RegisterHandler(std::unique_ptr<SpanExporter::Handler> handler);

static constexpr uint32_t kDefaultBufferSize = 64;
static constexpr uint32_t kIntervalWaitTimeInMillis = 5000;

private:
SpanExporterImpl(uint32_t buffer_size, absl::Duration interval);
SpanExporterImpl() = default;
SpanExporterImpl(const SpanExporterImpl&) = delete;
SpanExporterImpl(SpanExporterImpl&&) = delete;
SpanExporterImpl& operator=(const SpanExporterImpl&) = delete;
Expand All @@ -75,14 +75,15 @@ class SpanExporterImpl {
// returns when complete.
void ExportForTesting();

// Returns true if the spans_ buffer has filled up.
bool IsBufferFull() const;
// Returns true if the spans_ batch is full.
bool IsBatchFull() const;

static SpanExporterImpl* span_exporter_;
const uint32_t buffer_size_;
const absl::Duration interval_;
mutable absl::Mutex span_mu_;
mutable absl::Mutex handler_mu_;
int batch_size_ GUARDED_BY(handler_mu_) = 64;
absl::Duration interval_ GUARDED_BY(handler_mu_) = absl::Seconds(5);
// Updated in RunWorkerLoop and protected by span_mu_ instead of handler_mu_.
int cached_batch_size_ GUARDED_BY(span_mu_);
std::vector<std::shared_ptr<opencensus::trace::SpanImpl>> spans_
GUARDED_BY(span_mu_);
std::vector<std::unique_ptr<SpanExporter::Handler>> handlers_
Expand Down
4 changes: 4 additions & 0 deletions opencensus/trace/internal/span_exporter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "absl/memory/memory.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "gtest/gtest.h"
#include "opencensus/trace/exporter/span_data.h"
#include "opencensus/trace/sampler.h"
Expand Down Expand Up @@ -63,6 +64,9 @@ class MyExporter : public exporter::SpanExporter::Handler {
class SpanExporterTest : public ::testing::Test {
protected:
static void SetUpTestSuite() {
exporter::SpanExporter::SetBatchSize(1);
exporter::SpanExporter::SetInterval(absl::Seconds(1));

// Only register once.
MyExporter::Register();
}
Expand Down

0 comments on commit eabb06a

Please sign in to comment.