Skip to content

Commit

Permalink
Partial engine integration.
Browse files Browse the repository at this point in the history
  • Loading branch information
benbjohnson committed Jan 13, 2015
1 parent 4494a3a commit 47cd03f
Show file tree
Hide file tree
Showing 5 changed files with 200 additions and 35 deletions.
94 changes: 80 additions & 14 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sort"
"strings"
"time"

"github.com/influxdb/influxdb/influxql"
)

// database is a collection of retention policies and shards. It also has methods
Expand Down Expand Up @@ -120,7 +122,7 @@ func NewMeasurement(name string) *Measurement {

// CreateFieldIfNotExists creates a new field with an autoincrementing ID.
// Returns an error if 255 fields have already been created on the measurement.
func (m *Measurement) createFieldIfNotExists(name string, typ FieldType) (*Field, error) {
func (m *Measurement) createFieldIfNotExists(name string, typ influxql.DataType) (*Field, error) {
// Ignore if the field already exists.
if f := m.FieldByName(name); f != nil {
return f, nil
Expand Down Expand Up @@ -280,21 +282,11 @@ type Measurements []*Measurement

// Field represents a series field.
type Field struct {
ID uint8 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Type FieldType `json:"field"`
ID uint8 `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Type influxql.DataType `json:"type,omitempty"`
}

type FieldType int

const (
Int64 FieldType = iota
Float64
String
Boolean
Binary
)

// Fields represents a list of fields.
type Fields []*Field

Expand All @@ -306,6 +298,16 @@ type Series struct {
measurement *Measurement
}

// match returns true if all tags match the series' tags.
func (s *Series) match(tags map[string]string) bool {
for k, v := range tags {
if s.Tags[k] != v {
return false
}
}
return true
}

// RetentionPolicy represents a policy for creating new shards in a database and how long they're kept around for.
type RetentionPolicy struct {
// Unique name within database. Required.
Expand Down Expand Up @@ -737,3 +739,67 @@ func marshalTags(tags map[string]string) []byte {
}
return []byte(strings.Join(s, "|"))
}

// dbi is an interface the query engine to communicate with the database during planning.
type dbi struct {
server *Server
db *database
}

// MatchSeries returns a list of series data ids matching a name and tags.
func (dbi *dbi) MatchSeries(name string, tags map[string]string) (a []uint32) {
// Find measurement by name.
m := dbi.db.measurements[name]
if m == nil {
return nil
}

// Match each series on the measurement by tagset.
// TODO: Use paul's fancy index.
for _, s := range m.seriesByID {
if s.match(tags) {
a = append(a, s.ID)
}
}
return
}

// SeriesTagValues returns a slice of tag values for a series.
func (dbi *dbi) SeriesTagValues(seriesID uint32, keys []string) []string {
// Find series by id.
s := dbi.db.series[seriesID]

// Lookup value for each key.
values := make([]string, len(keys))
for i, keys := range keys {
values[i] = s.Tags[keys]
}
return values
}

// Field returns the id and data type for a series field.
// Returns id of zero if not a field.
func (dbi *dbi) Field(name, field string) (fieldID uint8, typ influxql.DataType) {
// Find measurement by name.
m := dbi.db.measurements[name]
if m == nil {
return 0, influxql.Unknown
}

// Find field by name.
f := m.FieldByName(name)
if f == nil {
return 0, influxql.Unknown
}

return f.ID, f.Type
}

// CreateIterator returns an iterator given a series data id, field id, & field data type.
func (dbi *dbi) CreateIterator(seriesID uint32, fieldID uint8, typ influxql.DataType, min, max time.Time, interval time.Duration) influxql.Iterator {
// TODO: Find shard group.
// TODO: Find shard for series.
// TODO: Open bolt cursor.
// TODO: Return wrapper cursor.
panic("TODO")
}
4 changes: 4 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ var (

// ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists
ErrSeriesExists = errors.New("series already exists")

// ErrNotExecuted is returned when a statement is not executed in a query.
// This can occur when a previous statement in the same query has errorred.
ErrNotExecuted = errors.New("not executed")
)

// mustMarshal encodes a value to JSON.
Expand Down
97 changes: 84 additions & 13 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"time"

"code.google.com/p/go.crypto/bcrypt"
"github.com/influxdb/influxdb/influxql"
"github.com/influxdb/influxdb/messaging"
)

Expand Down Expand Up @@ -247,8 +248,9 @@ func (s *Server) setClient(client MessagingClient) error {

// Start goroutine to read messages from the broker.
if client != nil {
s.done = make(chan struct{}, 0)
go s.processor(client, s.done)
done := make(chan struct{}, 0)
s.done = done
go s.processor(client, done)
}

return nil
Expand Down Expand Up @@ -1211,42 +1213,42 @@ type createSeriesIfNotExistsCommand struct {
}

// WriteSeries writes series data to the database.
func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) (uint64, error) {
// Find the id for the series and tagset
seriesID, err := s.createSeriesIfNotExists(database, name, tags)
if err != nil {
return err
return 0, err
}

// If the retention policy is not set, use the default for this database.
if retentionPolicy == "" {
rp, err := s.DefaultRetentionPolicy(database)
if err != nil {
return fmt.Errorf("failed to determine default retention policy: %s", err.Error())
return 0, fmt.Errorf("failed to determine default retention policy: %s", err.Error())
}
retentionPolicy = rp.Name
}

// Retrieve measurement.
m, err := s.measurement(database, name)
if err != nil {
return err
return 0, err
} else if m == nil {
return ErrMeasurementNotFound
return 0, ErrMeasurementNotFound
}

// Retrieve shard group.
g, err := s.createShardGroupIfNotExists(database, retentionPolicy, timestamp)
if err != nil {
return fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
return 0, fmt.Errorf("create shard(%s/%d): %s", retentionPolicy, timestamp.Format(time.RFC3339Nano), err)
}

// Find appropriate shard within the shard group.
sh := g.Shards[int(seriesID)%len(g.Shards)]

// Ignore requests that have no values.
if len(values) == 0 {
return nil
return 0, nil
}

// Convert string-key/values to fieldID-key/values.
Expand All @@ -1268,7 +1270,7 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
TopicID: sh.ID,
Data: data,
})
return err
return 0, err
}

// If we can successfully encode the string keys to raw field ids then
Expand All @@ -1279,12 +1281,11 @@ func (s *Server) WriteSeries(database, retentionPolicy, name string, tags map[st
data = append(data, marshalValues(rawValues)...)

// Publish "raw write series" message on shard's topic to broker.
_, err = s.client.Publish(&messaging.Message{
return s.client.Publish(&messaging.Message{
Type: writeRawSeriesMessageType,
TopicID: sh.ID,
Data: data,
})
return err
}

type writeSeriesCommand struct {
Expand Down Expand Up @@ -1331,7 +1332,7 @@ func (s *Server) applyWriteSeries(m *messaging.Message) error {
// Find or create fields.
// If too many fields are on the measurement then log the issue.
// If any other error occurs then exit.
f, err := mm.createFieldIfNotExists(k, Float64)
f, err := mm.createFieldIfNotExists(k, influxql.Number)
if err == ErrFieldOverflow {
log.Printf("no more fields allowed: %s::%s", mm.Name, k)
continue
Expand Down Expand Up @@ -1505,6 +1506,70 @@ func (s *Server) measurement(database, name string) (*Measurement, error) {
return db.measurements[name], nil
}

// ExecuteQuery executes an InfluxQL query against the server.
// Returns a resultset for each statement in the query.
// Stops on first execution error that occurs.
func (s *Server) Execute(q *influxql.Query, database string) []*Result {
// Build empty resultsets.
results := make([]*Result, len(q.Statements))

// Execute each statement.
for i, st := range q.Statements {
switch st := st.(type) {
case *influxql.SelectStatement:
results[i] = s.executeSelectStatement(st, database)
}
}

// Fill any empty results after error.
for i, res := range results {
if res == nil {
results[i] = &Result{Error: ErrNotExecuted}
}
}

return results
}

// plans and executes a select statement against a database.
func (s *Server) executeSelectStatement(stmt *influxql.SelectStatement, database string) *Result {
// Plan statement execution.
e, err := s.planSelectStatement(stmt, database)
if err != nil {
return &Result{Error: err}
}

// Execute plan.
ch, err := e.Execute()
if err != nil {
return &Result{Error: err}
}

// Read all rows from channel.
res := &Result{Rows: make([]*influxql.Row, 0)}
for row := range ch {
res.Rows = append(res.Rows, row)
}

return res
}

// plans a selection statement under lock.
func (s *Server) planSelectStatement(stmt *influxql.SelectStatement, database string) (*influxql.Executor, error) {
s.mu.Lock()
defer s.mu.Unlock()

// Find database.
db := s.databases[database]
if db == nil {
return nil, ErrDatabaseNotFound
}

// Plan query.
p := influxql.NewPlanner(&dbi{server: s, db: db})
return p.Plan(stmt)
}

// processor runs in a separate goroutine and processes all incoming broker messages.
func (s *Server) processor(client MessagingClient, done chan struct{}) {
for {
Expand Down Expand Up @@ -1561,6 +1626,12 @@ func (s *Server) processor(client MessagingClient, done chan struct{}) {
}
}

// Result represents a resultset returned from a single statement.
type Result struct {
Rows []*influxql.Row `json:"rows"`
Error error `json:"error"`
}

// MessagingClient represents the client used to receive messages from brokers.
type MessagingClient interface {
// Publishes a message to the broker.
Expand Down
36 changes: 28 additions & 8 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,29 +505,46 @@ func TestServer_WriteSeries(t *testing.T) {

// Write series with one point to the database.
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
values := map[string]interface{}{"value": 23.2}
if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), values); err != nil {
index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z"), map[string]interface{}{"value": 23.2})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
time.Sleep(1 * time.Second) // TEMP

// Write another point 10 seconds later so it goes through "raw series".
if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), values); err != nil {
index, err = s.WriteSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z"), map[string]interface{}{"value": 100})
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatalf("sync error: %s", err)
}
time.Sleep(1 * time.Second) // TEMP

// Verify a subscription was made.
if !subscribed {
t.Fatal("expected subscription")
}

// Retrieve series data point.
// Retrieve first series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:00Z")); err != nil {
t.Fatal(err)
} else if !reflect.DeepEqual(v, values) {
} else if !reflect.DeepEqual(v, map[string]interface{}{"value": 23.2}) {
t.Fatalf("values mismatch: %#v", v)
}

// Retrieve second series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:00:10Z")); err != nil {
t.Fatal(err)
} else if mustMarshalJSON(v) != mustMarshalJSON(map[string]interface{}{"value": 100}) {
t.Fatalf("values mismatch: %#v", mustMarshalJSON(v))
}

// Retrieve non-existent series data point.
if v, err := s.ReadSeries("foo", "mypolicy", "cpu_load", tags, mustParseTime("2000-01-01T00:01:00Z")); err != nil {
t.Fatal(err)
} else if v != nil {
t.Fatalf("expected nil values: %#v", v)
}
}

func TestServer_CreateShardGroupIfNotExist(t *testing.T) {
Expand Down Expand Up @@ -563,8 +580,11 @@ func TestServer_Measurements(t *testing.T) {
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
values := map[string]interface{}{"value": 23.2}

if err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, timestamp, values); err != nil {
index, err := s.WriteSeries("foo", "mypolicy", "cpu_load", tags, timestamp, values)
if err != nil {
t.Fatal(err)
} else if err = s.Sync(index); err != nil {
t.Fatal("sync error: %s", err)
}

expectedMeasurementNames := []string{"cpu_load"}
Expand Down
Loading

0 comments on commit 47cd03f

Please sign in to comment.