Skip to content

Commit

Permalink
nsqadmin: add /nodes/:node
Browse files Browse the repository at this point in the history
  • Loading branch information
mreiferson committed Jul 31, 2015
1 parent edbb465 commit e06eed1
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 0 deletions.
47 changes: 47 additions & 0 deletions nsqadmin/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func NewHTTPServer(ctx *Context) *httpServer {
router.Handle("GET", "/topics/:topic", http_api.Decorate(s.doTopic, log, http_api.V1))
router.Handle("GET", "/topics/:topic/:channel", http_api.Decorate(s.doChannel, log, http_api.V1))
router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))
router.Handle("GET", "/nodes/:node", http_api.Decorate(s.doNode, log, http_api.V1))
router.Handle("POST", "/topics", http_api.Decorate(s.doCreateTopicChannel, log, http_api.V1))
router.Handle("DELETE", "/nodes/:node", http_api.Decorate(s.doTombstoneTopicNode, log, http_api.V1))

Expand Down Expand Up @@ -200,6 +201,52 @@ func (s *httpServer) doTombstoneTopicNode(w http.ResponseWriter, req *http.Reque
return nil, nil
}

func (s *httpServer) doNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
node := ps.ByName("node")

found := false
for _, n := range s.ctx.nsqadmin.opts.NSQDHTTPAddresses {
if node == n {
found = true
break
}
}
producers, _ := s.ci.GetLookupdProducers(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses)
for _, p := range producers {
if node == fmt.Sprintf("%s:%d", p.BroadcastAddress, p.HTTPPort) {
found = true
break
}
}

if !found {
return nil, http_api.Err{404, "NODE_NOT_FOUND"}
}

topicStats, _, _ := s.ci.GetNSQDStats([]string{node}, "")

var totalClients int64
var totalMessages int64
for _, ts := range topicStats {
for _, cs := range ts.Channels {
totalClients += int64(len(cs.Clients))
}
totalMessages += ts.MessageCount
}

return struct {
Node Node `json:"node"`
TopicStats []*clusterinfo.TopicStats `json:"topics"`
TotalMessages int64 `json:"total_messages"`
TotalClients int64 `json:"total_clients"`
}{
Node: Node(node),
TopicStats: topicStats,
TotalMessages: totalMessages,
TotalClients: totalClients,
}, nil
}

func (s *httpServer) doCreateTopicChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
data, err := ioutil.ReadAll(req.Body)
if err != nil {
Expand Down
40 changes: 40 additions & 0 deletions nsqadmin/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,46 @@ func TestHTTPChannelGET(t *testing.T) {
equal(t, len(js.Get("clients").MustArray()), 0)
}

func TestHTTPNodesSingleGET(t *testing.T) {
dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
defer os.RemoveAll(dataPath)
defer nsqds[0].Exit()
defer nsqlookupds[0].Exit()
defer nsqadmin1.Exit()

topicName := "test_nodes_single_get" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqds[0].GetTopic(topicName)
topic.GetChannel("ch")
time.Sleep(100 * time.Millisecond)

client := http.Client{}
url := fmt.Sprintf("http://%s/nodes/%s", nsqadmin1.RealHTTPAddr(),
nsqds[0].RealHTTPAddr().String())
req, _ := http.NewRequest("GET", url, nil)
resp, err := client.Do(req)
equal(t, err, nil)
equal(t, resp.StatusCode, 200)
body, _ := ioutil.ReadAll(resp.Body)
resp.Body.Close()

js, err := simplejson.NewJson(body)
equal(t, err, nil)
equal(t, js.Get("node").MustString(), nsqds[0].RealHTTPAddr().String())
equal(t, len(js.Get("topics").MustArray()), 1)
testTopic := js.Get("topics").GetIndex(0)
equal(t, testTopic.Get("name").MustString(), topicName)
equal(t, testTopic.Get("depth").MustInt(), 0)
equal(t, testTopic.Get("memory_depth").MustInt(), 0)
equal(t, testTopic.Get("backend_depth").MustInt(), 0)
equal(t, testTopic.Get("msg_count").MustInt(), 0)
equal(t, testTopic.Get("paused").MustBool(), false)
equal(t, testTopic.Get("in_flight_count").MustInt(), 0)
equal(t, testTopic.Get("defer_count").MustInt(), 0)
equal(t, testTopic.Get("requeue_count").MustInt(), 0)
equal(t, testTopic.Get("timeout_count").MustInt(), 0)
equal(t, testTopic.Get("msg_count").MustInt(), 0)
}

func TestHTTPCreateTopicPOST(t *testing.T) {
dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
defer os.RemoveAll(dataPath)
Expand Down

0 comments on commit e06eed1

Please sign in to comment.