Skip to content

Commit

Permalink
Encode real batches for shards
Browse files Browse the repository at this point in the history
  • Loading branch information
otoolep committed Feb 20, 2015
1 parent 3cb9398 commit 2585a9e
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 20 deletions.
3 changes: 3 additions & 0 deletions influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ var (
// ErrShardNotFound is returned writing to a non-existent shard.
ErrShardNotFound = errors.New("shard not found")

// ErrInvalidPointBuffer is returned when a buffer containing data for writing is invalid
ErrInvalidPointBuffer = errors.New("invalid point buffer")

// ErrReadAccessDenied is returned when a user attempts to read
// data that he or she does not have permission to read.
ErrReadAccessDenied = errors.New("read access denied")
Expand Down
47 changes: 34 additions & 13 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1577,7 +1577,7 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
}

// Build writeRawSeriesMessageType publish commands.
shardData := make(map[uint64][]byte)
shardData := make(map[uint64][]byte, 0)
for _, p := range points {
// Local function makes lock management foolproof.
measurement, series, err := func() (*Measurement, *Series, error) {
Expand Down Expand Up @@ -1619,9 +1619,12 @@ func (s *Server) WriteSeries(database, retentionPolicy string, points []Point) (
return 0, err
}

// Encode point header, followed by point data, and assign to shard.
data := marshalPointHeader(series.ID, p.Timestamp.UnixNano())
// Encode point header, followed by point data, and add to shard's batch.
data := marshalPointHeader(series.ID, uint32(len(encodedFields)), p.Timestamp.UnixNano())
data = append(data, encodedFields...)
if shardData[sh.ID] == nil {
shardData[sh.ID] = make([]byte, 0)
}
shardData[sh.ID] = append(shardData[sh.ID], data...)
}

Expand Down Expand Up @@ -1655,19 +1658,37 @@ func (s *Server) applyWriteRawSeries(m *messaging.Message) error {
return ErrShardNotFound
}

// Extract the series id and timestamp from the header.
// Everything after the header is the marshalled value.
seriesID, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
data := m.Data[pointHeaderSize:]

// Add to lookup.
s.addShardBySeriesID(sh, seriesID)

// TODO: Enable some way to specify if the data should be overwritten
overwrite := true

// Write to shard.
return sh.writeSeries(seriesID, timestamp, data, overwrite)
for {
if pointHeaderSize > len(m.Data) {
return ErrInvalidPointBuffer
}
seriesID, payloadLength, timestamp := unmarshalPointHeader(m.Data[:pointHeaderSize])
m.Data = m.Data[pointHeaderSize:]

if payloadLength > uint32(len(m.Data)) {
return ErrInvalidPointBuffer
}
data := m.Data[:payloadLength]

// Add to lookup.
s.addShardBySeriesID(sh, seriesID)

// Write to shard.
if err := sh.writeSeries(seriesID, timestamp, data, overwrite); err != nil {
return err
}

// Push the buffer forward and check if we're done.
m.Data = m.Data[payloadLength:]
if len(m.Data) == 0 {
break
}
}

return nil
}

func (s *Server) addShardBySeriesID(sh *Shard, seriesID uint32) {
Expand Down
16 changes: 9 additions & 7 deletions shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,20 +134,22 @@ func (s *Shard) deleteSeries(name string) error {
type Shards []*Shard

// pointHeaderSize represents the size of a point header, in bytes.
const pointHeaderSize = 4 + 8 // seriesID + timestamp
const pointHeaderSize = 4 + 4 + 8 // seriesID + payload length + timestamp

// marshalPointHeader encodes a series id, timestamp, & flagset into a byte slice.
func marshalPointHeader(seriesID uint32, timestamp int64) []byte {
b := make([]byte, 12)
// marshalPointHeader encodes a series id, payload length, timestamp, & flagset into a byte slice.
func marshalPointHeader(seriesID uint32, payloadLength uint32, timestamp int64) []byte {
b := make([]byte, pointHeaderSize)
binary.BigEndian.PutUint32(b[0:4], seriesID)
binary.BigEndian.PutUint64(b[4:12], uint64(timestamp))
binary.BigEndian.PutUint32(b[4:8], payloadLength)
binary.BigEndian.PutUint64(b[8:16], uint64(timestamp))
return b
}

// unmarshalPointHeader decodes a byte slice into a series id, timestamp & flagset.
func unmarshalPointHeader(b []byte) (seriesID uint32, timestamp int64) {
func unmarshalPointHeader(b []byte) (seriesID uint32, payloadLength uint32, timestamp int64) {
seriesID = binary.BigEndian.Uint32(b[0:4])
timestamp = int64(binary.BigEndian.Uint64(b[4:12]))
payloadLength = binary.BigEndian.Uint32(b[4:8])
timestamp = int64(binary.BigEndian.Uint64(b[8:16]))
return
}

Expand Down

0 comments on commit 2585a9e

Please sign in to comment.