Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gmp: Skip WAL (passthrough) if no PRW is specified, normal agent mode otherwise. #207

Draft
wants to merge 1 commit into
base: release-2.45.3-gmp
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,6 +771,7 @@ func main() {
// This is passed to ruleManager.Update().
externalURL := cfg.web.ExternalURL.String()

gcmAgentWriteSkipper := &writeSkipperForNoRWConfig{logger: logger, noRWEndpointConfigured: atomic.NewBool(true)}
reloaders := []reloader{
{
name: "db_storage",
Expand All @@ -781,6 +782,15 @@ func main() {
}, {
name: "web_handler",
reloader: webHandler.ApplyConfig,
}, {
// NOTE(bwplotka): GMP forked logic.
name: "gmp_noopfornorwconfig_storage",
reloader: func(cfg *config.Config) error {
if agentMode {
return gcmAgentWriteSkipper.ApplyConfig(cfg)
}
return nil
},
}, {
name: "query_engine",
reloader: func(cfg *config.Config) error {
Expand Down Expand Up @@ -1115,7 +1125,7 @@ func main() {
func() error {
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
reloadReady.Close()
return nil
Expand Down Expand Up @@ -1196,6 +1206,7 @@ func main() {
if agentMode {
// WAL storage.
opts := cfg.agent.ToAgentOptions()
opts.SkipWrite = gcmAgentWriteSkipper.noRWEndpointConfigured
cancel := make(chan struct{})
g.Add(
func() error {
Expand Down Expand Up @@ -1770,3 +1781,22 @@ func deleteStorageData(agentMode bool, dataPath string) error {
}
return nil
}

// writeSkipperForNoRWConfig tracks, across config reloads, whether any
// remote_write endpoint is configured. Its flag is shared with the agent
// WAL storage (via Options.SkipWrite) so that WAL appending can be skipped
// entirely when no remote_write target exists — GCM export alone does not
// need the agent WAL. GMP forked logic.
type writeSkipperForNoRWConfig struct {
	logger log.Logger
	// noRWEndpointConfigured is true when the last applied config had no
	// remote_write entries; read concurrently by the agent storage, hence atomic.
	noRWEndpointConfigured *atomic.Bool
}

// ApplyConfig updates the skipper's shared flag from the freshly loaded
// configuration: the flag is set when conf contains no remote_write entries
// and cleared otherwise. A state transition (and only a transition) is
// logged at info level. Always returns nil.
func (s *writeSkipperForNoRWConfig) ApplyConfig(conf *config.Config) error {
	noRW := len(conf.RemoteWriteConfigs) == 0
	// Swap returns the previous value; if it already matched, nothing changed.
	if s.noRWEndpointConfigured.Swap(noRW) == noRW {
		return nil
	}
	if noRW {
		level.Info(s.logger).Log("msg", "gmp forked logic: disabling agent storage appending given no remote_write was configured; no need to utilize agent WAL.")
		// TODO(bwplotka): Remove left-over from WAL?
	} else {
		level.Info(s.logger).Log("msg", "gmp forked logic: enabling agent storage appending given a new remote_write config entry")
	}
	return nil
}
38 changes: 30 additions & 8 deletions tsdb/agent/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ type Options struct {

// NoLockfile disables creation and consideration of a lock file.
NoLockfile bool

// NOTE: GCM forked logic, controls if we should skip WAL/WBL for GCM only mode.
SkipWrite *atomic.Bool
}

// DefaultOptions used for the WAL storage. They are reasonable for setups using
Expand All @@ -95,6 +98,7 @@ func DefaultOptions() *Options {
MinWALTime: DefaultMinWALTime,
MaxWALTime: DefaultMaxWALTime,
NoLockfile: false,
SkipWrite: atomic.NewBool(false),
}
}

Expand Down Expand Up @@ -298,6 +302,7 @@ func Open(l log.Logger, reg prometheus.Registerer, rs *remote.Storage, dir strin
pendingFloatHistograms: make([]record.RefFloatHistogramSample, 0, 100),
pendingExamplars: make([]record.RefExemplar, 0, 10),
exportExemplars: make(map[storage.SeriesRef]record.RefExemplar, 10),
skipWrite: opts.SkipWrite.Load(),
}
}

Expand Down Expand Up @@ -349,6 +354,9 @@ func validateOptions(opts *Options) *Options {
if t := int64(opts.TruncateFrequency / time.Millisecond); opts.MaxWALTime < t {
opts.MaxWALTime = t
}
if opts.SkipWrite == nil {
opts.SkipWrite = atomic.NewBool(false)
}
return opts
}

Expand Down Expand Up @@ -621,6 +629,8 @@ Loop:
if ts < 0 {
ts = 0
}
// TODO(bwplotka): Debug, remove later.
level.Warn(db.logger).Log("msg", "gmp: truncating", "lowest", db.rs.LowestSentTimestamp(), "minTime", db.opts.MinWALTime, "result", ts)

// Network issues can prevent the result of getRemoteWriteTimestamp from
// changing. We don't want data in the WAL to grow forever, so we set a cap
Expand All @@ -629,8 +639,8 @@ Loop:
if maxTS := timestamp.FromTime(time.Now()) - db.opts.MaxWALTime; ts < maxTS {
ts = maxTS
}

level.Debug(db.logger).Log("msg", "truncating the WAL", "ts", ts)
// TODO(bwplotka): Move back to debug.
level.Warn(db.logger).Log("msg", "truncating the WAL", "ts", ts)
if err := db.truncate(ts); err != nil {
level.Warn(db.logger).Log("msg", "failed to truncate WAL", "err", err)
}
Expand Down Expand Up @@ -800,10 +810,13 @@ type appender struct {
// Series lock is not held on elements.
floatHistogramSeries []*memSeries

// NOTE: GCM forked logic
metadata gcm_export.MetadataFunc

// exemplars to be exported to GCM
// exemplars to be exported to GCM.
exportExemplars map[storage.SeriesRef]record.RefExemplar
// skipWrite ignore writes to WAL/WBL if true. This is to skip appending to
// WAL storage when no PRW is configured -- GCM export does not need that storage.
skipWrite bool
}

func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
Expand Down Expand Up @@ -831,7 +844,9 @@ func (a *appender) Append(ref storage.SeriesRef, l labels.Labels, t int64, v flo
Labels: l,
})

a.metrics.numActiveSeries.Inc()
if !a.skipWrite {
a.metrics.numActiveSeries.Inc()
}
}
}

Expand Down Expand Up @@ -961,7 +976,9 @@ func (a *appender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int
Labels: l,
})

a.metrics.numActiveSeries.Inc()
if !a.skipWrite {
a.metrics.numActiveSeries.Inc()
}
}
}

Expand Down Expand Up @@ -1006,6 +1023,13 @@ func (a *appender) Commit() error {
a.mtx.RLock()
defer a.mtx.RUnlock()

// NOTE: GCM forked logic.
gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)
if a.skipWrite {
return a.Rollback()
}
// ----

var encoder record.Encoder
buf := a.bufPool.Get().([]byte)

Expand Down Expand Up @@ -1069,8 +1093,6 @@ func (a *appender) Commit() error {
}
}

gcm_exportsetup.Global().Export(a.metadata, a.pendingSamples, a.exportExemplars)

//nolint:staticcheck
a.bufPool.Put(buf)
return a.Rollback()
Expand Down
Loading