Skip to content

Commit

Permalink
Add support for continuous queries.
Browse files Browse the repository at this point in the history
  • Loading branch information
toddboom authored and jvshahid committed Jan 17, 2014
1 parent 1ae3320 commit 889b218
Show file tree
Hide file tree
Showing 20 changed files with 1,065 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile.in
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ ifneq ($(only),)
GOTEST_OPTS = -gocheck.f $(only)
endif
ifneq ($(verbose),off)
GOTEST_OPTS += -v -gocheck.v
GOTEST_OPTS += -v -gocheck.v -gocheck.vv
endif

test: test_dependencies parser
Expand Down
55 changes: 55 additions & 0 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"path/filepath"
"protocol"
"regexp"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -99,6 +100,11 @@ func (self *HttpServer) Serve(listener net.Listener) {
self.registerEndpoint(p, "del", "/db/:db/users/:user", self.deleteDbUser)
self.registerEndpoint(p, "post", "/db/:db/users/:user", self.updateDbUser)

// continuous queries management interface
self.registerEndpoint(p, "get", "/db/:db/continuous_queries", self.listDbContinuousQueries)
self.registerEndpoint(p, "post", "/db/:db/continuous_queries", self.createDbContinuousQueries)
self.registerEndpoint(p, "del", "/db/:db/continuous_queries/:id", self.deleteDbContinuousQueries)

// healthcheck
self.registerEndpoint(p, "get", "/ping", self.ping)

Expand Down Expand Up @@ -622,6 +628,10 @@ type User struct {
Name string `json:"username"`
}

type NewContinuousQuery struct {
Query string `json:"query"`
}

func (self *HttpServer) listClusterAdmins(w libhttp.ResponseWriter, r *libhttp.Request) {
self.tryAsClusterAdmin(w, r, func(u common.User) (int, interface{}) {
names, err := self.userManager.ListClusterAdmins(u)
Expand Down Expand Up @@ -906,3 +916,48 @@ func (self *HttpServer) listInterfaces(w libhttp.ResponseWriter, r *libhttp.Requ
w.Write(body)
}
}

func (self *HttpServer) listDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
queries, err := self.coordinator.ListContinuousQueries(u, db)
if err != nil {
return errorToStatusCode(err), err.Error()
}

return libhttp.StatusOK, queries
})
}

func (self *HttpServer) createDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
var values interface{}
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return libhttp.StatusInternalServerError, err.Error()
}
json.Unmarshal(body, &values)
query := values.(map[string]interface{})["query"].(string)
fmt.Println(query)

if err := self.coordinator.CreateContinuousQuery(u, db, query); err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusOK, nil
})
}

func (self *HttpServer) deleteDbContinuousQueries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")
id, _ := strconv.ParseInt(r.URL.Query().Get(":id"), 10, 64)

self.tryAsDbUserAndClusterAdmin(w, r, func(u common.User) (int, interface{}) {
if err := self.coordinator.DeleteContinuousQuery(u, db, uint32(id)); err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusOK, nil
})
}
137 changes: 132 additions & 5 deletions src/api/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl

type MockCoordinator struct {
coordinator.Coordinator
series []*protocol.Series
deleteQueries []*parser.DeleteQuery
db string
droppedDb string
series []*protocol.Series
continuousQueries map[string][]*coordinator.ContinuousQuery
deleteQueries []*parser.DeleteQuery
db string
droppedDb string
}

func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error {
Expand All @@ -127,14 +128,56 @@ func (self *MockCoordinator) DropDatabase(_ common.User, db string) error {
return nil
}

func (self *MockCoordinator) ListContinuousQueries(_ common.User, db string) ([]*protocol.Series, error) {
points := []*protocol.Point{}

for _, query := range self.continuousQueries[db] {
queryId := int64(query.Id)
queryString := query.Query
points = append(points, &protocol.Point{
Values: []*protocol.FieldValue{
&protocol.FieldValue{Int64Value: &queryId},
&protocol.FieldValue{StringValue: &queryString},
},
Timestamp: nil,
SequenceNumber: nil,
})
}

seriesName := "continuous queries"
series := []*protocol.Series{&protocol.Series{
Name: &seriesName,
Fields: []string{"id", "query"},
Points: points,
}}
return series, nil
}

func (self *MockCoordinator) CreateContinuousQuery(_ common.User, db string, query string) error {
self.continuousQueries[db] = append(self.continuousQueries[db], &coordinator.ContinuousQuery{2, query})
return nil
}

func (self *MockCoordinator) DeleteContinuousQuery(_ common.User, db string, id uint32) error {
length := len(self.continuousQueries[db])
_, self.continuousQueries[db] = self.continuousQueries[db][length-1], self.continuousQueries[db][:length-1]
return nil
}

func (self *ApiSuite) formatUrl(path string, args ...interface{}) string {
path = fmt.Sprintf(path, args...)
port := self.listener.Addr().(*net.TCPAddr).Port
return fmt.Sprintf("http://localhost:%d%s", port, path)
}

func (self *ApiSuite) SetUpSuite(c *C) {
self.coordinator = &MockCoordinator{}
self.coordinator = &MockCoordinator{
continuousQueries: map[string][]*coordinator.ContinuousQuery{
"db1": []*coordinator.ContinuousQuery{
&coordinator.ContinuousQuery{1, "select * from foo into bar;"},
},
},
}
self.manager = &MockUserManager{
clusterAdmins: []string{"root"},
dbUsers: map[string][]string{"db1": []string{"db_user1"}},
Expand Down Expand Up @@ -733,3 +776,87 @@ func (self *ApiSuite) TestBasicAuthentication(c *C) {
c.Assert(err, IsNil)
c.Assert(users, DeepEquals, []*coordinator.Database{&coordinator.Database{"db1", 1}, &coordinator.Database{"db2", 1}})
}

func (self *ApiSuite) TestContinuousQueryOperations(c *C) {
// verify current continuous query index
url := self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err := libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series := []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")

resp.Body.Close()

// add a new continuous query
data := `{"query": "select * from quu into qux;"}`
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Post(url, "application/json", bytes.NewBufferString(data))
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
resp.Body.Close()

// verify updated continuous query index
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series = []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)

c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 2)
c.Assert(series[0].Points[0].Values, HasLen, 2)
c.Assert(series[0].Points[1].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")
c.Assert(*series[0].Points[1].Values[0].Int64Value, Equals, int64(2))
c.Assert(*series[0].Points[1].Values[1].StringValue, Equals, "select * from quu into qux;")

resp.Body.Close()

// delete the newly-created query
url = self.formatUrl("/db/db1/continuous_queries/2?u=root&p=root")
req, err := libhttp.NewRequest("DELETE", url, nil)
c.Assert(err, IsNil)
resp, err = libhttp.DefaultClient.Do(req)
c.Assert(err, IsNil)
_, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(resp.StatusCode, Equals, libhttp.StatusOK)
resp.Body.Close()

// verify updated continuous query index
url = self.formatUrl("/db/db1/continuous_queries?u=root&p=root")
resp, err = libhttp.Get(url)
c.Assert(err, IsNil)
c.Assert(resp.Header.Get("content-type"), Equals, "application/json")
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
series = []*protocol.Series{}
err = json.Unmarshal(body, &series)
c.Assert(err, IsNil)
c.Assert(series, HasLen, 1)
c.Assert(series[0].Points, HasLen, 1)
c.Assert(series[0].Points[0].Values, HasLen, 2)

c.Assert(*series[0].Name, Equals, "continuous queries")
c.Assert(*series[0].Points[0].Values[0].Int64Value, Equals, int64(1))
c.Assert(*series[0].Points[0].Values[1].StringValue, Equals, "select * from foo into bar;")
resp.Body.Close()
}
81 changes: 81 additions & 0 deletions src/coordinator/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ import (
"encoding/gob"
"errors"
"fmt"
"parser"
"sync"
"sync/atomic"
"time"
)

/*
Expand All @@ -29,13 +31,22 @@ type ClusterConfiguration struct {
dbUsers map[string]map[string]*dbUser
servers []*ClusterServer
serversLock sync.RWMutex
continuousQueries map[string][]*ContinuousQuery
continuousQueriesLock sync.RWMutex
parsedContinuousQueries map[string]map[uint32]*parser.SelectQuery
continuousQueryTimestamp time.Time
hasRunningServers bool
localServerId uint32
ClusterVersion uint32
config *configuration.Configuration
addedLocalServerWait chan bool
}

type ContinuousQuery struct {
Id uint32
Query string
}

type Database struct {
Name string `json:"name"`
ReplicationFactor uint8 `json:"replicationFactor"`
Expand All @@ -46,6 +57,8 @@ func NewClusterConfiguration(config *configuration.Configuration) *ClusterConfig
databaseReplicationFactors: make(map[string]uint8),
clusterAdmins: make(map[string]*clusterAdmin),
dbUsers: make(map[string]map[string]*dbUser),
continuousQueries: make(map[string][]*ContinuousQuery),
parsedContinuousQueries: make(map[string]map[uint32]*parser.SelectQuery),
servers: make([]*ClusterServer, 0),
config: config,
addedLocalServerWait: make(chan bool, 1),
Expand Down Expand Up @@ -336,6 +349,74 @@ func (self *ClusterConfiguration) DropDatabase(name string) error {
return nil
}

func (self *ClusterConfiguration) CreateContinuousQuery(db string, query string) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

if self.continuousQueries == nil {
self.continuousQueries = map[string][]*ContinuousQuery{}
}

if self.parsedContinuousQueries == nil {
self.parsedContinuousQueries = map[string]map[uint32]*parser.SelectQuery{}
}

maxId := uint32(0)
for _, query := range self.continuousQueries[db] {
if query.Id > maxId {
maxId = query.Id
}
}

selectQuery, err := parser.ParseSelectQuery(query)
if err != nil {
return fmt.Errorf("Failed to parse continuous query: %s", query)
}

queryId := maxId + 1
if self.parsedContinuousQueries[db] == nil {
self.parsedContinuousQueries[db] = map[uint32]*parser.SelectQuery{queryId: selectQuery}
} else {
self.parsedContinuousQueries[db][queryId] = selectQuery
}
self.continuousQueries[db] = append(self.continuousQueries[db], &ContinuousQuery{queryId, query})

return nil
}

func (self *ClusterConfiguration) SetContinuousQueryTimestamp(timestamp time.Time) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

self.continuousQueryTimestamp = timestamp

return nil
}

func (self *ClusterConfiguration) DeleteContinuousQuery(db string, id uint32) error {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

for i, query := range self.continuousQueries[db] {
if query.Id == id {
q := self.continuousQueries[db]
q[len(q)-1], q[i], q = nil, q[len(q)-1], q[:len(q)-1]
self.continuousQueries[db] = q
delete(self.parsedContinuousQueries[db], id)
break
}
}

return nil
}

func (self *ClusterConfiguration) GetContinuousQueries(db string) []*ContinuousQuery {
self.continuousQueriesLock.Lock()
defer self.continuousQueriesLock.Unlock()

return self.continuousQueries[db]
}

func (self *ClusterConfiguration) GetDbUsers(db string) (names []string) {
self.usersLock.RLock()
defer self.usersLock.RUnlock()
Expand Down
Loading

0 comments on commit 889b218

Please sign in to comment.