Skip to content

Commit

Permalink
WIP: Implement writes series data on database
Browse files Browse the repository at this point in the history
* Add setSeriesId to raft, metastore
* Add methods to get seriesIds

[ci skip]
  • Loading branch information
pauldix committed Nov 26, 2014
1 parent 8049c34 commit d08bc02
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 97 deletions.
167 changes: 97 additions & 70 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,73 +326,112 @@ func (db *Database) applyCreateShardIfNotExists(id uint64, policy string, timest
}

// WriteSeries writes series data to the database.
func (db *Database) WriteSeries(name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
panic("not yet implemented: Database.WriteSeries()")

/* TEMPORARILY REMOVED FOR PROTOBUFS.
func (db *Database) WriteSeries(retentionPolicy, name string, tags map[string]string, timestamp time.Time, values map[string]interface{}) error {
// Find retention policy matching the series and split points by shard.
db.mu.Lock()
name := db.name
space := db.retentionPolicyBySeries(series.GetName())
_, ok := db.policies[retentionPolicy]
db.mu.Unlock()

// Ensure there is a space available.
if space == nil {
// Ensure the policy was found
if !ok {
return ErrRetentionPolicyNotFound
}

// Group points by shard.
pointsByShard, unassigned := space.Split(series.Points)
// Request shard creation for timestamps for missing shards.
for _, p := range unassigned {
timestamp := time.Unix(0, p.GetTimestamp())
if err := db.CreateShardIfNotExists(space.Name, timestamp); err != nil {
return fmt.Errorf("create shard(%s/%d): %s", space.Name, timestamp.Format(time.RFC3339Nano), err)
// get the id for the series and tagset
id, ok := db.getSeriesId(name, tags)
if !ok {
var err error
if id, err = db.setSeriesId(name, tags); err != nil {
return err
}
}

// Try to split the points again. Fail if it doesn't work this time.
pointsByShard, unassigned = space.Split(series.Points)
if len(unassigned) > 0 {
return fmt.Errorf("unmatched points in space(%s): %#v", unassigned)
}
// Publish each group of points.
for shardID, points := range pointsByShard {
// Marshal series into protobuf format.
req := &protocol.WriteSeriesRequest{
Database: proto.String(name),
Series: &protocol.Series{
Name: series.Name,
Fields: series.Fields,
FieldIds: series.FieldIds,
ShardId: proto.Uint64(shardID),
Points: points,
},
}
data, err := proto.Marshal(req)
if err != nil {
return err
}
// TODO: now write it in
fmt.Println(id)

/*
// Group points by shard.
pointsByShard, unassigned := space.Split(series.Points)
// Publish "write series" message on shard's topic to broker.
m := &messaging.Message{
Type: writeSeriesMessageType,
TopicID: shardID,
Data: data,
// Request shard creation for timestamps for missing shards.
for _, p := range unassigned {
timestamp := time.Unix(0, p.GetTimestamp())
if err := db.CreateShardIfNotExists(space.Name, timestamp); err != nil {
return fmt.Errorf("create shard(%s/%d): %s", space.Name, timestamp.Format(time.RFC3339Nano), err)
}
}
index, err := db.server.client.Publish(m)
if err != nil {
return err
// Try to split the points again. Fail if it doesn't work this time.
pointsByShard, unassigned = space.Split(series.Points)
if len(unassigned) > 0 {
return fmt.Errorf("unmatched points in space(%s): %#v", unassigned)
}
if err := db.server.sync(index); err != nil {
return err
// Publish each group of points.
for shardID, points := range pointsByShard {
// Marshal series into protobuf format.
req := &protocol.WriteSeriesRequest{
Database: proto.String(name),
Series: &protocol.Series{
Name: series.Name,
Fields: series.Fields,
FieldIds: series.FieldIds,
ShardId: proto.Uint64(shardID),
Points: points,
},
}
data, err := proto.Marshal(req)
if err != nil {
return err
}
// Publish "write series" message on shard's topic to broker.
m := &messaging.Message{
Type: writeSeriesMessageType,
TopicID: shardID,
Data: data,
}
index, err := db.server.client.Publish(m)
if err != nil {
return err
}
if err := db.server.sync(index); err != nil {
return err
}
}
*/
return nil
}

// getSeriesId returns the unique id of a series and tagset and a bool indicating if it was found
func (db *Database) getSeriesId(name string, tags map[string]string) (uint32, bool) {
var id uint32
var err error
db.server.meta.view(func(tx *metatx) error {
id, err = tx.getSeriesId(db.name, name, tags)
return nil
})
if err != nil {
return uint32(0), false
}
return id, true
}

return nil
*/
func (db *Database) setSeriesId(name string, tags map[string]string) (uint32, error) {
c := &setSeriesIdCommand{
Database: db.Name(),
Name: name,
Tags: tags,
}
_, err := db.server.broadcast(setSeriesIdMessageType, c)
if err != nil {
return uint32(0), err
}
id, ok := db.getSeriesId(name, tags)
if !ok {
return uint32(0), ErrSeriesNotFound
}
return id, nil
}

/* TEMPORARILY REMOVED FOR PROTOBUFS.
Expand Down Expand Up @@ -564,30 +603,18 @@ func NewRetentionPolicy() *RetentionPolicy {
}
}

/*
// SplitPoints groups a set of points by shard id.
// Also returns a list of timestamps that did not match an existing shard.
func (ss *RetentionPolicy) Split(a []*protocol.Point) (points map[uint64][]*protocol.Point, unassigned []*protocol.Point) {
points = make(map[uint64][]*protocol.Point)
for _, p := range a {
if s := ss.ShardByTimestamp(time.Unix(0, p.GetTimestamp())); s != nil {
points[s.ID] = append(points[s.ID], p)
} else {
unassigned = append(unassigned, p)
}
}
return
}
*/

// ShardByTimestamp returns the shard in the space that owns a given timestamp.
// ShardBySeriesTimestamp returns the shard in the space that owns a given timestamp for a given series id.
// Returns nil if the shard does not exist.
func (ss *RetentionPolicy) ShardByTimestamp(timestamp time.Time) *Shard {
func (ss *RetentionPolicy) ShardBySeriesTimestamp(id uint32, timestamp time.Time) *Shard {
shards := make([]*Shard, 0, ss.SplitN)
for _, s := range ss.Shards {
if timeBetween(timestamp, s.StartTime, s.EndTime) {
return s
shards = append(shards, s)
}
}
if len(shards) > 0 {
return shards[int(id)%len(shards)]
}
return nil
}

Expand Down
33 changes: 12 additions & 21 deletions database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,38 +394,29 @@ func TestDatabase_SetDefaultRetentionPolicy_ErrRetentionPolicyNotFound(t *testin

// Ensure the database can write data to the database.
func TestDatabase_WriteSeries(t *testing.T) {
t.Skip("pending")

/* TEMPORARILY REMOVED FOR PROTOBUFS.
s := OpenServer(NewMessagingClient())
defer s.Close()
s.CreateDatabase("foo")
db := s.Database("foo")
db.CreateRetentionPolicys(&influxdb.RetentionPolicy{Name: "myspace", Duration: 1 * time.Hour})
db.CreateRetentionPolicy(&influxdb.RetentionPolicy{Name: "myspace", Duration: 1 * time.Hour})
db.CreateUser("susy", "pass", nil)

// Write series with one point to the database.
timestamp := mustParseMicroTime("2000-01-01T00:00:00Z")
series := &protocol.Series{
Name: proto.String("cpu_load"),
Fields: []string{"myval"},
Points: []*protocol.Point{
{
Values: []*protocol.FieldValue{{Int64Value: proto.Int64(100)}},
Timestamp: proto.Int64(timestamp),
},
},
}
if err := db.WriteSeries(series); err != nil {
timestamp := mustParseTime("2000-01-01T00:00:00Z")

name := "cpu_load"
tags := map[string]string{"host": "servera.influx.com", "region": "uswest"}
values := map[string]interface{}{"value": 23.2}

if err := db.WriteSeries("myspace", name, tags, timestamp, values); err != nil {
t.Fatal(err)
}

// Execute a query and record all series found.
q := mustParseQuery(`select myval from cpu_load`)
if err := db.ExecuteQuery(q); err != nil {
t.Fatal(err)
}
*/
q := mustParseQuery(`select myval from cpu_load`)
if err := db.ExecuteQuery(q); err != nil {
t.Fatal(err)
}
}

// mustParseQuery parses a query string into a query object. Panic on error.
Expand Down
6 changes: 6 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,10 @@ var (

// ErrInvalidQuery is returned when executing an unknown query type.
ErrInvalidQuery = errors.New("invalid query")

// ErrSeriesNotFound is returned when looking up a non-existent series by database, name and tags
ErrSeriesNotFound = errors.New("Series not found")

// ErrSeriesExists is returned when attempting to set the id of a series by database, name and tags that already exists
ErrSeriesExists = errors.New("Series already exists")
)
79 changes: 79 additions & 0 deletions server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package influxdb

import (
"bytes"
"encoding/json"
"fmt"
"os"
Expand All @@ -9,6 +10,7 @@ import (
"strconv"
"sync"
"time"
"unsafe"

"github.com/boltdb/bolt"
"github.com/influxdb/influxdb/messaging"
Expand Down Expand Up @@ -49,6 +51,7 @@ const (
dbUserSetPasswordMessageType = messaging.MessageType(0x09)
createShardIfNotExistsMessageType = messaging.MessageType(0x0a)
setDefaultRetentionPolicyMessageType = messaging.MessageType(0x0b)
setSeriesIdMessageType = messaging.MessageType(0x0c)

// per-topic messages
writeSeriesMessageType = messaging.MessageType(0x80)
Expand Down Expand Up @@ -636,6 +639,29 @@ type setDefaultRetentionPolicyCommand struct {
Name string `json:"name"`
}

func (s *Server) applySetSeriesId(m *messaging.Message) error {
var c setSeriesIdCommand
mustUnmarshalJSON(m.Data, &c)

s.mu.Lock()
db := s.databases[c.Database]
s.mu.Unlock()

if db == nil {
return ErrDatabaseNotFound
}

// TODO: finish this up

return nil
}

type setSeriesIdCommand struct {
Database string `json:"database"`
Name string `json:"name"`
Tags map[string]string `json:"tags"`
}

/* TEMPORARILY REMOVED FOR PROTOBUFS.
func (s *Server) applyWriteSeries(m *messaging.Message) error {
req := &protocol.WriteSeriesRequest{}
Expand Down Expand Up @@ -697,6 +723,8 @@ func (s *Server) processor(done chan struct{}) {
err = s.applyCreateShardIfNotExists(m)
case setDefaultRetentionPolicyMessageType:
err = s.applySetDefaultRetentionPolicy(m)
case setSeriesIdMessageType:
err = s.applySetSeriesId(m)
case writeSeriesMessageType:
/* TEMPORARILY REMOVED FOR PROTOBUFS.
err = s.applyWriteSeries(m)
Expand Down Expand Up @@ -814,6 +842,57 @@ func (tx *metatx) deleteDatabase(name string) error {
return tx.Bucket([]byte("Databases")).Delete([]byte(name))
}

// returns a unique series id by database, name and tags. Returns ErrSeriesNotFound
func (tx *metatx) getSeriesId(database, name string, tags map[string]string) (uint32, error) {
// get the bucket that holds series data for the database
b := tx.Bucket([]byte("Series")).Bucket([]byte(database))
if b == nil {
return uint32(0), ErrSeriesNotFound
}

// get the bucket that holds tag data for the series name
b = b.Bucket([]byte(name))
if b == nil {
return uint32(0), ErrSeriesNotFound
}

// look up the id of the tagset
tagBytes, err := tagsToBytes(tags)
if err != nil {
return uint32(0), err
}
v := b.Get(tagBytes)
if v == nil {
return uint32(0), ErrSeriesNotFound
}

// the value is the bytes for a uint32, return it
x := (*uint32)(unsafe.Pointer(&v[0]))
return *x, nil
}

// used to convert the tag set to bytes for use as a key in bolt
func tagsToBytes(tags map[string]string) ([]byte, error) {
// don't want to resize the byte buffer, so assume that tag key/values are less than 500 bytes
defaultTagLength := 500
b := make([]byte, 0, len(tags)*defaultTagLength)

buf := bytes.NewBuffer(b)

for k, v := range tags {
_, err := buf.Write([]byte(k))
if err != nil {
return nil, err
}
_, err = buf.Write([]byte(v))
if err != nil {
return nil, err
}
}

return buf.Bytes(), nil
}

// series returns a series by database and name.
func (tx *metatx) series(database, name string) (s *Series) {
b := tx.Bucket([]byte("Series")).Bucket([]byte(database))
Expand Down
6 changes: 0 additions & 6 deletions server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,12 +290,6 @@ func mustParseTime(s string) time.Time {
return t
}

// mustParseMicroTime parses an IS0-8601 string into microseconds since epoch.
// Panic on error.
func mustParseMicroTime(s string) int64 {
return mustParseTime(s).UnixNano() / int64(time.Microsecond)
}

// errstr is an ease-of-use function to convert an error to a string.
func errstr(err error) string {
if err != nil {
Expand Down

0 comments on commit d08bc02

Please sign in to comment.