Skip to content
This repository has been archived by the owner on May 23, 2024. It is now read-only.

Commit

Permalink
Update udpTransport to only send Jaeger.thrift (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
black-adder authored Jun 2, 2017
1 parent bcb0e9e commit fda19be
Show file tree
Hide file tree
Showing 17 changed files with 527 additions and 222 deletions.
6 changes: 2 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ import (

"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/rpcmetrics"
"github.com/uber/jaeger-client-go/transport"
"github.com/uber/jaeger-client-go/transport/udp"
)

const defaultSamplingProbability = 0.001
Expand Down Expand Up @@ -245,6 +243,6 @@ func (rc *ReporterConfig) NewReporter(
return reporter, err
}

func (rc *ReporterConfig) newTransport() (transport.Transport, error) {
return udp.NewUDPTransport(rc.LocalAgentHostPort, 0)
func (rc *ReporterConfig) newTransport() (jaeger.Transport, error) {
return jaeger.NewUDPTransport(rc.LocalAgentHostPort, 0)
}
4 changes: 2 additions & 2 deletions jaeger_span.go → jaeger_thrift_span.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
"github.com/uber/jaeger-client-go/utils"
)

// buildJaegerSpan builds jaeger span based on internal span.
func buildJaegerSpan(span *Span) *j.Span {
// BuildJaegerThrift builds jaeger span based on internal span.
func BuildJaegerThrift(span *Span) *j.Span {
startTime := utils.TimeToMicrosecondsSinceEpochInt64(span.startTime)
duration := span.duration.Nanoseconds() / int64(time.Microsecond)
jaegerSpan := &j.Span{
Expand Down
10 changes: 5 additions & 5 deletions jaeger_span_test.go → jaeger_thrift_span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var (
someSliceString = "[a]"
)

func TestBuildJaegerSpan(t *testing.T) {
func TestBuildJaegerThrift(t *testing.T) {
tracer, closer := NewTracer("DOOP",
NewConstSampler(true),
NewNullReporter())
Expand All @@ -60,8 +60,8 @@ func TestBuildJaegerSpan(t *testing.T) {
sp2.Finish()
sp1.Finish()

jaegerSpan1 := buildJaegerSpan(sp1)
jaegerSpan2 := buildJaegerSpan(sp2)
jaegerSpan1 := BuildJaegerThrift(sp1)
jaegerSpan2 := BuildJaegerThrift(sp2)
assert.Equal(t, "sp1", jaegerSpan1.OperationName)
assert.Equal(t, "sp2", jaegerSpan2.OperationName)
assert.EqualValues(t, 0, jaegerSpan1.ParentSpanId)
Expand Down Expand Up @@ -235,7 +235,7 @@ func TestBuildLogs(t *testing.T) {
} else if test.field != (log.Field{}) {
sp.LogFields(test.field)
}
jaegerSpan := buildJaegerSpan(sp.(*Span))
jaegerSpan := BuildJaegerThrift(sp.(*Span))
if test.disableSampling {
assert.Equal(t, 0, len(jaegerSpan.Logs), testName)
continue
Expand Down Expand Up @@ -307,7 +307,7 @@ func TestJaegerSpanBaggageLogs(t *testing.T) {
ext.SpanKindRPCServer.Set(sp)
sp.Finish()

jaegerSpan := buildJaegerSpan(sp)
jaegerSpan := BuildJaegerThrift(sp)
require.Len(t, jaegerSpan.Logs, 1)
fields := jaegerSpan.Logs[0].Fields
require.Len(t, fields, 3)
Expand Down
13 changes: 5 additions & 8 deletions reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/uber/jaeger-client-go/log"
"github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
"github.com/uber/jaeger-client-go/transport"
)

// Reporter is called by the tracer when a span is completed to report the span to the tracing collector.
Expand Down Expand Up @@ -166,15 +164,15 @@ const (

type remoteReporter struct {
reporterOptions
sender transport.Transport
queue chan *zipkincore.Span
sender Transport
queue chan *Span
queueLength int64 // signed because metric's gauge is signed
queueDrained sync.WaitGroup
flushSignal chan *sync.WaitGroup
}

// NewRemoteReporter creates a new reporter that sends spans out of process by means of Sender
func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Reporter {
func NewRemoteReporter(sender Transport, opts ...ReporterOption) Reporter {
options := reporterOptions{}
for _, option := range opts {
option(&options)
Expand All @@ -195,7 +193,7 @@ func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Repor
reporterOptions: options,
sender: sender,
flushSignal: make(chan *sync.WaitGroup),
queue: make(chan *zipkincore.Span, options.queueSize),
queue: make(chan *Span, options.queueSize),
}
go reporter.processQueue()
return reporter
Expand All @@ -204,9 +202,8 @@ func NewRemoteReporter(sender transport.Transport, opts ...ReporterOption) Repor
// Report implements Report() method of Reporter.
// It passes the span to a background go-routine for submission to Jaeger.
func (r *remoteReporter) Report(span *Span) {
thriftSpan := buildThriftSpan(span)
select {
case r.queue <- thriftSpan:
case r.queue <- span:
atomic.AddInt64(&r.queueLength, 1)
default:
r.metrics.ReporterDropped.Inc(1)
Expand Down
125 changes: 59 additions & 66 deletions reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
package jaeger

import (
"fmt"
"io"
"sort"
"strings"
Expand All @@ -34,13 +33,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/uber/jaeger-client-go/testutils"
z "github.com/uber/jaeger-client-go/thrift-gen/zipkincore"
"github.com/uber/jaeger-lib/metrics"
mTestutils "github.com/uber/jaeger-lib/metrics/testutils"

"github.com/uber/jaeger-client-go/transport"
"github.com/uber/jaeger-client-go/transport/udp"
"github.com/uber/jaeger-client-go/testutils"
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger"
)

type reporterSuite struct {
Expand Down Expand Up @@ -92,19 +89,18 @@ func (s *reporterSuite) flushReporter() {
wg.Wait()
}

func (s *reporterSuite) TestRootSpanAnnotations() {
func (s *reporterSuite) TestRootSpanTags() {
s.metricsFactory.Clear()
sp := s.tracer.StartSpan("get_name")
ext.SpanKindRPCServer.Set(sp)
ext.PeerService.Set(sp, s.serviceName)
sp.Finish()
s.flushReporter()
s.Equal(1, len(s.collector.Spans()))
zSpan := s.collector.Spans()[0]
s.NotNil(findAnnotation(zSpan, "sr"), "expecting sr annotation")
s.NotNil(findAnnotation(zSpan, "ss"), "expecting ss annotation")
s.NotNil(findBinaryAnnotation(zSpan, "ca"), "expecting ca annotation")
s.NotNil(findBinaryAnnotation(zSpan, JaegerClientVersionTagKey), "expecting client version tag")
span := s.collector.Spans()[0]
s.Len(span.tags, 6)
s.EqualValues("server", span.tags[2].value, "span.kind should be server")
s.NotNil(findDomainTag(span, JaegerClientVersionTagKey), "expecting client version tag")

mTestutils.AssertCounterMetrics(s.T(), s.metricsFactory,
mTestutils.ExpectedMetric{
Expand All @@ -115,7 +111,7 @@ func (s *reporterSuite) TestRootSpanAnnotations() {
)
}

func (s *reporterSuite) TestClientSpanAnnotations() {
func (s *reporterSuite) TestClientSpan() {
s.metricsFactory.Clear()
sp := s.tracer.StartSpan("get_name")
ext.SpanKindRPCServer.Set(sp)
Expand All @@ -127,13 +123,10 @@ func (s *reporterSuite) TestClientSpanAnnotations() {
sp.Finish()
s.flushReporter()
s.Equal(2, len(s.collector.Spans()))
zSpan := s.collector.Spans()[0] // child span is reported first
s.EqualValues(zSpan.ID, sp2.(*Span).context.spanID)
s.Equal(2, len(zSpan.Annotations), "expecting two annotations, cs and cr")
s.Equal(1, len(zSpan.BinaryAnnotations), "expecting one binary annotation sa")
s.NotNil(findAnnotation(zSpan, "cs"), "expecting cs annotation")
s.NotNil(findAnnotation(zSpan, "cr"), "expecting cr annotation")
s.NotNil(findBinaryAnnotation(zSpan, "sa"), "expecting sa annotation")
span := s.collector.Spans()[0] // child span is reported first
s.EqualValues(span.context.spanID, sp2.(*Span).context.spanID)
s.Len(span.tags, 2)
s.EqualValues("client", span.tags[0].value, "span.kind should be client")

mTestutils.AssertCounterMetrics(s.T(), s.metricsFactory,
mTestutils.ExpectedMetric{
Expand All @@ -151,7 +144,7 @@ func (s *reporterSuite) TestTagsAndEvents() {
expected := []string{"long", "ping", "awake", "awake", "one", "two", "three", "bite me",
JaegerClientVersionTagKey, TracerHostnameTagKey,
SamplerParamTagKey, SamplerTypeTagKey,
"lc", "does not compute"}
"does not compute"}
sp.SetTag("long", strings.Repeat("x", 300))
sp.SetTag("ping", "pong")
sp.SetTag("awake", true)
Expand All @@ -164,39 +157,19 @@ func (s *reporterSuite) TestTagsAndEvents() {
sp.Finish()
s.flushReporter()
s.Equal(1, len(s.collector.Spans()))
zSpan := s.collector.Spans()[0]
s.Equal(2, len(zSpan.Annotations), "expecting two annotations for events")
s.Equal(len(expected), len(zSpan.BinaryAnnotations),
"expecting %d binary annotations", len(expected))
binAnnos := []string{}
for _, a := range zSpan.BinaryAnnotations {
binAnnos = append(binAnnos, string(a.Key))
span := s.collector.Spans()[0]
s.Equal(2, len(span.logs), "expecting two logs")
s.Equal(len(expected), len(span.tags),
"expecting %d tags", len(expected))
tags := []string{}
for _, tag := range span.tags {
tags = append(tags, string(tag.key))
}
sort.Strings(expected)
sort.Strings(binAnnos)
s.Equal(expected, binAnnos, "expecting %d binary annotations", len(expected))

s.NotNil(findAnnotation(zSpan, "hello"), "expecting 'hello' annotation: %+v", zSpan.Annotations)

longEvent := false
for _, a := range zSpan.Annotations {
if strings.HasPrefix(a.Value, "long event") {
longEvent = true
s.EqualValues(256, len(a.Value))
}
}
s.True(longEvent, "Must have truncated and saved long event name")

for i := range expected {
s.NotNil(findBinaryAnnotation(zSpan, expected[i]), "expecting annotation '%s'", expected[i])
}
doesNotCompute := findBinaryAnnotation(zSpan, "does not compute")
s.NotNil(doesNotCompute)
doesNotComputeStr := fmt.Sprintf("%+v", sp)
s.Equal(doesNotComputeStr, string(doesNotCompute.Value))
sort.Strings(tags)
s.Equal(expected, tags, "expecting %d tags", len(expected))

longStr := findBinaryAnnotation(zSpan, "long")
s.EqualValues(256, len(longStr.Value), "long tag valur must be truncated")
s.NotNil(findDomainLog(span, "hello"), "expecting 'hello' log: %+v", span.logs)
}

func TestUDPReporter(t *testing.T) {
Expand All @@ -205,18 +178,18 @@ func TestUDPReporter(t *testing.T) {
defer agent.Close()

testRemoteReporter(t,
func(m *Metrics) (transport.Transport, error) {
return udp.NewUDPTransport(agent.SpanServerAddr(), 0)
func(m *Metrics) (Transport, error) {
return NewUDPTransport(agent.SpanServerAddr(), 0)
},
func() []*z.Span {
return agent.GetZipkinSpans()
func() []*j.Batch {
return agent.GetJaegerBatches()
})
}

func testRemoteReporter(
t *testing.T,
factory func(m *Metrics) (transport.Transport, error),
getSpans func() []*z.Span,
factory func(m *Metrics) (Transport, error),
getBatches func() []*j.Batch,
) {
metricsFactory := metrics.NewLocalFactory(0)
metrics := NewMetrics(metricsFactory, nil)
Expand All @@ -239,12 +212,14 @@ func testRemoteReporter(
// however, in case of UDP reporter it's fire and forget, so we need to wait a bit
time.Sleep(5 * time.Millisecond)

spans := getSpans()
require.Equal(t, 1, len(spans))
assert.Equal(t, "leela", spans[0].Name)
sa := findBinaryAnnotation(spans[0], z.SERVER_ADDR)
require.NotNil(t, sa)
assert.Equal(t, "downstream", sa.Host.ServiceName)
batches := getBatches()
require.Equal(t, 1, len(batches))
require.Equal(t, 1, len(batches[0].Spans))
assert.Equal(t, "leela", batches[0].Spans[0].OperationName)
assert.Equal(t, "reporter-test-service", batches[0].Process.ServiceName)
tag := findJaegerTag("peer.service", batches[0].Spans[0].Tags)
assert.NotNil(t, tag)
assert.Equal(t, "downstream", *tag.VStr)

mTestutils.AssertCounterMetrics(t, metricsFactory, []mTestutils.ExpectedMetric{
{Name: "jaeger.reporter-spans", Tags: map[string]string{"state": "success"}, Value: 1},
Expand All @@ -262,11 +237,11 @@ func (s *reporterSuite) TestMemoryReporterReport() {
}

type fakeSender struct {
spans []*z.Span
spans []*Span
mutex sync.Mutex
}

func (s *fakeSender) Append(span *z.Span) (int, error) {
func (s *fakeSender) Append(span *Span) (int, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
s.spans = append(s.spans, span)
Expand All @@ -279,10 +254,28 @@ func (s *fakeSender) Flush() (int, error) {

func (s *fakeSender) Close() error { return nil }

func (s *fakeSender) Spans() []*z.Span {
func (s *fakeSender) Spans() []*Span {
s.mutex.Lock()
defer s.mutex.Unlock()
res := make([]*z.Span, len(s.spans))
res := make([]*Span, len(s.spans))
copy(res, s.spans)
return res
}

func findDomainLog(span *Span, key string) *opentracing.LogRecord {
for _, log := range span.logs {
if log.Fields[0].Value().(string) == key {
return &log
}
}
return nil
}

func findDomainTag(span *Span, key string) *Tag {
for _, tag := range span.tags {
if tag.key == key {
return &tag
}
}
return nil
}
Loading

0 comments on commit fda19be

Please sign in to comment.