Skip to content

Commit

Permalink
refactor: push series (ccfos#1455)
Browse files Browse the repository at this point in the history
  • Loading branch information
710leo authored Mar 30, 2023
1 parent 50345cb commit 4ad47fb
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 24 deletions.
5 changes: 5 additions & 0 deletions pushgw/pconf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

type Pushgw struct {
BusiGroupLabelKey string
WriteConcurrency int
LabelRewrite bool
ForceUseServerTS bool
DebugSample map[string]string
Expand Down Expand Up @@ -80,6 +81,10 @@ func (p *Pushgw) PreCheck() {
p.WriterOpt.ShardingKey = "ident"
}

if p.WriteConcurrency <= 0 {
p.WriteConcurrency = 5000
}

for _, writer := range p.Writers {
for _, relabel := range writer.WriteRelabels {
if relabel.Regex == "" {
Expand Down
95 changes: 71 additions & 24 deletions pushgw/writer/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ import (
"bytes"
"context"
"fmt"
"hash/crc32"
"net"
"net/http"
"sync"
"time"

"github.com/ccfos/nightingale/v6/pushgw/pconf"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/golang/snappy"
"github.com/prometheus/client_golang/api"
"github.com/prometheus/prometheus/prompb"
"github.com/toolkits/pkg/concurrent/semaphore"
"github.com/toolkits/pkg/logger"
)

Expand All @@ -36,7 +37,8 @@ func (w WriterType) writeRelabel(items []*prompb.TimeSeries) []*prompb.TimeSerie
return ritems
}

func (w WriterType) Write(index int, items []*prompb.TimeSeries, headers ...map[string]string) {
func (w WriterType) Write(items []*prompb.TimeSeries, sema *semaphore.Semaphore, headers ...map[string]string) {
defer sema.Release()
if len(items) == 0 {
return
}
Expand Down Expand Up @@ -121,57 +123,102 @@ func (w WriterType) Post(req []byte, headers ...map[string]string) error {
type WritersType struct {
pushgw pconf.Pushgw
backends map[string]WriterType
queues map[int]*SafeListLimited
queues map[string]*IdentQueue
sema *semaphore.Semaphore
sync.RWMutex
}

type IdentQueue struct {
list *SafeListLimited
closeCh chan struct{}
ts int64
}

func NewWriters(pushgwConfig pconf.Pushgw) *WritersType {
writers := &WritersType{
backends: make(map[string]WriterType),
queues: make(map[string]*IdentQueue),
pushgw: pushgwConfig,
sema: semaphore.NewSemaphore(pushgwConfig.WriteConcurrency),
}

writers.Init()
go writers.CleanExpQueue()
return writers
}

func (ws *WritersType) Put(name string, writer WriterType) {
ws.backends[name] = writer
}

func (ws *WritersType) PushSample(ident string, v interface{}) {
hashkey := crc32.ChecksumIEEE([]byte(ident)) % uint32(ws.pushgw.WriterOpt.QueueCount)
func (ws *WritersType) CleanExpQueue() {
for {
ws.Lock()
for ident := range ws.queues {
identQueue := ws.queues[ident]
if identQueue == nil {
delete(ws.queues, ident)
logger.Warningf("Write channel(%s) not found", ident)
continue
}

c, ok := ws.queues[int(hashkey)]
if ok {
succ := c.PushFront(v)
if !succ {
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, c.Len())
if time.Now().Unix()-identQueue.ts > 3600 {
close(identQueue.closeCh)
delete(ws.queues, ident)
}
}
ws.Unlock()
time.Sleep(time.Second * 600)
}
}

func (ws *WritersType) StartConsumer(index int, ch *SafeListLimited) {
for {
series := ch.PopBack(ws.pushgw.WriterOpt.QueuePopSize)
if len(series) == 0 {
time.Sleep(time.Millisecond * 400)
continue
func (ws *WritersType) PushSample(ident string, v interface{}) {
ws.RLock()
identQueue := ws.queues[ident]
ws.RUnlock()
if identQueue == nil {
identQueue = &IdentQueue{
list: NewSafeListLimited(ws.pushgw.WriterOpt.QueueMaxSize),
closeCh: make(chan struct{}),
ts: time.Now().Unix(),
}

for key := range ws.backends {
go ws.backends[key].Write(index, series)
ws.Lock()
ws.queues[ident] = identQueue
ws.Unlock()

go ws.StartConsumer(identQueue)
}

identQueue.ts = time.Now().Unix()
succ := identQueue.list.PushFront(v)
if !succ {
logger.Warningf("Write channel(%s) full, current channel size: %d", ident, identQueue.list.Len())
}
}

func (ws *WritersType) StartConsumer(identQueue *IdentQueue) {
for {
select {
case <-identQueue.closeCh:
return
default:
ws.sema.Acquire()
series := identQueue.list.PopBack(ws.pushgw.WriterOpt.QueuePopSize)
if len(series) == 0 {
time.Sleep(time.Millisecond * 400)
continue
}
for key := range ws.backends {
ws.sema.Acquire()
go ws.backends[key].Write(series, ws.sema)
}
}
}
}

func (ws *WritersType) Init() error {
opts := ws.pushgw.Writers
ws.queues = make(map[int]*SafeListLimited)

for i := 0; i < ws.pushgw.WriterOpt.QueueCount; i++ {
ws.queues[i] = NewSafeListLimited(ws.pushgw.WriterOpt.QueueMaxSize)
go ws.StartConsumer(i, ws.queues[i])
}

for i := 0; i < len(opts); i++ {
tlsConf, err := opts[i].ClientConfig.TLSConfig()
Expand Down

0 comments on commit 4ad47fb

Please sign in to comment.