Skip to content

Commit

Permalink
use interface for server info
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jun 2, 2017
1 parent c187664 commit 53127e8
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 62 deletions.
20 changes: 10 additions & 10 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/tick/ast"
"github.com/influxdata/kapacitor/tick/stateful"
"github.com/influxdata/kapacitor/vars"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -74,20 +73,13 @@ type AlertNode struct {

levelResets []stateful.Expression
lrScopePools []stateful.ScopePool

serverInfo serverInfo
}

// Create a new AlertNode which caches the most recent item and exposes it over the HTTP API.
func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an *AlertNode, err error) {
an = &AlertNode{
node: node{Node: n, et: et, logger: l},
a: n,
serverInfo: serverInfo{
Hostname: vars.HostVar.StringValue(),
ClusterID: vars.ClusterIDVar.StringValue(),
ServerID: vars.ServerIDVar.StringValue(),
},
}
an.node.runF = an.runAlert

Expand Down Expand Up @@ -1031,6 +1023,14 @@ type detailsInfo struct {
Message string
}

func (a *AlertNode) serverInfo() serverInfo {
return serverInfo{
Hostname: a.et.tm.ServerInfo.Hostname(),
ClusterID: a.et.tm.ServerInfo.ClusterID().String(),
ServerID: a.et.tm.ServerInfo.ServerID().String(),
}

}
func (a *AlertNode) renderID(name string, group models.GroupID, tags models.Tags) (string, error) {
g := string(group)
if group == models.NilGroup {
Expand All @@ -1041,7 +1041,7 @@ func (a *AlertNode) renderID(name string, group models.GroupID, tags models.Tags
TaskName: a.et.Task.ID,
Group: g,
Tags: tags,
ServerInfo: a.serverInfo,
ServerInfo: a.serverInfo(),
}
id := a.bufPool.Get().(*bytes.Buffer)
defer func() {
Expand All @@ -1067,7 +1067,7 @@ func (a *AlertNode) renderMessageAndDetails(id, name string, t time.Time, group
TaskName: a.et.Task.ID,
Group: g,
Tags: tags,
ServerInfo: a.serverInfo,
ServerInfo: a.serverInfo(),
},
ID: id,
Fields: fields,
Expand Down
2 changes: 1 addition & 1 deletion alert/topics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"sync"

"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/vars"
"github.com/influxdata/kapacitor/server/vars"
)

const (
Expand Down
2 changes: 1 addition & 1 deletion edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/vars"
"github.com/influxdata/kapacitor/server/vars"
)

const (
Expand Down
4 changes: 2 additions & 2 deletions integrations/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
func TestBatch_InvalidQuery(t *testing.T) {

// Create a new execution env
tm := kapacitor.NewTaskMaster("invalidQuery", logService)
tm := kapacitor.NewTaskMaster("invalidQuery", newServerInfo(), logService)
tm.HTTPDService = newHTTPDService()
tm.TaskStore = taskStore{}
tm.DeadmanService = deadman{}
Expand Down Expand Up @@ -2976,7 +2976,7 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex
}

// Create a new execution env
tm := kapacitor.NewTaskMaster("testBatcher", logService)
tm := kapacitor.NewTaskMaster("testBatcher", newServerInfo(), logService)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
Expand Down
2 changes: 1 addition & 1 deletion integrations/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func Bench(b *testing.B, tasksCount, pointCount, expectedProcessedCount int, tic
for i := 0; i < b.N; i++ {
// Do not time setup
b.StopTimer()
tm := kapacitor.NewTaskMaster("bench", loggingtest.New())
tm := kapacitor.NewTaskMaster("bench", newServerInfo(), loggingtest.New())
tm.HTTPDService = httpdService
tm.UDFService = nil
tm.TaskStore = taskStore{}
Expand Down
65 changes: 65 additions & 0 deletions integrations/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/influxdata/kapacitor/services/httpd"
k8s "github.com/influxdata/kapacitor/services/k8s/client"
"github.com/influxdata/kapacitor/udf"
"github.com/influxdata/kapacitor/uuid"
)

func newHTTPDService() *httpd.Service {
Expand Down Expand Up @@ -202,3 +203,67 @@ func (k k8sScales) Get(kind, name string) (*k8s.Scale, error) {
func (k k8sScales) Update(kind string, scale *k8s.Scale) error {
return k.ScalesUpdateFunc(kind, scale)
}

type serverInfo struct {
clusterID,
serverID uuid.UUID

hostname,
version,
product string

numTasks,
numEnabledTasks,
numSubscriptions int64

uptime func() time.Duration
}

func newServerInfo() serverInfo {
return serverInfo{
clusterID: uuid.New(),
serverID: uuid.New(),
hostname: "localhost",
version: "test",
product: "kapacitor",
}
}

func (i serverInfo) ClusterID() uuid.UUID {
return i.clusterID
}

func (i serverInfo) ServerID() uuid.UUID {
return i.serverID
}

func (i serverInfo) Hostname() string {
return i.hostname
}

func (i serverInfo) Version() string {
return i.version
}

func (i serverInfo) Product() string {
return i.product
}

func (i serverInfo) NumTasks() int64 {
return i.numTasks
}

func (i serverInfo) NumEnabledTasks() int64 {
return i.numEnabledTasks
}

func (i serverInfo) NumSubscriptions() int64 {
return i.numSubscriptions
}

func (i serverInfo) Uptime() time.Duration {
if i.uptime != nil {
return i.uptime()
}
return 0
}
2 changes: 1 addition & 1 deletion integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10766,7 +10766,7 @@ func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface
}

func createTaskMaster() (*kapacitor.TaskMaster, error) {
tm := kapacitor.NewTaskMaster("testStreamer", logService)
tm := kapacitor.NewTaskMaster("testStreamer", newServerInfo(), logService)
httpdService := newHTTPDService()
tm.HTTPDService = httpdService
tm.TaskStore = taskStore{}
Expand Down
2 changes: 1 addition & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
kexpvar "github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/server/vars"
"github.com/influxdata/kapacitor/timer"
"github.com/influxdata/kapacitor/vars"
"github.com/pkg/errors"
)

Expand Down
18 changes: 4 additions & 14 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/influxdata/kapacitor/auth"
"github.com/influxdata/kapacitor/command"
iclient "github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/server/vars"
"github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/azure"
Expand Down Expand Up @@ -62,7 +63,6 @@ import (
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/services/victorops"
"github.com/influxdata/kapacitor/uuid"
"github.com/influxdata/kapacitor/vars"
"github.com/pkg/errors"
)

Expand Down Expand Up @@ -171,7 +171,7 @@ func New(c *Config, buildInfo BuildInfo, logService logging.Interface) (*Server,

// Start Task Master
s.TaskMasterLookup = kapacitor.NewTaskMasterLookup()
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, logService)
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, vars.Info, logService)
s.TaskMaster.DefaultRetentionPolicy = c.DefaultRetentionPolicy
s.TaskMaster.Commander = s.Commander
s.TaskMasterLookup.Set(s.TaskMaster)
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *Server) appendInfluxDBService() error {
if err != nil {
return errors.Wrap(err, "failed to get http port")
}
srv, err := influxdb.NewService(c, httpPort, s.config.Hostname, varsIDer{}, s.config.HTTP.AuthEnabled, l)
srv, err := influxdb.NewService(c, httpPort, s.config.Hostname, vars.Info, s.config.HTTP.AuthEnabled, l)
if err != nil {
return err
}
Expand Down Expand Up @@ -673,7 +673,7 @@ func (s *Server) appendReportingService() {
c := s.config.Reporting
if c.Enabled {
l := s.LogService.NewLogger("[reporting] ", log.LstdFlags)
srv := reporting.NewService(c, l)
srv := reporting.NewService(c, vars.Info, l)

s.AppendService("reporting", srv)
}
Expand Down Expand Up @@ -1037,13 +1037,3 @@ func (qe *Queryexecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db strin
func (qe *Queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) {
return nil, errors.New("cannot execute queries against Kapacitor")
}

type varsIDer struct {
}

func (v varsIDer) ClusterID() uuid.UUID {
return vars.ClusterIDVar.UUIDValue()
}
func (v varsIDer) ServerID() uuid.UUID {
return vars.ServerIDVar.UUIDValue()
}
2 changes: 1 addition & 1 deletion vars/stats.go → server/vars/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func GetStatsData() ([]StatsData, error) {
})

// Add uptime to globalData
globalData.Values[UptimeVarName] = Uptime().Seconds()
globalData.Values[UptimeVarName] = uptime().Seconds()

// Add Go runtime stats.
data := StatsData{
Expand Down
48 changes: 47 additions & 1 deletion vars/vars.go → server/vars/vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

kexpvar "github.com/influxdata/kapacitor/expvar"
"github.com/influxdata/kapacitor/uuid"
)

const (
Expand Down Expand Up @@ -56,6 +57,51 @@ func init() {
expvar.Publish(VersionVarName, VersionVar)
}

func Uptime() time.Duration {
func uptime() time.Duration {
return time.Since(startTime)
}

type Infoer interface {
ClusterID() uuid.UUID
ServerID() uuid.UUID
Hostname() string
Version() string
Product() string
NumTasks() int64
NumEnabledTasks() int64
NumSubscriptions() int64
Uptime() time.Duration
}

var Info = info{}

type info struct{}

func (info) ClusterID() uuid.UUID {
return ClusterIDVar.UUIDValue()
}
func (info) ServerID() uuid.UUID {
return ServerIDVar.UUIDValue()
}
func (info) Hostname() string {
return HostVar.StringValue()
}
func (info) Version() string {
return VersionVar.StringValue()
}
func (info) Product() string {
return ProductVar.StringValue()
}

func (info) NumTasks() int64 {
return NumTasksVar.IntValue()
}
func (info) NumEnabledTasks() int64 {
return NumEnabledTasksVar.IntValue()
}
func (info) NumSubscriptions() int64 {
return NumSubscriptionsVar.IntValue()
}
func (info) Uptime() time.Duration {
return uptime()
}
2 changes: 1 addition & 1 deletion services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor/influxdb"
"github.com/influxdata/kapacitor/server/vars"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/udp"
"github.com/influxdata/kapacitor/uuid"
"github.com/influxdata/kapacitor/vars"
"github.com/pkg/errors"
)

Expand Down
Loading

0 comments on commit 53127e8

Please sign in to comment.