Skip to content

Commit

Permalink
watch: support checks watch
Browse files Browse the repository at this point in the history
  • Loading branch information
armon committed Aug 20, 2014
1 parent f3c8873 commit 5aefdf0
Show file tree
Hide file tree
Showing 2 changed files with 164 additions and 1 deletion.
37 changes: 36 additions & 1 deletion watch/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {
"services": servicesWatch,
"nodes": nodesWatch,
"service": serviceWatch,
"checks": nil,
"checks": checksWatch,
}
}

Expand Down Expand Up @@ -137,3 +137,38 @@ func serviceWatch(params map[string][]string) (WatchFunc, error) {
}
return fn, nil
}

// checksWatch is used to watch a specific checks in a given state
func checksWatch(params map[string][]string) (WatchFunc, error) {
var service, state string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
if service == "" && state == "" {
state = "any"
}

fn := func(p *WatchPlan) (uint64, interface{}, error) {
health := p.client.Health()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {
checks, meta, err = health.Checks(service, &opts)
}
if err != nil {
return 0, nil, err
}
return meta.LastIndex, checks, err
}
return fn, nil
}
128 changes: 128 additions & 0 deletions watch/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,131 @@ func TestServiceWatch(t *testing.T) {
t.Fatalf("bad: %v", invoke)
}
}

func TestChecksWatch_State(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, "type:checks state:warning")
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.HealthCheck)
if len(v) == 0 {
return
}
if !ok || v[0].CheckID != "foobar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}

go func() {
time.Sleep(20 * time.Millisecond)

catalog := plan.client.Catalog()
reg := &consulapi.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Check: &consulapi.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: "warning",
},
}
catalog.Register(reg, nil)

time.Sleep(20 * time.Millisecond)
plan.Stop()

dereg := &consulapi.CatalogDeregistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Deregister(dereg, nil)
}()

err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}

if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}

func TestChecksWatch_Service(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, "type:checks service:foobar")
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.([]*consulapi.HealthCheck)
if len(v) == 0 {
return
}
if !ok || v[0].CheckID != "foobar" {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}

go func() {
time.Sleep(20 * time.Millisecond)

catalog := plan.client.Catalog()
reg := &consulapi.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &consulapi.AgentService{
ID: "foobar",
Service: "foobar",
},
Check: &consulapi.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: "passing",
ServiceID: "foobar",
},
}
_, err := catalog.Register(reg, nil)
if err != nil {
t.Fatalf("err: %v", err)
}

time.Sleep(20 * time.Millisecond)
plan.Stop()

dereg := &consulapi.CatalogDeregistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
}
catalog.Deregister(dereg, nil)
}()

err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}

if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}

0 comments on commit 5aefdf0

Please sign in to comment.