diff --git a/httpd/handler.go b/httpd/handler.go index 63e97a4da82..d42b4748b8f 100644 --- a/httpd/handler.go +++ b/httpd/handler.go @@ -16,7 +16,6 @@ import ( "github.com/bmizerany/pat" "github.com/influxdb/influxdb" - "github.com/influxdb/influxdb/client" "github.com/influxdb/influxdb/influxql" ) @@ -141,73 +140,9 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *influ httpResults(w, results, pretty) } -// BatchWrite is used to send batch write data to the http /write endpoint -type BatchWrite struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` -} - -// UnmarshalJSON decodes the data into the batchWrite struct -func (br *BatchWrite) UnmarshalJSON(b []byte) error { - var normal struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp time.Time `json:"timestamp"` - Precision string `json:"precision"` - } - var epoch struct { - Points []client.Point `json:"points"` - Database string `json:"database"` - RetentionPolicy string `json:"retentionPolicy"` - Tags map[string]string `json:"tags"` - Timestamp int64 `json:"timestamp"` - Precision string `json:"precision"` - } - - if err := func() error { - var err error - if err = json.Unmarshal(b, &epoch); err != nil { - return err - } - // Convert from epoch to time.Time - ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision) - if err != nil { - return err - } - br.Points = epoch.Points - br.Database = epoch.Database - br.RetentionPolicy = epoch.RetentionPolicy - br.Tags = epoch.Tags - br.Timestamp = ts - br.Precision = epoch.Precision - return nil - }(); err == nil { - return nil - } - - if err := json.Unmarshal(b, &normal); err != nil { - return err - } - normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision) - br.Points = normal.Points - br.Database = normal.Database - br.RetentionPolicy = normal.RetentionPolicy - br.Tags = normal.Tags - br.Timestamp = normal.Timestamp - br.Precision = normal.Precision - - return nil -} - // serveWrite receives incoming series data and writes it to the database. func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influxdb.User) { - var br BatchWrite + var bp influxdb.BatchPoints dec := json.NewDecoder(r.Body) @@ -218,58 +153,39 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user *influ return } - for { - if err := dec.Decode(&br); err != nil { - if err.Error() == "EOF" { - w.WriteHeader(http.StatusOK) - return - } - writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + if err := dec.Decode(&bp); err != nil { + if err.Error() == "EOF" { + w.WriteHeader(http.StatusOK) return } + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + return + } - if br.Database == "" { - writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError) - return - } + if bp.Database == "" { + writeError(influxdb.Result{Err: fmt.Errorf("database is required")}, http.StatusInternalServerError) + return + } - if !h.server.DatabaseExists(br.Database) { - writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", br.Database)}, http.StatusNotFound) - return - } + if !h.server.DatabaseExists(bp.Database) { + writeError(influxdb.Result{Err: fmt.Errorf("database not found: %q", bp.Database)}, http.StatusNotFound) + return + } - if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, br.Database) { - writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, br.Database)}, http.StatusUnauthorized) - return - } + if h.requireAuthentication && !user.Authorize(influxql.WritePrivilege, bp.Database) { + writeError(influxdb.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name, bp.Database)}, http.StatusUnauthorized) + return + } - for _, p := range br.Points { - if p.Timestamp.Time().IsZero() { - p.Timestamp = client.Timestamp(br.Timestamp) - } - if p.Precision == "" && br.Precision != "" { - p.Precision = br.Precision - } - p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) - if len(br.Tags) > 0 { - for k := range br.Tags { - if p.Tags[k] == "" { - p.Tags[k] = br.Tags[k] - } - } - } - // Need to convert from a client.Point to a influxdb.Point - iPoint := influxdb.Point{ - Name: p.Name, - Tags: p.Tags, - Timestamp: p.Timestamp.Time(), - Values: p.Values, - } - if _, err := h.server.WriteSeries(br.Database, br.RetentionPolicy, []influxdb.Point{iPoint}); err != nil { - writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) - return - } - } + points, err := influxdb.NormalizeBatchPoints(bp) + if err != nil { + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + return + } + + if _, err := h.server.WriteSeries(bp.Database, bp.RetentionPolicy, points); err != nil { + writeError(influxdb.Result{Err: err}, http.StatusInternalServerError) + return } } diff --git a/httpd/handler_test.go b/httpd/handler_test.go index edc0be47723..59e95239fb5 100644 --- a/httpd/handler_test.go +++ b/httpd/handler_test.go @@ -87,13 +87,13 @@ func TestBatchWrite_UnmarshalEpoch(t *testing.T) { t.Logf("testing %q\n", test.name) data := []byte(fmt.Sprintf(`{"timestamp": %d, "precision":"%s"}`, test.epoch, test.precision)) t.Logf("json: %s", string(data)) - var br httpd.BatchWrite - err := json.Unmarshal(data, &br) + var bp influxdb.BatchPoints + err := json.Unmarshal(data, &bp) if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if !br.Timestamp.Equal(test.expected) { - t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + if !bp.Timestamp.Equal(test.expected) { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Timestamp) } } } @@ -125,13 +125,13 @@ func TestBatchWrite_UnmarshalRFC(t *testing.T) { ts := test.now.Format(test.rfc) data := []byte(fmt.Sprintf(`{"timestamp": %q}`, ts)) t.Logf("json: %s", string(data)) - var br httpd.BatchWrite - err := json.Unmarshal(data, &br) + var bp influxdb.BatchPoints + err := json.Unmarshal(data, &bp) if err != nil { t.Fatalf("unexpected error. exptected: %v, actual: %v", nil, err) } - if !br.Timestamp.Equal(test.expected) { - t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, br.Timestamp) + if !bp.Timestamp.Equal(test.expected) { + t.Fatalf("Unexpected time. expected: %v, actual: %v", test.expected, bp.Timestamp) } } } diff --git a/influxdb.go b/influxdb.go index 046a6650782..7c205c07aba 100644 --- a/influxdb.go +++ b/influxdb.go @@ -5,6 +5,9 @@ import ( "errors" "fmt" "os" + + "github.com/influxdb/influxdb/client" + "time" ) var ( @@ -109,6 +112,102 @@ var ( ErrInvalidGrantRevoke = errors.New("invalid privilege requested") ) +// BatchPoints is used to send batched data in a single write. +type BatchPoints struct { + Points []client.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` +} + +// UnmarshalJSON decodes the data into the BatchPoints struct +func (bp *BatchPoints) UnmarshalJSON(b []byte) error { + var normal struct { + Points []client.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp time.Time `json:"timestamp"` + Precision string `json:"precision"` + } + var epoch struct { + Points []client.Point `json:"points"` + Database string `json:"database"` + RetentionPolicy string `json:"retentionPolicy"` + Tags map[string]string `json:"tags"` + Timestamp int64 `json:"timestamp"` + Precision string `json:"precision"` + } + + if err := func() error { + var err error + if err = json.Unmarshal(b, &epoch); err != nil { + return err + } + // Convert from epoch to time.Time + ts, err := client.EpochToTime(epoch.Timestamp, epoch.Precision) + if err != nil { + return err + } + bp.Points = epoch.Points + bp.Database = epoch.Database + bp.RetentionPolicy = epoch.RetentionPolicy + bp.Tags = epoch.Tags + bp.Timestamp = ts + bp.Precision = epoch.Precision + return nil + }(); err == nil { + return nil + } + + if err := json.Unmarshal(b, &normal); err != nil { + return err + } + normal.Timestamp = client.SetPrecision(normal.Timestamp, normal.Precision) + bp.Points = normal.Points + bp.Database = normal.Database + bp.RetentionPolicy = normal.RetentionPolicy + bp.Tags = normal.Tags + bp.Timestamp = normal.Timestamp + bp.Precision = normal.Precision + + return nil +} + +// NormalizeBatchPoints returns a slice of Points, created by populating individual +// points within the batch, which do not have timestamps or tags, with the top-level +// values. +func NormalizeBatchPoints(bp BatchPoints) ([]Point, error) { + points := []Point{} + for _, p := range bp.Points { + if p.Timestamp.Time().IsZero() { + p.Timestamp = client.Timestamp(bp.Timestamp) + } + if p.Precision == "" && bp.Precision != "" { + p.Precision = bp.Precision + } + p.Timestamp = client.Timestamp(client.SetPrecision(p.Timestamp.Time(), p.Precision)) + if len(bp.Tags) > 0 { + for k := range bp.Tags { + if p.Tags[k] == "" { + p.Tags[k] = bp.Tags[k] + } + } + } + // Need to convert from a client.Point to a influxdb.Point + points = append(points, Point{ + Name: p.Name, + Tags: p.Tags, + Timestamp: p.Timestamp.Time(), + Values: p.Values, + }) + } + + return points, nil +} + // ErrAuthorize represents an authorization error. type ErrAuthorize struct { text string