Skip to content

Commit edbb465

Browse files
committed
nsqadmin: add POST /nodes/:node (topic tombstoning)
1 parent 61312d2 commit edbb465

File tree

2 files changed

+101
-0
lines changed

2 files changed

+101
-0
lines changed

nsqadmin/http.go

+78
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ func NewHTTPServer(ctx *Context) *httpServer {
7979
router.Handle("GET", "/topics/:topic/:channel", http_api.Decorate(s.doChannel, log, http_api.V1))
8080
router.Handle("GET", "/nodes", http_api.Decorate(s.doNodes, log, http_api.V1))
8181
router.Handle("POST", "/topics", http_api.Decorate(s.doCreateTopicChannel, log, http_api.V1))
82+
router.Handle("DELETE", "/nodes/:node", http_api.Decorate(s.doTombstoneTopicNode, log, http_api.V1))
8283

8384
// deprecated endpoints
8485
router.Handle("GET", "/", http_api.Decorate(s.indexHandler, log))
@@ -177,6 +178,28 @@ func (s *httpServer) doNodes(w http.ResponseWriter, req *http.Request, ps httpro
177178
}{producers}, nil
178179
}
179180

181+
func (s *httpServer) doTombstoneTopicNode(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
182+
node := ps.ByName("node")
183+
184+
data, err := ioutil.ReadAll(req.Body)
185+
if err != nil {
186+
return nil, http_api.Err{400, err.Error()}
187+
}
188+
189+
js, err := simplejson.NewJson(data)
190+
if err != nil {
191+
return nil, http_api.Err{400, err.Error()}
192+
}
193+
194+
topicName := js.Get("topic").MustString()
195+
196+
err = s.tombstoneTopicNode(req, topicName, node)
197+
if err != nil {
198+
return nil, http_api.Err{500, err.Error()}
199+
}
200+
return nil, nil
201+
}
202+
180203
func (s *httpServer) doCreateTopicChannel(w http.ResponseWriter, req *http.Request, ps httprouter.Params) (interface{}, error) {
181204
data, err := ioutil.ReadAll(req.Body)
182205
if err != nil {
@@ -266,6 +289,61 @@ func (s *httpServer) createTopicChannel(req *http.Request, topicName string, cha
266289
return nil
267290
}
268291

292+
func (s *httpServer) tombstoneTopicNode(req *http.Request, topicName string, node string) error {
293+
if !protocol.IsValidTopicName(topicName) {
294+
return errors.New("INVALID_TOPIC")
295+
}
296+
297+
if node == "" {
298+
return errors.New("INVALID_NODE")
299+
}
300+
301+
// tombstone the topic on all the lookupds
302+
for _, addr := range s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses {
303+
nsqlookupdVersion, err := s.ci.GetVersion(addr)
304+
if err != nil {
305+
s.ctx.nsqadmin.logf("ERROR: failed to get nsqlookupd %s version - %s", addr, err)
306+
}
307+
308+
uri := "tombstone_topic_producer"
309+
if !nsqlookupdVersion.LT(v1EndpointVersion) {
310+
uri = "topic/tombstone"
311+
}
312+
313+
endpoint := fmt.Sprintf("http://%s/%s?topic=%s&node=%s",
314+
addr, uri,
315+
url.QueryEscape(topicName), url.QueryEscape(node))
316+
s.ctx.nsqadmin.logf("LOOKUPD: querying %s", endpoint)
317+
_, err = http_api.NegotiateV1("POST", endpoint, nil)
318+
if err != nil {
319+
s.ctx.nsqadmin.logf("ERROR: lookupd %s - %s", endpoint, err)
320+
}
321+
}
322+
323+
nsqdVersion, err := s.ci.GetVersion(node)
324+
if err != nil {
325+
s.ctx.nsqadmin.logf("ERROR: failed to get nsqd %s version - %s", node, err)
326+
}
327+
328+
uri := "delete_topic"
329+
if !nsqdVersion.LT(v1EndpointVersion) {
330+
uri = "topic/delete"
331+
}
332+
333+
// delete the topic on the producer
334+
endpoint := fmt.Sprintf("http://%s/%s?topic=%s", node,
335+
uri, url.QueryEscape(topicName))
336+
s.ctx.nsqadmin.logf("NSQD: querying %s", endpoint)
337+
_, err = http_api.NegotiateV1("POST", endpoint, nil)
338+
if err != nil {
339+
s.ctx.nsqadmin.logf("ERROR: nsqd %s - %s", endpoint, err)
340+
}
341+
342+
s.notifyAdminAction("tombstone_topic_producer", topicName, "", node, req)
343+
344+
return nil
345+
}
346+
269347
func (s *httpServer) getProducers(topicName string) []string {
270348
var producers []string
271349
if len(s.ctx.nsqadmin.opts.NSQLookupdHTTPAddresses) != 0 {

nsqadmin/http_test.go

+23
Original file line numberDiff line numberDiff line change
@@ -275,3 +275,26 @@ func TestHTTPCreateTopicChannelPOST(t *testing.T) {
275275
equal(t, resp.StatusCode, 200)
276276
resp.Body.Close()
277277
}
278+
279+
func TestHTTPTombstoneTopicNodePOST(t *testing.T) {
280+
dataPath, nsqds, nsqlookupds, nsqadmin1 := bootstrapNSQCluster(t)
281+
defer os.RemoveAll(dataPath)
282+
defer nsqds[0].Exit()
283+
defer nsqlookupds[0].Exit()
284+
defer nsqadmin1.Exit()
285+
286+
topicName := "test_tombstone_topic_node_post" + strconv.Itoa(int(time.Now().Unix()))
287+
nsqds[0].GetTopic(topicName)
288+
time.Sleep(100 * time.Millisecond)
289+
290+
client := http.Client{}
291+
url := fmt.Sprintf("http://%s/nodes/%s", nsqadmin1.RealHTTPAddr(), nsqds[0].RealHTTPAddr())
292+
body, _ := json.Marshal(map[string]interface{}{
293+
"topic": topicName,
294+
})
295+
req, _ := http.NewRequest("DELETE", url, bytes.NewBuffer(body))
296+
resp, err := client.Do(req)
297+
equal(t, err, nil)
298+
equal(t, resp.StatusCode, 200)
299+
resp.Body.Close()
300+
}

0 commit comments

Comments
 (0)