Skip to content

Commit

Permalink
refactor(tasks): remove dead scheduler code (influxdata#16252)
Browse files: browse the repository at this point in the history
  • Loading branch information
AlirieGray authored Jan 2, 2020
1 parent 047afcf commit 6c6bd39
Show file tree
Hide file tree
Showing 30 changed files with 1,747 additions and 5,898 deletions.
147 changes: 54 additions & 93 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ import (
"github.com/influxdata/influxdb/storage/readservice"
taskbackend "github.com/influxdata/influxdb/task/backend"
"github.com/influxdata/influxdb/task/backend/coordinator"
taskexecutor "github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/executor"
"github.com/influxdata/influxdb/task/backend/middleware"
"github.com/influxdata/influxdb/task/backend/scheduler"
"github.com/influxdata/influxdb/telemetry"
Expand Down Expand Up @@ -259,12 +259,6 @@ func buildLauncherCommand(l *Launcher, cmd *cobra.Command) {
Default: "",
Desc: "TLS key for HTTPs",
},
{
DestP: &l.EnableNewScheduler,
Flag: "feature-enable-new-scheduler",
Default: false,
Desc: "feature flag that enables using the new treescheduler",
},
}

cli.BindOptions(cmd, opts)
Expand Down Expand Up @@ -308,9 +302,8 @@ type Launcher struct {
natsServer *nats.Server
natsPort int

EnableNewScheduler bool
scheduler *taskbackend.TickScheduler
treeScheduler *scheduler.TreeScheduler
scheduler *scheduler.TreeScheduler
executor *executor.Executor
taskControlService taskbackend.TaskControlService

jaegerTracerCloser io.Closer
Expand Down Expand Up @@ -374,11 +367,8 @@ func (m *Launcher) Shutdown(ctx context.Context) {
m.httpServer.Shutdown(ctx)

m.log.Info("Stopping", zap.String("service", "task"))
if m.EnableNewScheduler {
m.treeScheduler.Stop()
} else {
m.scheduler.Stop()
}

m.scheduler.Stop()

m.log.Info("Stopping", zap.String("service", "nats"))
m.natsServer.Close()
Expand Down Expand Up @@ -631,90 +621,67 @@ func (m *Launcher) run(ctx context.Context) (err error) {
var storageQueryService = readservice.NewProxyQueryService(m.queryController)
var taskSvc platform.TaskService
{
// create the task stack:
// validation(coordinator(analyticalstore(kv.Service)))
// create the task stack
combinedTaskService := taskbackend.NewAnalyticalStorage(m.log.With(zap.String("service", "task-analytical-store")), m.kvService, m.kvService, m.kvService, pointsWriter, query.QueryServiceBridge{AsyncQueryService: m.queryController})
if m.EnableNewScheduler {
executor, executorMetrics := taskexecutor.NewExecutor(
m.log.With(zap.String("service", "task-executor")),
query.QueryServiceBridge{AsyncQueryService: m.queryController},
authSvc,
combinedTaskService,
combinedTaskService,
)
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
schLogger := m.log.With(zap.String("service", "task-scheduler"))

sch, sm, err := scheduler.NewScheduler(
executor,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledFor time.Time, err error) {
schLogger.Info(
"error in scheduler run",
zap.String("taskID", platform.ID(taskID).String()),
zap.Time("scheduledFor", scheduledFor),
zap.Error(err))
}),
)
if err != nil {
m.log.Fatal("could not start task scheduler", zap.Error(err))
}
m.treeScheduler = sch
m.reg.MustRegister(sm.PrometheusCollectors()...)
coordLogger := m.log.With(zap.String("service", "task-coordinator"))
taskCoord := coordinator.NewCoordinator(
coordLogger,
sch,
executor)

taskSvc = middleware.New(combinedTaskService, taskCoord)
m.taskControlService = combinedTaskService
if err := taskbackend.TaskNotifyCoordinatorOfExisting(
ctx,
taskSvc,
combinedTaskService,
taskCoord,
func(ctx context.Context, taskID platform.ID, runID platform.ID) error {
_, err := executor.ResumeCurrentRun(ctx, taskID, runID)
return err
},
coordLogger); err != nil {
m.log.Error("Failed to resume existing tasks", zap.Error(err))
}
} else {

// define the executor and build analytical storage middleware
executor := taskexecutor.NewAsyncQueryServiceExecutor(m.log.With(zap.String("service", "task-executor")), m.queryController, authSvc, combinedTaskService)

// create the scheduler
m.scheduler = taskbackend.NewScheduler(m.log.With(zap.String("svc", "taskd/scheduler")), combinedTaskService, executor, time.Now().UTC().Unix(), taskbackend.WithTicker(ctx, 100*time.Millisecond))
m.scheduler.Start(ctx)
m.reg.MustRegister(m.scheduler.PrometheusCollectors()...)

logger := m.log.With(zap.String("service", "task-coordinator"))
coordinator := coordinator.New(logger, m.scheduler)

// resume existing task claims from task service
if err := taskbackend.NotifyCoordinatorOfExisting(ctx, logger, combinedTaskService, coordinator); err != nil {
logger.Error("Failed to resume existing tasks", zap.Error(err))
}

taskSvc = middleware.New(combinedTaskService, coordinator)
taskSvc = authorizer.NewTaskService(m.log.With(zap.String("service", "task-authz-validator")), taskSvc)
m.taskControlService = combinedTaskService
executor, executorMetrics := executor.NewExecutor(
m.log.With(zap.String("service", "task-executor")),
query.QueryServiceBridge{AsyncQueryService: m.queryController},
authSvc,
combinedTaskService,
combinedTaskService,
)
m.executor = executor
m.reg.MustRegister(executorMetrics.PrometheusCollectors()...)
schLogger := m.log.With(zap.String("service", "task-scheduler"))

sch, sm, err := scheduler.NewScheduler(
executor,
taskbackend.NewSchedulableTaskService(m.kvService),
scheduler.WithOnErrorFn(func(ctx context.Context, taskID scheduler.ID, scheduledAt time.Time, err error) {
schLogger.Info(
"error in scheduler run",
zap.String("taskID", platform.ID(taskID).String()),
zap.Time("scheduledAt", scheduledAt),
zap.Error(err))
}),
)
if err != nil {
m.log.Fatal("could not start task scheduler", zap.Error(err))
}
m.scheduler = sch
m.reg.MustRegister(sm.PrometheusCollectors()...)
coordLogger := m.log.With(zap.String("service", "task-coordinator"))
taskCoord := coordinator.NewCoordinator(
coordLogger,
sch,
executor)

taskSvc = middleware.New(combinedTaskService, taskCoord)
m.taskControlService = combinedTaskService
if err := taskbackend.TaskNotifyCoordinatorOfExisting(
ctx,
taskSvc,
combinedTaskService,
taskCoord,
func(ctx context.Context, taskID platform.ID, runID platform.ID) error {
_, err := executor.ResumeCurrentRun(ctx, taskID, runID)
return err
},
coordLogger); err != nil {
m.log.Error("Failed to resume existing tasks", zap.Error(err))
}

}

var checkSvc platform.CheckService
{
coordinator := coordinator.New(m.log, m.scheduler)
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
checkSvc = middleware.NewCheckService(m.kvService, m.kvService, coordinator)
}

var notificationRuleSvc platform.NotificationRuleStore
{
coordinator := coordinator.New(m.log, m.scheduler)
coordinator := coordinator.NewCoordinator(m.log, m.scheduler, m.executor)
notificationRuleSvc = middleware.NewNotificationRuleStore(m.kvService, m.kvService, coordinator)
}

Expand Down Expand Up @@ -985,12 +952,6 @@ func (m *Launcher) TaskControlService() taskbackend.TaskControlService {
return m.taskControlService
}

// TaskScheduler returns the internal scheduler service.
// TODO(docmerlin): remove this when we delete the old scheduler
func (m *Launcher) TaskScheduler() taskbackend.Scheduler {
return m.scheduler
}

// KeyValueService returns the internal key-value service.
func (m *Launcher) KeyValueService() *kv.Service {
return m.kvService
Expand Down
8 changes: 0 additions & 8 deletions cmd/influxd/launcher/tasks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/cmd/influxd/launcher"
pctx "github.com/influxdata/influxdb/context"
"github.com/influxdata/influxdb/task/backend"
)

func TestLauncher_Task(t *testing.T) {
Expand Down Expand Up @@ -103,13 +102,6 @@ from(bucket:"my_bucket_in") |> range(start:-5m) |> to(bucket:"%s", org:"%s")`, b
t.Fatal(err)
}

// Find the next due run of the task we just created, so that we can accurately tick the scheduler to it.
ndr, err := be.TaskControlService().NextDueRun(ctx, created.ID)
if err != nil {
t.Fatal(err)
}
be.TaskScheduler().(*backend.TickScheduler).Tick(ndr + 1)

// Poll for the task to have started and finished.
deadline := time.Now().Add(10 * time.Second) // Arbitrary deadline; 10s seems safe for -race on a resource-constrained system.
var targetRun influxdb.Run
Expand Down
Loading

0 comments on commit 6c6bd39

Please sign in to comment.