Skip to content

Commit

Permalink
Switch ExecutionContext to be passed by value
Browse files Browse the repository at this point in the history
  • Loading branch information
jsternberg committed Jun 10, 2016
1 parent a6147fa commit 9db82e6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 34 deletions.
4 changes: 2 additions & 2 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type StatementExecutor struct {
MaxSelectBucketsN int
}

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
// Select statements are handled separately so that they can be streamed.
if stmt, ok := stmt.(*influxql.SelectStatement); ok {
return e.executeSelectStatement(stmt, ctx)
return e.executeSelectStatement(stmt, &ctx)
}

var rows models.Rows
Expand Down
4 changes: 2 additions & 2 deletions influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ type ExecutionContext struct {
type StatementExecutor interface {
// ExecuteStatement executes a statement. Results should be sent to the
// results channel in the ExecutionContext.
ExecuteStatement(stmt Statement, ctx *ExecutionContext) error
ExecuteStatement(stmt Statement, ctx ExecutionContext) error
}

// StatementNormalizer normalizes a statement before it is executed.
Expand Down Expand Up @@ -208,7 +208,7 @@ func (e *QueryExecutor) executeQuery(query *Query, opt ExecutionOptions, closing
e.Logger.Println(stmt.String())

// Send any other statements to the underlying statement executor.
err = e.StatementExecutor.ExecuteStatement(stmt, &ctx)
err = e.StatementExecutor.ExecuteStatement(stmt, ctx)
if err == ErrQueryInterrupted {
// Query was interrupted so retrieve the real interrupt error from
// the query task if there is one.
Expand Down
20 changes: 10 additions & 10 deletions influxql/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import (
var errUnexpected = errors.New("unexpected error")

type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
}

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}

Expand All @@ -31,7 +31,7 @@ func TestQueryExecutor_AttachQuery(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
if ctx.QueryID != 1 {
t.Errorf("incorrect query id: exp=1 got=%d", ctx.QueryID)
}
Expand All @@ -52,7 +52,7 @@ func TestQueryExecutor_KillQuery(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
switch stmt.(type) {
case *influxql.KillQueryStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestQueryExecutor_Interrupt(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestQueryExecutor_ShowQueries(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
switch stmt.(type) {
case *influxql.ShowQueriesStatement:
return e.TaskManager.ExecuteStatement(stmt, ctx)
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestQueryExecutor_Limit_Timeout(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
select {
case <-ctx.InterruptCh:
return influxql.ErrQueryInterrupted
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestQueryExecutor_Limit_ConcurrentQueries(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
qid <- ctx.QueryID
<-ctx.InterruptCh
return influxql.ErrQueryInterrupted
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestQueryExecutor_Close(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
close(ch1)
<-ctx.InterruptCh
close(ch2)
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestQueryExecutor_Panic(t *testing.T) {

e := NewQueryExecutor()
e.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
panic("test error")
},
}
Expand Down
2 changes: 1 addition & 1 deletion influxql/task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewTaskManager() *TaskManager {
}

// ExecuteStatement executes a statement containing one of the task management queries.
func (t *TaskManager) ExecuteStatement(stmt Statement, ctx *ExecutionContext) error {
func (t *TaskManager) ExecuteStatement(stmt Statement, ctx ExecutionContext) error {
switch stmt := stmt.(type) {
case *ShowQueriesStatement:
rows, err := t.executeShowQueriesStatement(stmt)
Expand Down
18 changes: 9 additions & 9 deletions services/continuous_querier/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func TestContinuousQueryService_Run(t *testing.T) {

// Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestContinuousQueryService_ResampleOptions(t *testing.T) {

// Set a callback for ExecuteStatement.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
Expand Down Expand Up @@ -184,7 +184,7 @@ func TestContinuousQueryService_EveryHigherThanInterval(t *testing.T) {

// Set a callback for ExecuteQuery.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
callCnt++
if callCnt >= expectCallCnt {
done <- struct{}{}
Expand Down Expand Up @@ -239,7 +239,7 @@ func TestContinuousQueryService_NotLeader(t *testing.T) {
done := make(chan struct{})
// Set a callback for ExecuteStatement. Shouldn't get called because we're not the leader.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
done <- struct{}{}
ctx.Results <- &influxql.Result{Err: errUnexpected}
return nil
Expand All @@ -266,7 +266,7 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) {
done := make(chan struct{})
// Set ExecuteQuery callback, which shouldn't get called because of meta store failure.
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
done <- struct{}{}
ctx.Results <- &influxql.Result{Err: errUnexpected}
return nil
Expand All @@ -287,7 +287,7 @@ func TestContinuousQueryService_MetaClientFailsToGetDatabases(t *testing.T) {
func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return errUnexpected
},
}
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestExecuteContinuousQuery_InvalidQueries(t *testing.T) {
func TestExecuteContinuousQuery_QueryExecutor_Error(t *testing.T) {
s := NewTestService(t)
s.QueryExecutor.StatementExecutor = &StatementExecutor{
ExecuteStatementFn: func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
ExecuteStatementFn: func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return errExpected
},
}
Expand Down Expand Up @@ -483,10 +483,10 @@ type QueryExecutor struct {

// StatementExecutor is a mock statement executor.
type StatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
}

func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
func (e *StatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}

Expand Down
20 changes: 10 additions & 10 deletions services/httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
// Ensure the handler returns results from a query (including nil results).
func TestHandler_Query(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` {
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestHandler_Query_Auth(t *testing.T) {
}

// Set mock statement executor for handler to use.
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
if stmt.String() != `SELECT * FROM bar` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `foo` {
Expand Down Expand Up @@ -178,7 +178,7 @@ func TestHandler_Query_Auth(t *testing.T) {
// Ensure the handler returns results from a query (including nil results).
func TestHandler_QueryRegex(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
if stmt.String() != `SELECT * FROM test WHERE url =~ /http\:\/\/www.akamai\.com/` {
t.Fatalf("unexpected query: %s", stmt.String())
} else if ctx.Database != `test` {
Expand All @@ -195,7 +195,7 @@ func TestHandler_QueryRegex(t *testing.T) {
// Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeResults(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series0"}})}
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
Expand All @@ -213,7 +213,7 @@ func TestHandler_Query_MergeResults(t *testing.T) {
// Ensure the handler merges results from the same statement.
func TestHandler_Query_MergeEmptyResults(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows{}}
ctx.Results <- &influxql.Result{StatementID: 1, Series: models.Rows([]*models.Row{{Name: "series1"}})}
return nil
Expand All @@ -231,7 +231,7 @@ func TestHandler_Query_MergeEmptyResults(t *testing.T) {
// Ensure the handler can parse chunked and chunk size query parameters.
func TestHandler_Query_Chunked(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
if ctx.ChunkSize != 2 {
t.Fatalf("unexpected chunk size: %d", ctx.ChunkSize)
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestHandler_Query_ErrInvalidQuery(t *testing.T) {
// Ensure the handler returns a status 200 if an error is returned in the result.
func TestHandler_Query_ErrResult(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return errors.New("measurement not found")
}

Expand Down Expand Up @@ -323,7 +323,7 @@ func TestHandler_Ping(t *testing.T) {
// Ensure the handler returns the version correctly from the different endpoints.
func TestHandler_Version(t *testing.T) {
h := NewHandler(false)
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
h.StatementExecutor.ExecuteStatementFn = func(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return nil
}
tests := []struct {
Expand Down Expand Up @@ -479,10 +479,10 @@ func (s *HandlerMetaStore) User(username string) (*meta.UserInfo, error) {

// HandlerStatementExecutor is a mock implementation of Handler.StatementExecutor.
type HandlerStatementExecutor struct {
ExecuteStatementFn func(stmt influxql.Statement, ctx *influxql.ExecutionContext) error
ExecuteStatementFn func(stmt influxql.Statement, ctx influxql.ExecutionContext) error
}

func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx *influxql.ExecutionContext) error {
func (e *HandlerStatementExecutor) ExecuteStatement(stmt influxql.Statement, ctx influxql.ExecutionContext) error {
return e.ExecuteStatementFn(stmt, ctx)
}

Expand Down

0 comments on commit 9db82e6

Please sign in to comment.