Skip to content

Commit

Permalink
feat: support loki datasources (ccfos#1700)
Browse files Browse the repository at this point in the history
  • Loading branch information
gravese authored Sep 1, 2023
1 parent 10a0b50 commit 616d56d
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 15 deletions.
12 changes: 12 additions & 0 deletions alert/eval/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,18 @@ func (s *Scheduler) syncAlertRules() {
alertRule := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRule.Hash()] = alertRule
}
} else if rule.IsLokiRule() {
datasourceIds := s.promClients.Hit(rule.DatasourceIdsJson)
for _, dsId := range datasourceIds {
if !naming.DatasourceHashRing.IsHit(dsId, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
continue
}

processor := process.NewProcessor(rule, dsId, s.alertRuleCache, s.targetCache, s.busiGroupCache, s.alertMuteCache, s.datasourceCache, s.promClients, s.ctx, s.stats)

alertRuleWorker := NewAlertRuleWorker(rule, dsId, processor, s.promClients, s.ctx)
alertRuleWorkers[alertRuleWorker.Hash()] = alertRuleWorker
}
} else if rule.IsHostRule() && s.ctx.IsCenter {
// all host rule will be processed by center instance
if !naming.DatasourceHashRing.IsHit(naming.HostDatasource, fmt.Sprintf("%d", rule.Id), s.aconf.Heartbeat.Endpoint) {
Expand Down
4 changes: 3 additions & 1 deletion alert/eval/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func (arw *AlertRuleWorker) Start() {
func (arw *AlertRuleWorker) Eval() {
cachedRule := arw.rule
if cachedRule == nil {
//logger.Errorf("rule_eval:%s rule not found", arw.Key())
// logger.Errorf("rule_eval:%s rule not found", arw.Key())
return
}

Expand All @@ -98,6 +98,8 @@ func (arw *AlertRuleWorker) Eval() {
lst = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
case models.HOST:
lst = arw.GetHostAnomalyPoint(cachedRule.RuleConfig)
case models.LOKI:
lst = arw.GetPromAnomalyPoint(cachedRule.RuleConfig)
default:
return
}
Expand Down
6 changes: 6 additions & 0 deletions center/cconf/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,10 @@ var Plugins = []Plugin{
Type: "elasticsearch",
TypeName: "Elasticsearch",
},
{
Id: 2,
Category: "loki",
Type: "loki",
TypeName: "Loki",
},
}
12 changes: 12 additions & 0 deletions center/router/router_datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,18 @@ func DatasourceCheck(ds models.Datasource) error {
}
}

if ds.PluginType == models.LOKI {
subPath := "/api/v1/labels"

fullURL = fmt.Sprintf("%s%s", ds.HTTPJson.Url, subPath)

req, err = http.NewRequest("GET", fullURL, nil)
if err != nil {
logger.Errorf("Error creating request: %v", err)
return fmt.Errorf("request url:%s failed", fullURL)
}
}

if ds.AuthJson.BasicAuthUser != "" {
req.SetBasicAuth(ds.AuthJson.BasicAuthUser, ds.AuthJson.BasicAuthPassword)
}
Expand Down
8 changes: 4 additions & 4 deletions center/router/router_mute.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ func (rt *Router) alertMuteAdd(c *gin.Context) {
ginx.NewRender(c).Message(f.Add(rt.Ctx))
}

//Preview events (alert_cur_event) that match the mute strategy based on the following criteria:
//business group ID (group_id, group_id), product (prod, rule_prod),
//alert event severity (severities, severity), and event tags (tags, tags).
//For products of type not 'host', also consider the category (cate, cate) and datasource ID (datasource_ids, datasource_id).
// Preview events (alert_cur_event) that match the mute strategy based on the following criteria:
// business group ID (group_id, group_id), product (prod, rule_prod),
// alert event severity (severities, severity), and event tags (tags, tags).
// For products of type not 'host', also consider the category (cate, cate) and datasource ID (datasource_ids, datasource_id).
func (rt *Router) alertMutePreview(c *gin.Context) {
//Generally the match of events would be less.

Expand Down
2 changes: 1 addition & 1 deletion center/router/router_notify_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ type form struct {
Email string `json:"email"`
}

//After configuring the aconf.SMTPConfig, users can choose to perform a test. In this test, the function attempts to send an email
// After configuring the aconf.SMTPConfig, users can choose to perform a test. In this test, the function attempts to send an email
func (rt *Router) attemptSendEmail(c *gin.Context) {
var f form
ginx.BindJSON(c, &f)
Expand Down
6 changes: 2 additions & 4 deletions front/statik/statik.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 5 additions & 2 deletions models/alert_rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
const (
METRIC = "metric"
HOST = "host"
LOKI = "loki"

PROMETHEUS = "prometheus"
)
Expand Down Expand Up @@ -420,7 +421,7 @@ func (ar *AlertRule) FillDatasourceIds() error {

func (ar *AlertRule) FillSeverities() error {
if ar.RuleConfig != "" {
if ar.Cate == PROMETHEUS {
if ar.Cate == PROMETHEUS || ar.Cate == LOKI {
var rule PromRuleConfig
if err := json.Unmarshal([]byte(ar.RuleConfig), &rule); err != nil {
return err
Expand Down Expand Up @@ -787,7 +788,9 @@ func AlertRuleStatistics(ctx *ctx.Context) (*Statistics, error) {
func (ar *AlertRule) IsPrometheusRule() bool {
return ar.Prod == METRIC && ar.Cate == PROMETHEUS
}

func (ar *AlertRule) IsLokiRule() bool {
return ar.Prod == LOKI
}
func (ar *AlertRule) IsHostRule() bool {
return ar.Prod == HOST
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/poster/post.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,11 @@ func GetByUrl[T any](url string, cfg conf.CenterApi) (T, error) {
if len(cfg.BasicAuthUser) > 0 {
req.SetBasicAuth(cfg.BasicAuthUser, cfg.BasicAuthPass)
}

if cfg.Timeout < 1 {
cfg.Timeout = 5000
}

client := &http.Client{
Timeout: time.Duration(cfg.Timeout) * time.Millisecond,
}
Expand Down
4 changes: 4 additions & 0 deletions prom/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ func (pc *PromClientMap) loadFromDatabase() {
var err error
if !pc.ctx.IsCenter {
datasources, err = poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.PROMETHEUS)
lokiDatasource, err := poster.GetByUrls[[]*models.Datasource](pc.ctx, "/v1/n9e/datasources?typ="+models.LOKI)
datasources = append(datasources, lokiDatasource...)
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
Expand All @@ -52,6 +54,8 @@ func (pc *PromClientMap) loadFromDatabase() {
}
} else {
datasources, err = models.GetDatasourcesGetsBy(pc.ctx, models.PROMETHEUS, "", "", "")
lokiDatasource, err := models.GetDatasourcesGetsBy(pc.ctx, models.LOKI, "", "", "")
datasources = append(datasources, lokiDatasource...)
if err != nil {
logger.Errorf("failed to get datasources, error: %v", err)
return
Expand Down
2 changes: 1 addition & 1 deletion pushgw/router/router_opentsdb_easyjson.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 616d56d

Please sign in to comment.