Skip to content

Commit

Permalink
feat(influxql): Initial implementation of InfluxQL query engine
Browse files Browse the repository at this point in the history
* Some tests are broken or do not pass; follow up PRs will resolve that
  • Loading branch information
stuartcarnie committed Aug 14, 2020
1 parent b73340a commit 8a7dcc2
Show file tree
Hide file tree
Showing 49 changed files with 3,403 additions and 2,205 deletions.
23 changes: 23 additions & 0 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"github.com/influxdata/influxdb/v2/endpoints"
"github.com/influxdata/influxdb/v2/gather"
"github.com/influxdata/influxdb/v2/http"
iqlcontrol "github.com/influxdata/influxdb/v2/influxql/control"
iqlquery "github.com/influxdata/influxdb/v2/influxql/query"
"github.com/influxdata/influxdb/v2/inmem"
"github.com/influxdata/influxdb/v2/internal/fs"
"github.com/influxdata/influxdb/v2/kit/cli"
Expand Down Expand Up @@ -64,6 +66,7 @@ import (
"github.com/influxdata/influxdb/v2/task/backend/scheduler"
"github.com/influxdata/influxdb/v2/telemetry"
"github.com/influxdata/influxdb/v2/tenant"
iqlcoordinator "github.com/influxdata/influxdb/v2/v1/coordinator"
"github.com/influxdata/influxdb/v2/v1/services/meta"
storage2 "github.com/influxdata/influxdb/v2/v1/services/storage"
_ "github.com/influxdata/influxdb/v2/v1/tsdb/engine/tsm1" // needed for tsm1
Expand Down Expand Up @@ -845,6 +848,25 @@ func (m *Launcher) run(ctx context.Context) (err error) {
dbrpSvc := dbrp.NewService(ctx, authorizer.NewBucketService(ts.BucketService), m.kvStore)
dbrpSvc = dbrp.NewAuthorizedService(dbrpSvc)

cm := iqlcontrol.NewControllerMetrics([]string{})
m.reg.MustRegister(cm.PrometheusCollectors()...)

mapper := &iqlcoordinator.LocalShardMapper{
MetaClient: metaClient,
TSDBStore: m.engine.TSDBStore,
DBRP: dbrpSvc,
}

qe := iqlquery.NewExecutor(m.log, cm)
se := &iqlcoordinator.StatementExecutor{
MetaClient: metaClient,
TSDBStore: m.engine.TSDBStore,
ShardMapper: mapper,
DBRP: dbrpSvc,
}
qe.StatementExecutor = se
qe.StatementNormalizer = se

var checkSvc platform.CheckService
{
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
Expand Down Expand Up @@ -1022,6 +1044,7 @@ func (m *Launcher) run(ctx context.Context) (err error) {
VariableService: variableSvc,
PasswordsService: ts.PasswordsService,
InfluxQLService: storageQueryService,
InfluxqldService: iqlquery.NewProxyExecutor(m.log, qe),
FluxService: storageQueryService,
FluxLanguageService: fluxlang.DefaultService,
TaskService: taskSvc,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ require (
github.com/stretchr/testify v1.5.1
github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8
github.com/testcontainers/testcontainers-go v0.0.0-20190108154635-47c0da630f72
github.com/tinylib/msgp v1.1.0 // indirect
github.com/tinylib/msgp v1.1.0
github.com/tylerb/graceful v1.2.15
github.com/uber-go/atomic v1.3.2 // indirect
github.com/uber/jaeger-client-go v2.16.0+incompatible
Expand Down
2 changes: 2 additions & 0 deletions http/api_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/influxdata/influxdb/v2/chronograf/server"
"github.com/influxdata/influxdb/v2/dbrp"
"github.com/influxdata/influxdb/v2/http/metric"
"github.com/influxdata/influxdb/v2/influxql"
"github.com/influxdata/influxdb/v2/kit/feature"
"github.com/influxdata/influxdb/v2/kit/prom"
kithttp "github.com/influxdata/influxdb/v2/kit/transport/http"
Expand Down Expand Up @@ -76,6 +77,7 @@ type APIBackend struct {
VariableService influxdb.VariableService
PasswordsService influxdb.PasswordsService
InfluxQLService query.ProxyQueryService
InfluxqldService influxql.ProxyQueryService
FluxService query.ProxyQueryService
FluxLanguageService influxdb.FluxLanguageService
TaskService influxdb.TaskService
Expand Down
158 changes: 158 additions & 0 deletions http/influx1x_authentication_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package http

import (
"fmt"
"net/http"
"strings"

"github.com/influxdata/influxdb/v2"
platcontext "github.com/influxdata/influxdb/v2/context"
"github.com/opentracing/opentracing-go"
)

type Influx1xAuthenticationHandler struct {
influxdb.HTTPErrorHandler
next http.Handler
auth influxdb.AuthorizationService
user influxdb.UserService
}

// NewInflux1xAuthenticationHandler creates an authentication handler to process
// InfluxDB 1.x authentication requests.
func NewInflux1xAuthenticationHandler(next http.Handler, auth influxdb.AuthorizationService, user influxdb.UserService, h influxdb.HTTPErrorHandler) *Influx1xAuthenticationHandler {
return &Influx1xAuthenticationHandler{
HTTPErrorHandler: h,
next: next,
auth: auth,
user: user,
}
}

// ServeHTTP extracts the session or token from the http request and places the resulting authorizer on the request context.
func (h *Influx1xAuthenticationHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// The ping endpoint does not need authorization
if r.URL.Path == "/ping" {
h.next.ServeHTTP(w, r)
return
}
ctx := r.Context()

creds, err := h.parseCredentials(r)
if err != nil {
UnauthorizedError(ctx, h, w)
return
}

auth, err := h.auth.FindAuthorizationByToken(ctx, creds.Token)
if err != nil {
UnauthorizedError(ctx, h, w)
return
}

var user *influxdb.User
if creds.Username != "" {
user, err = h.user.FindUser(ctx, influxdb.UserFilter{Name: &creds.Username})
if err != nil {
UnauthorizedError(ctx, h, w)
return
}

if user.ID != auth.UserID {
h.HandleHTTPError(ctx, &influxdb.Error{
Code: influxdb.EForbidden,
Msg: "Username and Token do not match",
}, w)
return
}
} else {
user, err = h.user.FindUserByID(ctx, auth.UserID)
if err != nil {
UnauthorizedError(ctx, h, w)
return
}
}

if err = h.isUserActive(user); err != nil {
InactiveUserError(ctx, h, w)
return
}

ctx = platcontext.SetAuthorizer(ctx, auth)

if span := opentracing.SpanFromContext(ctx); span != nil {
span.SetTag("user_id", auth.GetUserID().String())
}

h.next.ServeHTTP(w, r.WithContext(ctx))
}

func (h *Influx1xAuthenticationHandler) isUserActive(u *influxdb.User) error {
if u.Status != "inactive" {
return nil
}

return &influxdb.Error{Code: influxdb.EForbidden, Msg: "User is inactive"}
}

type credentials struct {
Username string
Token string
}

func parseToken(token string) (user, pass string, ok bool) {
s := strings.IndexByte(token, ':')
if s < 0 {
// Token <token>
return "", token, true
}

// Token <username>:<token>
return token[:s], token[s+1:], true
}

// parseCredentials parses a request and returns the authentication credentials.
// The credentials may be present as URL query params, or as a Basic
// Authentication header.
// As params: http://127.0.0.1/query?u=username&p=token
// As basic auth: http://username:[email protected]
// As Token in Authorization header: Token <username:token>
func (h *Influx1xAuthenticationHandler) parseCredentials(r *http.Request) (*credentials, error) {
q := r.URL.Query()

// Check for username and password in URL params.
if u, p := q.Get("u"), q.Get("p"); u != "" && p != "" {
return &credentials{
Username: u,
Token: p,
}, nil
}

// Check for the HTTP Authorization header.
if s := r.Header.Get("Authorization"); s != "" {
// Check for Bearer token.
strs := strings.Split(s, " ")
if len(strs) == 2 {
switch strs[0] {
case "Token":
if u, p, ok := parseToken(strs[1]); ok {
return &credentials{
Username: u,
Token: p,
}, nil
}

// fallback to only a token
}
}

// Check for basic auth.
if u, p, ok := r.BasicAuth(); ok {
return &credentials{
Username: u,
Token: p,
}, nil
}
}

return nil, fmt.Errorf("unable to parse authentication credentials")
}
Loading

0 comments on commit 8a7dcc2

Please sign in to comment.