Skip to content

Commit

Permalink
Return success on PartialSuccess (#130)
Browse files Browse the repository at this point in the history
Described and tracked in
open-telemetry/opentelemetry-collector#9243.

The otelarrow export component copied this code from the core OTLP
exporter, so it has the same bug.
  • Loading branch information
jmacd authored Jan 9, 2024
1 parent 364a3f3 commit 01bd0e9
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Remove two deprecated fields, both concurrent batch processor `max_in_flight_bytes`
and otelarrow receiver `memory_limit` fields have corresponding `_mib` field names
for consistency.
- OTel-Arrow exporter: Do not treat PartialSuccess as errors (see https://github.com/open-telemetry/opentelemetry-collector/issues/9243). []()

## [0.13.0](https://github.com/open-telemetry/otel-arrow/releases/tag/v0.13.0) - 2023-12-20

Expand Down
19 changes: 16 additions & 3 deletions collector/exporter/otelarrowexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
arrowPkg "github.com/apache/arrow/go/v12/arrow"
arrowRecord "github.com/open-telemetry/otel-arrow/pkg/otel/arrow_record"
"go.uber.org/multierr"
"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -201,7 +202,11 @@ func (e *baseExporter) pushTraces(ctx context.Context, td ptrace.Traces) error {
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedSpans() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedSpans()))
// TODO: These should be counted, similar to dropped items.
e.settings.Logger.Warn("partial success",
zap.String("message", resp.PartialSuccess().ErrorMessage()),
zap.Int64("num_rejected", resp.PartialSuccess().RejectedSpans()),
)
}
return nil
}
Expand All @@ -219,7 +224,11 @@ func (e *baseExporter) pushMetrics(ctx context.Context, md pmetric.Metrics) erro
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedDataPoints() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedDataPoints()))
// TODO: These should be counted, similar to dropped items.
e.settings.Logger.Warn("partial success",
zap.String("message", resp.PartialSuccess().ErrorMessage()),
zap.Int64("num_rejected", resp.PartialSuccess().RejectedDataPoints()),
)
}
return nil
}
Expand All @@ -237,7 +246,11 @@ func (e *baseExporter) pushLogs(ctx context.Context, ld plog.Logs) error {
}
partialSuccess := resp.PartialSuccess()
if !(partialSuccess.ErrorMessage() == "" && partialSuccess.RejectedLogRecords() == 0) {
return consumererror.NewPermanent(fmt.Errorf("OTLP partial success: \"%s\" (%d rejected)", resp.PartialSuccess().ErrorMessage(), resp.PartialSuccess().RejectedLogRecords()))
// TODO: These should be counted, similar to dropped items.
e.settings.Logger.Warn("partial success",
zap.String("message", resp.PartialSuccess().ErrorMessage()),
zap.Int64("num_rejected", resp.PartialSuccess().RejectedLogRecords()),
)
}
return nil
}
Expand Down
7 changes: 4 additions & 3 deletions collector/exporter/otelarrowexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,8 +412,9 @@ func TestSendTraces(t *testing.T) {
// A request with 2 Trace entries.
td = testdata.GenerateTraces(2)

// PartialSuccess is not an error.
err = exp.ConsumeTraces(callCtx1, td)
assert.Error(t, err)
assert.NoError(t, err)
}

func TestSendTracesWhenEndpointHasHttpScheme(t *testing.T) {
Expand Down Expand Up @@ -590,7 +591,7 @@ func TestSendMetrics(t *testing.T) {

// Send two metrics.
md = testdata.GenerateMetrics(2)
assert.Error(t, exp.ConsumeMetrics(context.Background(), md))
assert.NoError(t, exp.ConsumeMetrics(context.Background(), md))
}

func TestSendTraceDataServerDownAndUp(t *testing.T) {
Expand Down Expand Up @@ -883,7 +884,7 @@ func TestSendLogData(t *testing.T) {
ld = testdata.GenerateLogs(2)

err = exp.ConsumeLogs(context.Background(), ld)
assert.Error(t, err)
assert.NoError(t, err)
}

// TestSendArrowTracesNotSupported tests a successful OTLP export w/
Expand Down

0 comments on commit 01bd0e9

Please sign in to comment.