Skip to content

Commit

Permalink
remove ProcessContinousQueries from httpd endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
corylanou committed Oct 20, 2016
1 parent 6fd74a6 commit 5b72b87
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 67 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
- [#7161](https://github.com/influxdata/influxdb/issues/7161): Drop measurement causes cache max memory exceeded error.
- [#7334](https://github.com/influxdata/influxdb/issues/7334): Panic with unread show series iterators during drop database
- [#7482](https://github.com/influxdata/influxdb/issues/7482): Fix issue where point would be written to wrong shard.
- [#7431](https://github.com/influxdata/influxdb/issues/7431): Remove /data/process_continuous_queries endpoint.

## v1.0.2 [2016-10-05]

Expand Down
7 changes: 0 additions & 7 deletions cmd/influxd/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,13 +267,6 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
srv.Handler.PointsWriter = s.PointsWriter
srv.Handler.Version = s.buildInfo.Version

// If a ContinuousQuerier service has been started, attach it.
for _, srvc := range s.Services {
if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
srv.Handler.ContinuousQuerier = cqsrvc
}
}

s.Services = append(s.Services, srv)
}

Expand Down
9 changes: 0 additions & 9 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6268,15 +6268,6 @@ func TestServer_ContinuousQuery(t *testing.T) {
// Run first test to create CQs.
runTest(&test, t)

// Trigger CQs to run.
u := fmt.Sprintf("%s/data/process_continuous_queries?time=%d", s.URL(), interval0.UnixNano())
if _, err := s.HTTPPost(u, nil); err != nil {
t.Fatal(err)
}

// Wait for CQs to run. TODO: fix this ugly hack
//time.Sleep(time.Second * 5)

// Setup tests to check the CQ results.
test2 := NewTest("db0", "rp1")
test2.addQueries([]*Query{
Expand Down
50 changes: 0 additions & 50 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/monitor"
"github.com/influxdata/influxdb/services/continuous_querier"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
"github.com/influxdata/influxdb/uuid"
Expand Down Expand Up @@ -89,8 +88,6 @@ type Handler struct {
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}

ContinuousQuerier continuous_querier.ContinuousQuerier

Config *Config
Logger *log.Logger
CLFLogger *log.Logger
Expand Down Expand Up @@ -144,11 +141,6 @@ func NewHandler(c Config) *Handler {
"status-head",
"HEAD", "/status", false, true, h.serveStatus,
},
// TODO: (corylanou) remove this and associated code
Route{ // Tell data node to run CQs that should be run
"process-continuous-queries",
"POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
},
}...)

return h
Expand Down Expand Up @@ -184,7 +176,6 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
Tags: tags,
Values: map[string]interface{}{
statRequest: atomic.LoadInt64(&h.stats.Requests),
statCQRequest: atomic.LoadInt64(&h.stats.CQRequests),
statQueryRequest: atomic.LoadInt64(&h.stats.QueryRequests),
statWriteRequest: atomic.LoadInt64(&h.stats.WriteRequests),
statPingRequest: atomic.LoadInt64(&h.stats.PingRequests),
Expand Down Expand Up @@ -279,47 +270,6 @@ func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
w.WriteHeader(code)
}

func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
atomic.AddInt64(&h.stats.CQRequests, 1)

// If the continuous query service isn't configured, return 404.
if h.ContinuousQuerier == nil {
h.writeHeader(w, http.StatusNotImplemented)
return
}

q := r.URL.Query()

// Get the database name (blank means all databases).
db := q.Get("db")
// Get the name of the CQ to run (blank means run all).
name := q.Get("name")
// Get the time for which the CQ should be evaluated.
t := time.Now()
var err error
s := q.Get("time")
if s != "" {
t, err = time.Parse(time.RFC3339Nano, s)
if err != nil {
// Try parsing as an int64 nanosecond timestamp.
i, err := strconv.ParseInt(s, 10, 64)
if err != nil {
h.writeHeader(w, http.StatusBadRequest)
return
}
t = time.Unix(0, i)
}
}

// Pass the request to the CQ service.
if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
h.writeHeader(w, http.StatusBadRequest)
return
}

h.writeHeader(w, http.StatusNoContent)
}

// serveQuery parses an incoming query and, if valid, executes the query.
func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
atomic.AddInt64(&h.stats.QueryRequests, 1)
Expand Down
1 change: 0 additions & 1 deletion services/httpd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
// statistics gathered by the httpd package.
const (
statRequest = "req" // Number of HTTP requests served
statCQRequest = "cqReq" // Number of CQ-execute requests served
statQueryRequest = "queryReq" // Number of query requests served
statWriteRequest = "writeReq" // Number of write requests serverd
statPingRequest = "pingReq" // Number of ping requests served
Expand Down

0 comments on commit 5b72b87

Please sign in to comment.