Initialize schedule lazily
Previously, we initialized the schedule eagerly when it was created. Part
of the schedule's state is the point in time at which it was started.
However, as the schedule is not executed immediately after creation, the
start timestamp could be slightly off. This is usually not a problem in
practice but can cause test failures when we use very short-lived schedules
(one second in total).

With this commit we wrap the schedule (which is a generator) in a
handle. This handle is passed to the executor, which can then lazily
initialize the schedule so that the timestamp matches exactly the point in
time at which the schedule is executed.

Closes elastic#600
Relates elastic#801
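
The underlying pattern is straightforward to demonstrate in isolation. Below is a minimal sketch (hypothetical names, not Rally's actual implementation): instead of handing the executor a ready-made schedule, we hand it a handle whose invocation captures the start timestamp at execution time.

    import time


    def schedule_generator(start):
        # yields (seconds elapsed since start, iteration number) pairs
        for i in range(3):
            yield time.perf_counter() - start, i


    class ScheduleHandle:
        # defers schedule creation until the executor is ready to run it
        def __init__(self, factory):
            self.factory = factory

        def __call__(self):
            # the start timestamp is captured here, at execution time,
            # not when the handle was created
            return self.factory(time.perf_counter())


    handle = ScheduleHandle(schedule_generator)
    time.sleep(0.1)  # simulates the delay between schedule creation and execution
    for elapsed, i in handle():  # lazily initialize the schedule
        print(f"iteration {i} at +{elapsed:.4f}s")  # not skewed by the 0.1 s delay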
danielmitterdorfer authored Oct 18, 2019
1 parent 5f49dbc commit 0aee2cf
Showing 2 changed files with 91 additions and 66 deletions.
108 changes: 61 additions & 47 deletions esrally/driver/driver.py
@@ -1091,7 +1091,7 @@ def __init__(self, task, schedule, es, sampler, cancel, complete, abort_on_error
         """
         self.task = task
         self.op = task.operation
-        self.schedule = schedule
+        self.schedule_handle = schedule
         self.es = es
         self.sampler = sampler
         self.cancel = cancel
@@ -1101,9 +1101,11 @@ def __init__(self, task, schedule, es, sampler, cancel, complete, abort_on_error

     def __call__(self, *args, **kwargs):
         total_start = time.perf_counter()
+        # lazily initialize the schedule
+        schedule = self.schedule_handle()
         # noinspection PyBroadException
         try:
-            for expected_scheduled_time, sample_type, percent_completed, runner, params in self.schedule:
+            for expected_scheduled_time, sample_type, percent_completed, runner, params in schedule:
                 if self.cancel.is_set():
                     self.logger.info("User cancelled execution.")
                     break
@@ -1392,7 +1394,7 @@ def schedule_for(current_track, task, client_index):
                     "iterations and [%s] iterations.", task.schedule, task.name, str(warmup_iterations), str(iterations))
         loop_control = IterationBased(warmup_iterations, iterations)

-    return generator_for_schedule(task.name, sched, loop_control, runner_for_op, params_for_op)
+    return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)


 def requires_time_period_schedule(task, params):
@@ -1405,50 +1407,62 @@ def requires_time_period_schedule(task, params):
     return not params.infinite


-def generator_for_schedule(task_name, sched, task_progress_control, runner, params):
-    """
-    Creates a generator that will yield individual task invocations for the provided schedule.
-
-    :param task_name: The name of the task for which the schedule is generated.
-    :param sched: The scheduler for this task.
-    :param task_progress_control: Controls how and how often this generator will loop.
-    :param runner: The runner for a given operation.
-    :param params: The parameter source for a given operation.
-    :return: A generator for the corresponding parameters.
-    """
-    next_scheduled = 0
-    logger = logging.getLogger(__name__)
-    if task_progress_control.infinite:
-        logger.info("Parameter source will determine when the schedule for [%s] terminates.", task_name)
-        param_source_knows_progress = hasattr(params, "percent_completed")
-        task_progress_control.start()
-        while True:
-            try:
-                # does not contribute at all to completion. Hence, we cannot define completion.
-                percent_completed = params.percent_completed if param_source_knows_progress else None
-                yield (next_scheduled, task_progress_control.sample_type, percent_completed, runner, params.params())
-                next_scheduled = sched.next(next_scheduled)
-                task_progress_control.next()
-            except StopIteration:
-                logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name)
-                return
-    else:
-        task_progress_control.start()
-        logger.info("%s schedule will determine when the schedule for [%s] terminates.",
-                    str(task_progress_control), task_name)
-        while not task_progress_control.completed:
-            try:
-                yield (next_scheduled,
-                       task_progress_control.sample_type,
-                       task_progress_control.percent_completed,
-                       runner,
-                       params.params())
-                next_scheduled = sched.next(next_scheduled)
-                task_progress_control.next()
-            except StopIteration:
-                logger.info("%s schedule for [%s] stopped due to StopIteration.", str(task_progress_control), task_name)
-                return
-    logger.info("%s schedule for [%s] stopped regularly.", str(task_progress_control), task_name)
+class ScheduleHandle:
+    def __init__(self, task_name, sched, task_progress_control, runner, params):
+        """
+        Creates a handle for a schedule, i.e. a callable that lazily creates a generator which yields
+        individual task invocations for the provided schedule.
+
+        :param task_name: The name of the task for which the schedule is generated.
+        :param sched: The scheduler for this task.
+        :param task_progress_control: Controls how and how often this generator will loop.
+        :param runner: The runner for a given operation.
+        :param params: The parameter source for a given operation.
+        """
+        self.task_name = task_name
+        self.sched = sched
+        self.task_progress_control = task_progress_control
+        self.runner = runner
+        self.params = params
+        self.logger = logging.getLogger(__name__)
+
+    def __call__(self):
+        next_scheduled = 0
+
+        if self.task_progress_control.infinite:
+            self.logger.info("Parameter source will determine when the schedule for [%s] terminates.", self.task_name)
+            param_source_knows_progress = hasattr(self.params, "percent_completed")
+            self.task_progress_control.start()
+            while True:
+                try:
+                    # does not contribute at all to completion. Hence, we cannot define completion.
+                    percent_completed = self.params.percent_completed if param_source_knows_progress else None
+                    yield (next_scheduled, self.task_progress_control.sample_type, percent_completed, self.runner,
+                           self.params.params())
+                    next_scheduled = self.sched.next(next_scheduled)
+                    self.task_progress_control.next()
+                except StopIteration:
+                    self.logger.info("%s schedule for [%s] stopped due to StopIteration.",
+                                     str(self.task_progress_control), self.task_name)
+                    return
+        else:
+            self.task_progress_control.start()
+            self.logger.info("%s schedule will determine when the schedule for [%s] terminates.",
+                             str(self.task_progress_control), self.task_name)
+            while not self.task_progress_control.completed:
+                try:
+                    yield (next_scheduled,
+                           self.task_progress_control.sample_type,
+                           self.task_progress_control.percent_completed,
+                           self.runner,
+                           self.params.params())
+                    next_scheduled = self.sched.next(next_scheduled)
+                    self.task_progress_control.next()
+                except StopIteration:
+                    self.logger.info("%s schedule for [%s] stopped due to StopIteration.",
+                                     str(self.task_progress_control), self.task_name)
+                    return
+        self.logger.info("%s schedule for [%s] stopped regularly.", str(self.task_progress_control), self.task_name)


 class TimePeriodBased:
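
A note on the mechanics of the new class: because ScheduleHandle.__call__ contains yield, it is a generator function, so calling the handle does not yet execute any of its body; even self.task_progress_control.start() runs only once the executor begins iterating. A toy example (hypothetical names, not Rally code):

    class Handle:
        def __call__(self):
            print("schedule initialized")  # runs on the first next(), not on h()
            yield 42


    h = Handle()
    gen = h()         # merely creates a generator; prints nothing
    print(next(gen))  # prints "schedule initialized", then 42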
49 changes: 30 additions & 19 deletions tests/driver/driver_test.py
@@ -457,7 +457,8 @@ def setUp(self):
     def test_search_task_one_client(self):
         task = track.Task("search", track.Operation("search", track.OperationType.Search.name, param_source="driver-test-param-source"),
                           warmup_iterations=3, iterations=5, clients=1, params={"target-throughput": 10, "clients": 1})
-        schedule = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         expected_schedule = [
             (0, metrics.SampleType.Warmup, 1 / 8, {}),
@@ -474,7 +475,8 @@ def test_search_task_two_clients(self):
     def test_search_task_two_clients(self):
         task = track.Task("search", track.Operation("search", track.OperationType.Search.name, param_source="driver-test-param-source"),
                           warmup_iterations=1, iterations=5, clients=2, params={"target-throughput": 10, "clients": 2})
-        schedule = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         expected_schedule = [
             (0, metrics.SampleType.Warmup, 1 / 6, {}),
@@ -492,47 +494,51 @@ def test_schedule_param_source_determines_iterations_no_warmup(self):
                                                         param_source="driver-test-param-source"),
                           clients=1, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Normal, 1 / 3, {"body": ["a"], "size": 3}),
             (1.0, metrics.SampleType.Normal, 2 / 3, {"body": ["a"], "size": 3}),
             (2.0, metrics.SampleType.Normal, 3 / 3, {"body": ["a"], "size": 3}),
-        ], list(invocations))
+        ], list(schedule))

     def test_schedule_param_source_determines_iterations_including_warmup(self):
         task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"], "size": 5},
                                                         param_source="driver-test-param-source"),
                           warmup_iterations=2, clients=1, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Warmup, 1 / 5, {"body": ["a"], "size": 5}),
             (1.0, metrics.SampleType.Warmup, 2 / 5, {"body": ["a"], "size": 5}),
             (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}),
             (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}),
             (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
-        ], list(invocations))
+        ], list(schedule))

     def test_schedule_defaults_to_iteration_based(self):
         # no time-period and no iterations specified on the task. Also, the parameter source does not define a size.
         task = track.Task("bulk-index", track.Operation("bulk-index", track.OperationType.Bulk.name, params={"body": ["a"]},
                                                         param_source="driver-test-param-source"),
                           clients=1, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Normal, 1 / 1, {"body": ["a"]}),
-        ], list(invocations))
+        ], list(schedule))

     def test_schedule_for_warmup_time_based(self):
         task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11},
                                                         param_source="driver-test-param-source"),
                           warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Normal, 1 / 11, {"body": ["a"], "size": 11}),
@@ -546,50 +552,53 @@ def test_schedule_for_warmup_time_based(self):
             (8.0, metrics.SampleType.Normal, 9 / 11, {"body": ["a"], "size": 11}),
             (9.0, metrics.SampleType.Normal, 10 / 11, {"body": ["a"], "size": 11}),
             (10.0, metrics.SampleType.Normal, 11 / 11, {"body": ["a"], "size": 11}),
-        ], list(invocations))
+        ], list(schedule))

     def test_infinite_schedule_without_progress_indication(self):
         task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"]},
                                                         param_source="driver-test-param-source"),
                           warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
             (1.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
             (2.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
             (3.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
             (4.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
-        ], invocations, infinite_schedule=True)
+        ], schedule, infinite_schedule=True)

     def test_finite_schedule_with_progress_indication(self):
         task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 5},
                                                         param_source="driver-test-param-source"),
                           warmup_time_period=0, clients=4, params={"target-throughput": 4, "clients": 4})

-        invocations = driver.schedule_for(self.test_track, task, 0)
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = schedule_handle()

         self.assert_schedule([
             (0.0, metrics.SampleType.Normal, 1 / 5, {"body": ["a"], "size": 5}),
             (1.0, metrics.SampleType.Normal, 2 / 5, {"body": ["a"], "size": 5}),
             (2.0, metrics.SampleType.Normal, 3 / 5, {"body": ["a"], "size": 5}),
             (3.0, metrics.SampleType.Normal, 4 / 5, {"body": ["a"], "size": 5}),
             (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
-        ], list(invocations), infinite_schedule=False)
+        ], list(schedule), infinite_schedule=False)

     def test_schedule_for_time_based(self):
         task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11},
                                                         param_source="driver-test-param-source"), warmup_time_period=0.1, time_period=0.1,
                           clients=1)

-        invocations = list(driver.schedule_for(self.test_track, task, 0))
+        schedule_handle = driver.schedule_for(self.test_track, task, 0)
+        schedule = list(schedule_handle())

-        self.assertTrue(len(invocations) > 0)
+        self.assertTrue(len(schedule) > 0)

         last_progress = -1

-        for invocation_time, sample_type, progress_percent, runner, params in invocations:
+        for invocation_time, sample_type, progress_percent, runner, params in schedule:
             # we're not throughput throttled
             self.assertEqual(0, invocation_time)
             if progress_percent <= 0.5:
@@ -830,16 +839,18 @@ class ExpectedUnitTestException(Exception):
         def run(*args, **kwargs):
             raise ExpectedUnitTestException()

+        def schedule_handle():
+            return [(0, metrics.SampleType.Warmup, 0, self.context_managed(run), None)]
+
         task = track.Task("no-op", track.Operation("no-op", track.OperationType.Bulk.name, params={},
                                                    param_source="driver-test-param-source"),
                           warmup_time_period=0.5, time_period=0.5, clients=4,
                           params={"clients": 4})

-        schedule = [(0, metrics.SampleType.Warmup, 0, self.context_managed(run), None)]
         sampler = driver.Sampler(client_id=0, task=None, start_timestamp=0)
         cancel = threading.Event()
         complete = threading.Event()
-        execute_schedule = driver.Executor(task, schedule, es, sampler, cancel, complete)
+        execute_schedule = driver.Executor(task, schedule_handle, es, sampler, cancel, complete)

         with self.assertRaises(ExpectedUnitTestException):
             execute_schedule()
