Skip to content

Commit

Permalink
update for new influxdb code
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Mar 14, 2016
1 parent 0053af0 commit dbffd45
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 37 deletions.
28 changes: 7 additions & 21 deletions services/httpd/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
Expand Down Expand Up @@ -52,7 +51,7 @@ type Handler struct {
}

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}

Logger *log.Logger
Expand Down Expand Up @@ -349,26 +348,13 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body []
return
}

// Determine required consistency level.
consistency := cluster.ConsistencyLevelOne
switch r.Form.Get("consistency") {
case "all":
consistency = cluster.ConsistencyLevelAll
case "any":
consistency = cluster.ConsistencyLevelAny
case "one":
consistency = cluster.ConsistencyLevelOne
case "quorum":
consistency = cluster.ConsistencyLevelQuorum
}

// Write points.
if err := h.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: database,
RetentionPolicy: r.FormValue("rp"),
ConsistencyLevel: consistency,
Points: points,
}); influxdb.IsClientError(err) {
if err := h.PointsWriter.WritePoints(
database,
r.FormValue("rp"),
models.ConsistencyLevelAll,
points,
); influxdb.IsClientError(err) {
h.statMap.Add(statPointsWrittenFail, int64(len(points)))
h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest)
return
Expand Down
6 changes: 3 additions & 3 deletions services/influxdb/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

"github.com/cenkalti/backoff"
client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/services/udp"
)
Expand All @@ -32,7 +32,7 @@ type Service struct {
clusters map[string]*influxdb

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
LogService interface {
NewLogger(string, int) *log.Logger
Expand Down Expand Up @@ -153,7 +153,7 @@ type influxdb struct {
subName string

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}
LogService interface {
NewLogger(string, int) *log.Logger
Expand Down
15 changes: 7 additions & 8 deletions services/udp/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"
"sync"

"github.com/influxdata/influxdb/cluster"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/expvar"
Expand Down Expand Up @@ -42,7 +41,7 @@ type Service struct {
config Config

PointsWriter interface {
WritePoints(p *cluster.WritePointsRequest) error
WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
}

Logger *log.Logger
Expand Down Expand Up @@ -149,12 +148,12 @@ func (s *Service) processPackets() {
continue
}

if err := s.PointsWriter.WritePoints(&cluster.WritePointsRequest{
Database: s.config.Database,
RetentionPolicy: s.config.RetentionPolicy,
ConsistencyLevel: cluster.ConsistencyLevelOne,
Points: points,
}); err == nil {
if err := s.PointsWriter.WritePoints(
s.config.Database,
s.config.RetentionPolicy,
models.ConsistencyLevelAll,
points,
); err == nil {
s.statMap.Add(statPointsTransmitted, int64(len(points)))
} else {
s.Logger.Printf("E! failed to write points to database %q: %s", s.config.Database, err)
Expand Down
10 changes: 5 additions & 5 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"time"

client "github.com/influxdata/influxdb/client/v2"
"github.com/influxdata/influxdb/cluster"
imodels "github.com/influxdata/influxdb/models"
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
"github.com/influxdata/kapacitor/services/httpd"
Expand Down Expand Up @@ -414,14 +414,14 @@ func (tm *TaskMaster) forkPoint(p models.Point) {
}
}

func (tm *TaskMaster) WritePoints(pts *cluster.WritePointsRequest) error {
func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error {
if tm.closed {
return ErrTaskMasterClosed
}
for _, mp := range pts.Points {
for _, mp := range points {
p := models.Point{
Database: pts.Database,
RetentionPolicy: pts.RetentionPolicy,
Database: database,
RetentionPolicy: retentionPolicy,
Name: mp.Name(),
Group: models.NilGroup,
Tags: models.Tags(mp.Tags()),
Expand Down

0 comments on commit dbffd45

Please sign in to comment.