Skip to content

Commit

Permalink
Implement max concurrency to configure legacy behavior (1) or otherwi…
Browse files Browse the repository at this point in the history
…se limit concurrency (#254)

This adds a `max_concurrency` setting. When set to 0, concurrency is
unlimited. When set to 1, behavior matches the legacy behavior of the
core batch processor. When set to a value >1, this enables the user to
artificially limit concurrency.
  • Loading branch information
jmacd authored Sep 25, 2024
1 parent 3f9c778 commit b78f1ba
Show file tree
Hide file tree
Showing 9 changed files with 146 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

## Unreleased

- Concurrent batch processor: concurrency limit for legacy behavior or otherwise. [#254](https://github.com/open-telemetry/otel-arrow/pull/254)
- Concurrent batch processor: EarlyReturn legacy compat feature. [#253](https://github.com/open-telemetry/otel-arrow/pull/253)
- Concurrent batch processor: Synchronize with upstream; removes in-flight bytes metric,
removes panic recovery as unnecessary divergence. [#251](https://github.com/open-telemetry/otel-arrow/pull/251)
Expand Down
26 changes: 26 additions & 0 deletions collector/processor/concurrentbatchprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ ignored as data will be sent immediately, subject to only `send_batch_max_size`.
- `early_return` (default = false): When enabled, this pipeline component
will return immediate success to the caller after enqueuing the item
for eventual delivery.
- `max_concurrency` (default = unlimited): Controls the maximum number
of concurrent export calls made by this component. This is enforced
per batcher instance, as determined by `metadata_keys`. When the value
0 is configured, unlimited concurrency is allowed.

See notes about metadata batching below.

Expand Down Expand Up @@ -117,3 +121,25 @@ metadata-key values.

The number of batch processors currently in use is exported as the
`otelcol_processor_batch_metadata_cardinality` metric.

## Batching with error transmission

The use of unlimited concurrency is recommended for this component.

This component's legacy configuration had `max_concurrency` of 1 and
`early_return` set true. The use of `early_return` in the legacy
configuration prevented error transmission through this component.

When the exporterhelper `queue_sender` is disabled, which is also
necessary for error transmission, the result combined with
`max_concurrency` of 1 would be synchronous export behavior, meaning
that a new batch could not be formed until the preceding batch
completed its export. Setting `max_concurrency` to 0 for unlimited
concurrency is recommended because it works with all configurations of
the exporterhelper.

The use of unlimited concurrency should not be considered a risk,
because the actions of this processor take place after the associated
memory has been allocated. Users are expected to implement memory
limits using other means, possibly via the `memorylimiter` extension
or another form of admission control.
17 changes: 16 additions & 1 deletion collector/processor/concurrentbatchprocessor/batch_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"golang.org/x/sync/semaphore"

"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor/internal/metadata"
"go.opentelemetry.io/collector/client"
Expand Down Expand Up @@ -58,6 +59,10 @@ type batchProcessor struct {
// metadataLimit is the limiting size of the batchers map.
metadataLimit int

// sem controls the max_concurrency setting. this field is nil
// for unlimited concurrency.
sem *semaphore.Weighted

// earlyReturn is the value of Config.EarlyReturn.
earlyReturn bool

Expand Down Expand Up @@ -189,6 +194,10 @@ func newBatchProcessor(set processor.Settings, cfg *Config, batchFunc func() bat
tracer: set.TelemetrySettings.TracerProvider.Tracer(metadata.ScopeName),
}

if cfg.MaxConcurrency > 0 {
bp.sem = semaphore.NewWeighted(int64(cfg.MaxConcurrency))
}

asb := anyShardBatcher{processor: bp}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{anyShardBatcher: asb}
Expand Down Expand Up @@ -374,10 +383,16 @@ func (b *shard) sendItems(trigger trigger) {

b.totalSent = numItemsAfter

if b.processor.sem != nil {
b.processor.sem.Acquire(context.Background(), 1)
}
b.processor.goroutines.Add(1)

go func() {
if b.processor.sem != nil {
defer b.processor.sem.Release(1)
}
defer b.processor.goroutines.Done()

var err error

var parentSpan trace.Span
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"math"
"runtime"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1755,6 +1756,97 @@ func TestErrorPropagation(t *testing.T) {
}
}

// concurrencyTracesSink orchestrates a test in which the concurrency
// limit is is repeatedly reached but never exceeded. The consumers
// are released when the limit is reached exactly.
type concurrencyTracesSink struct {
*testing.T
context.CancelFunc
consumertest.TracesSink

lock sync.Mutex
conc int
cnt int
grp *sync.WaitGroup
}

func newConcurrencyTracesSink(ctx context.Context, cancel context.CancelFunc, t *testing.T, conc int) *concurrencyTracesSink {
cts := &concurrencyTracesSink{
T: t,
CancelFunc: cancel,
conc: conc,
grp: &sync.WaitGroup{},
}
cts.grp.Add(1)
go func() {
for {
runtime.Gosched()
select {
case <-ctx.Done():
return
default:
}
cts.lock.Lock()
if cts.cnt == cts.conc {
cts.grp.Done()
cts.grp = &sync.WaitGroup{}
cts.grp.Add(1)
cts.cnt = 0
}
cts.lock.Unlock()
}
}()
return cts
}

func (cts *concurrencyTracesSink) ConsumeTraces(ctx context.Context, td ptrace.Traces) error {
cts.lock.Lock()
cts.cnt++
grp := cts.grp
if cts.cnt > cts.conc {
cts.Fatal("unexpected concurrency -- already at limit")
cts.CancelFunc()
}
cts.lock.Unlock()
grp.Wait()
return cts.TracesSink.ConsumeTraces(ctx, td)
}

func TestBatchProcessorConcurrency(t *testing.T) {
for _, conc := range []int{1, 2, 4, 10} {
t.Run(fmt.Sprint(conc), func(t *testing.T) {
bg, cancel := context.WithCancel(context.Background())
defer cancel()

sink := newConcurrencyTracesSink(bg, cancel, t, conc)
cfg := createDefaultConfig().(*Config)
cfg.MaxConcurrency = uint32(conc)
cfg.SendBatchSize = 100
cfg.Timeout = time.Minute
creationSet := processortest.NewNopSettings()
batcher, err := newBatchTracesProcessor(creationSet, sink, cfg)
require.NoError(t, err)
require.NoError(t, batcher.Start(bg, componenttest.NewNopHost()))

// requestCount has to be a multiple of concurrency for the
// concurrencyTracesSink mechanism, which releases requests when
// the maximum concurrent number is reached.
requestCount := 100 * conc
spansPerRequest := 100
var wg sync.WaitGroup
for requestNum := 0; requestNum < requestCount; requestNum++ {
td := testdata.GenerateTraces(spansPerRequest)
sendTraces(bg, t, batcher, &wg, td)
}

wg.Wait()
require.NoError(t, batcher.Shutdown(context.Background()))

require.Equal(t, requestCount*spansPerRequest, sink.SpanCount())
})
}
}

func TestBatchProcessorEarlyReturn(t *testing.T) {
bg := context.Background()
sink := new(consumertest.TracesSink)
Expand Down
6 changes: 6 additions & 0 deletions collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ type Config struct {
// combination of MetadataKeys.
MetadataCardinalityLimit uint32 `mapstructure:"metadata_cardinality_limit"`

// MaxConcurrency limits the number of concurrent export
// calls. The default value, 0, indicates unlimited
// concurrency. The value 1 (a legacy default), results in
// synchronous export behavior.
MaxConcurrency uint32 `mapstructure:"max_concurrency"`

// EarlyReturn dictates whether the batch processor will
// return success as soon as the data item has been accepted
// into a pending batch. When set, the return will be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
MaxConcurrency: 2,
EarlyReturn: true,
}, cfg)
}
Expand Down
1 change: 1 addition & 0 deletions collector/processor/concurrentbatchprocessor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ require (
go.opentelemetry.io/otel/trace v1.30.0
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
golang.org/x/sync v0.8.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions collector/processor/concurrentbatchprocessor/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
timeout: 10s
send_batch_size: 10000
send_batch_max_size: 11000
max_concurrency: 2
early_return: true

0 comments on commit b78f1ba

Please sign in to comment.