pub.go (forked from elastic/apm-server)
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package beater

import (
	"context"
	"runtime"
	"sync"
	"time"

	"github.com/pkg/errors"

	"github.com/elastic/apm-agent-go"
	"github.com/elastic/apm-server/transform"
	"github.com/elastic/beats/libbeat/beat"
)

// Publisher forwards batches of events to libbeat. It uses GuaranteedSend
// to enable infinite retry of events being processed.
// If the publisher's input channel is full, an error is returned immediately.
// The number of concurrent requests waiting to be processed depends on the
// configured queue size. As the publisher does not wait for the outputs' ACK,
// the total number of requests (events) active in the system can exceed the
// queue size. Only the number of concurrent HTTP requests trying to publish
// at the same time is limited.
type publisher struct {
	pendingRequests chan pendingReq
	tracer          *elasticapm.Tracer
	client          beat.Client
	m               sync.RWMutex
	stopped         bool
}

// pendingReq bundles the transformables of one request with their shared
// transformation context; trace marks whether processing should be traced.
type pendingReq struct {
	transformables []transform.Transformable
	tcontext       transform.Context
	trace          bool
}

var (
	errFull              = errors.New("Queue is full")
	errInvalidBufferSize = errors.New("Request buffer must be > 0")
	errChannelClosed     = errors.New("Can't send batch, publisher is being stopped")
)

// newPublisher creates a new publisher instance.
// GOMAXPROCS goroutines are started for forwarding events to libbeat.
// Stop must be called to close the beat.Client and free resources.
func newPublisher(pipeline beat.Pipeline, N int, shutdownTimeout time.Duration, tracer *elasticapm.Tracer) (*publisher, error) {
	if N <= 0 {
		return nil, errInvalidBufferSize
	}

	client, err := pipeline.ConnectWith(beat.ClientConfig{
		PublishMode: beat.GuaranteedSend,

		// If set >0, `Close` will block for the duration or until the pipeline is empty.
		WaitClose:         shutdownTimeout,
		SkipNormalization: true,
	})
	if err != nil {
		return nil, err
	}

	p := &publisher{
		tracer: tracer,
		client: client,

		// Set channel size to N - 1. One request will be actively processed by the
		// worker, while the other concurrent requests will be buffered in the queue.
		pendingRequests: make(chan pendingReq, N-1),
	}

	for i := 0; i < runtime.GOMAXPROCS(0); i++ {
		go p.run()
	}

	return p, nil
}

// Stop closes all channels and waits for the worker to stop.
// The worker will drain the queue on shutdown, but no more pending requests
// will be published.
func (p *publisher) Stop() {
	p.m.Lock()
	p.stopped = true
	p.m.Unlock()

	close(p.pendingRequests)
	p.client.Close()
}

// Send tries to forward pendingReq to the publisher's worker. If the queue is full,
// an error is returned.
// Calling Send after Stop will return an error.
func (p *publisher) Send(ctx context.Context, req pendingReq) error {
	p.m.RLock()
	defer p.m.RUnlock()
	if p.stopped {
		return errChannelClosed
	}

	span, ctx := elasticapm.StartSpan(ctx, "Send", "Publisher")
	if span != nil {
		defer span.End()
		req.trace = !span.Dropped()
	}

	select {
	case <-ctx.Done():
		return ctx.Err()
	case p.pendingRequests <- req:
		return nil
	case <-time.After(time.Second * 1): // this forces the go scheduler to try something else for a while
		return errFull
	}
}
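
// sendOrReport is an illustrative sketch, not part of the original file: it shows
// one way a caller might interpret Send's sentinel errors. errFull signals that the
// queue stayed full for a full second (back-pressure), errChannelClosed signals that
// the publisher has been stopped. The helper name and boolean return are hypothetical.
func sendOrReport(ctx context.Context, p *publisher, req pendingReq) (bool, error) {
	err := p.Send(ctx, req)
	switch err {
	case nil:
		return true, nil
	case errFull, errChannelClosed:
		// Surface back-pressure / shutdown to the caller instead of retrying here;
		// delivery retries are already handled downstream via beat.GuaranteedSend.
		return false, err
	default:
		return false, err
	}
}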

// run is the worker loop: it consumes pending requests until the channel is closed.
func (p *publisher) run() {
	for req := range p.pendingRequests {
		p.processPendingReq(req)
	}
}

func (p *publisher) processPendingReq(req pendingReq) {
	var tx *elasticapm.Transaction
	if req.trace {
		tx = p.tracer.StartTransaction("ProcessPending", "Publisher")
		defer tx.End()
	}

	for _, transformable := range req.transformables {
		span := tx.StartSpan("Transform", "Publisher", nil)
		events := transformable.Transform(&req.tcontext)
		span.End()

		span = tx.StartSpan("PublishAll", "Publisher", nil)
		p.client.PublishAll(events)
		span.End()
	}
}
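
// examplePublish is an illustrative sketch, not part of the original file: it shows
// how the publisher is typically wired together from the pieces above. Construct it
// from a beat.Pipeline, forward one batch, and shut down. The buffer size (20) and
// shutdown timeout (5s) are hypothetical values chosen only for the example.
func examplePublish(pipeline beat.Pipeline, tracer *elasticapm.Tracer,
	batch []transform.Transformable, tctx transform.Context) error {

	pub, err := newPublisher(pipeline, 20, 5*time.Second, tracer)
	if err != nil {
		return err
	}
	// Stop drains the queue and closes the beat.Client, blocking up to the
	// configured WaitClose duration.
	defer pub.Stop()

	return pub.Send(context.Background(), pendingReq{
		transformables: batch,
		tcontext:       tctx,
	})
}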