scheduler.go
package gather

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"time"

	"github.com/influxdata/influxdb/v2"
	"github.com/influxdata/influxdb/v2/kit/tracing"
	"github.com/influxdata/influxdb/v2/nats"
	"go.uber.org/zap"
)

// nats subjects
const (
	MetricsSubject    = "metrics"
	promTargetSubject = "promTarget"
)

// Scheduler is a struct that runs scrape jobs.
type Scheduler struct {
	Targets influxdb.ScraperTargetStoreService
	// Interval is the time between each metrics gathering event.
	Interval time.Duration
	// Timeout is the maximum time duration allowed for each TCP request.
	Timeout time.Duration
	// Publisher will send the gather requests and gathered metrics to the queue.
	Publisher nats.Publisher

	log *zap.Logger

	// gather signals that a gather pass should run.
	gather chan struct{}
}

// NewScheduler creates a new Scheduler and subscriptions for scraper jobs.
func NewScheduler(
	log *zap.Logger,
	numScrapers int,
	targets influxdb.ScraperTargetStoreService,
	p nats.Publisher,
	s nats.Subscriber,
	interval time.Duration,
	timeout time.Duration,
) (*Scheduler, error) {
	// Fall back to defaults when no interval or timeout is configured.
	if interval == 0 {
		interval = 60 * time.Second
	}
	if timeout == 0 {
		timeout = 30 * time.Second
	}

	scheduler := &Scheduler{
		Targets:   targets,
		Interval:  interval,
		Timeout:   timeout,
		Publisher: p,
		log:       log,
		gather:    make(chan struct{}, 100),
	}

	// Start numScrapers subscribers on the scrape-request subject; each
	// handler scrapes targets published to it and republishes the results.
	for i := 0; i < numScrapers; i++ {
		err := s.Subscribe(promTargetSubject, "metrics", &handler{
			Scraper:   newPrometheusScraper(),
			Publisher: p,
			log:       log,
		})
		if err != nil {
			return nil, err
		}
	}

	return scheduler, nil
}

// Run will retrieve scraper targets from the target storage
// and publish them to the nats job queue for gathering.
func (s *Scheduler) Run(ctx context.Context) error {
	go func(s *Scheduler, ctx context.Context) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(s.Interval): // TODO: switch to time.Ticker; time.After allocates a new timer each iteration, creating garbage
				s.gather <- struct{}{}
			}
		}
	}(s, ctx)
	return s.run(ctx)
}
func (s *Scheduler) run(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			return nil
		case <-s.gather:
			s.doGather(ctx)
		}
	}
}
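
// doGather lists all scraper targets within the configured timeout and
// publishes a scrape request for each one.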
func (s *Scheduler) doGather(ctx context.Context) {
	ctx, cancel := context.WithTimeout(ctx, s.Timeout)
	defer cancel()
	span, ctx := tracing.StartSpanFromContext(ctx)
	defer span.Finish()

	targets, err := s.Targets.ListTargets(ctx, influxdb.ScraperTargetFilter{})
	if err != nil {
		s.log.Error("Cannot list targets", zap.Error(err))
		tracing.LogError(span, err)
		return
	}
	for _, target := range targets {
		if err := requestScrape(target, s.Publisher); err != nil {
			s.log.Error("JSON encoding error", zap.Error(err))
			tracing.LogError(span, err)
		}
	}
}
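
// requestScrape JSON-encodes a scraper target and publishes it to the
// nats subject for its scraper type.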
func requestScrape(t influxdb.ScraperTarget, publisher nats.Publisher) error {
	buf := new(bytes.Buffer)
	err := json.NewEncoder(buf).Encode(t)
	if err != nil {
		return err
	}

	switch t.Type {
	case influxdb.PrometheusScraperType:
		return publisher.Publish(promTargetSubject, buf)
	}
	return fmt.Errorf("unsupported target scrape type: %s", t.Type)
}