Skip to content

Commit

Permalink
add header for writer post
Browse files Browse the repository at this point in the history
  • Loading branch information
UlricQin committed Jun 7, 2022
1 parent c62b9ed commit 3899144
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

NOW = $(shell date -u '+%Y%m%d%I%M%S')

RELEASE_VERSION = 5.9.0
RELEASE_VERSION = 5.9.1

APP = n9e
SERVER_BIN = $(APP)
Expand Down
28 changes: 25 additions & 3 deletions src/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/didi/nightingale/v5/src/pkg/logx"
"github.com/didi/nightingale/v5/src/pkg/ormx"
"github.com/didi/nightingale/v5/src/server/reader"
"github.com/didi/nightingale/v5/src/server/writer"
"github.com/didi/nightingale/v5/src/storage"
)

Expand Down Expand Up @@ -135,12 +134,35 @@ type Config struct {
NoData NoData
Redis storage.RedisConfig
DB ormx.DBConfig
WriterOpt writer.GlobalOpt
Writers []writer.Options
WriterOpt WriterGlobalOpt
Writers []WriterOptions
Reader reader.Options
Ibex Ibex
}

type WriterOptions struct {
Url string
BasicAuthUser string
BasicAuthPass string

Timeout int64
DialTimeout int64
TLSHandshakeTimeout int64
ExpectContinueTimeout int64
IdleConnTimeout int64
KeepAlive int64

MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
}

type WriterGlobalOpt struct {
QueueCount int
QueueMaxSize int
QueuePopSize int
}

type HeartbeatConfig struct {
IP string
Interval int64
Expand Down
43 changes: 11 additions & 32 deletions src/server/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,38 +9,16 @@ import (
"net/http"
"time"

"github.com/didi/nightingale/v5/src/server/config"
"github.com/golang/protobuf/proto"
"github.com/golang/snappy"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/logger"
)

type Options struct {
Url string
BasicAuthUser string
BasicAuthPass string

Timeout int64
DialTimeout int64
TLSHandshakeTimeout int64
ExpectContinueTimeout int64
IdleConnTimeout int64
KeepAlive int64

MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
}

type GlobalOpt struct {
QueueCount int
QueueMaxSize int
QueuePopSize int
}

type WriterType struct {
Opts Options
Opts config.WriterOptions
Client api.Client
}

Expand Down Expand Up @@ -102,7 +80,7 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
}

type WritersType struct {
globalOpt GlobalOpt
globalOpt config.WriterGlobalOpt
backends map[string]WriterType
chans map[int]chan *prompb.TimeSeries
}
Expand All @@ -128,7 +106,7 @@ func (ws *WritersType) PushSample(ident string, v interface{}) {

// StartConsumer every ident channel has a consumer, start it
// @Author: quzhihao
func (ws *WritersType) StartConsumer(ch chan *prompb.TimeSeries) {
func (ws *WritersType) StartConsumer(index int, ch chan *prompb.TimeSeries) {
var (
batch = ws.globalOpt.QueuePopSize
series = make([]*prompb.TimeSeries, 0, batch)
Expand All @@ -143,15 +121,15 @@ func (ws *WritersType) StartConsumer(ch chan *prompb.TimeSeries) {

batchCounter++
if batchCounter >= ws.globalOpt.QueuePopSize {
ws.post(series)
ws.post(index, series)

// reset
batchCounter = 0
series = make([]*prompb.TimeSeries, 0, batch)
}
case <-time.After(time.Second):
if len(series) > 0 {
ws.post(series)
ws.post(index, series)

// reset
batchCounter = 0
Expand All @@ -163,9 +141,10 @@ func (ws *WritersType) StartConsumer(ch chan *prompb.TimeSeries) {

// post post series to TSDB
// @Author: quzhihao
func (ws *WritersType) post(series []*prompb.TimeSeries) {
func (ws *WritersType) post(index int, series []*prompb.TimeSeries) {
header := map[string]string{"hash": fmt.Sprintf("%s-%d", config.C.Heartbeat.Endpoint, index)}
for key := range ws.backends {
go ws.backends[key].Write(series)
go ws.backends[key].Write(series, header)
}
}

Expand All @@ -177,14 +156,14 @@ func NewWriters() WritersType {

var Writers = NewWriters()

func Init(opts []Options, globalOpt GlobalOpt) error {
func Init(opts []config.WriterOptions, globalOpt config.WriterGlobalOpt) error {
Writers.globalOpt = globalOpt
Writers.chans = make(map[int]chan *prompb.TimeSeries)

// init channels
for i := 0; i < globalOpt.QueueCount; i++ {
Writers.chans[i] = make(chan *prompb.TimeSeries, Writers.globalOpt.QueueMaxSize)
go Writers.StartConsumer(Writers.chans[i])
go Writers.StartConsumer(i, Writers.chans[i])
}

for i := 0; i < len(opts); i++ {
Expand Down

0 comments on commit 3899144

Please sign in to comment.