Properly wait for recovery to finish
With this commit we ensure that a proper schedule is chosen when waiting
for shard recovery to finish. We also ensure that a short time span in which
no recoveries are active (but more might start soon) is not mistakenly
treated as an indication that all recoveries have already finished.

Closes elastic#796
Relates elastic#800
danielmitterdorfer authored Oct 21, 2019
1 parent 6817d8c commit 2e2d947
Showing 6 changed files with 136 additions and 33 deletions.
5 changes: 5 additions & 0 deletions docs/adding_tracks.rst
@@ -922,6 +922,11 @@ Example (assuming Rally has been invoked specifying ``default`` and ``remote`` i

For cases where you want to provide a progress indication, you can implement two properties: ``percent_completed``, which returns a floating point value between ``0.0`` and ``1.0``, and ``completed``, which needs to return ``True`` once the runner has completed. This can be useful when progress can only be determined by calling an API, for example when waiting for a recovery to finish.

.. warning::

    Rally will still treat such a runner like any other. If you want to poll status at certain intervals, limit the number of calls by specifying the ``target-throughput`` property on the corresponding task.
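
For illustration, here is a minimal sketch of such a runner, modeled on the ``wait-for-recovery`` implementation in this commit. The ``WaitForRecovery`` class name is made up for this example, and the progress calculation assumes the byte counters returned by the index recovery API::

    class WaitForRecovery:
        def __init__(self):
            self._completed = False
            self._percent_completed = 0.0

        @property
        def completed(self):
            return self._completed

        @property
        def percent_completed(self):
            return self._percent_completed

        def __call__(self, es, params):
            # only consider shard recoveries that are currently running
            response = es.indices.recovery(active_only=True)
            if not response:
                # an empty response means that there are no active recoveries
                self._completed = True
                self._percent_completed = 1.0
            else:
                total_bytes = 0
                recovered_bytes = 0
                for index_details in response.values():
                    for shard in index_details["shards"]:
                        total_bytes += shard["index"]["size"]["total_in_bytes"]
                        recovered_bytes += shard["index"]["size"]["recovered_in_bytes"]
                self._percent_completed = recovered_bytes / total_bytes if total_bytes > 0 else 1.0

Such a runner is registered like any other custom runner; Rally keeps invoking it until ``completed`` returns ``True``.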


Custom schedulers
^^^^^^^^^^^^^^^^^

20 changes: 19 additions & 1 deletion docs/track.rst
@@ -850,7 +850,25 @@ This is an administrative operation. Metrics are not reported by default. Report
wait-for-recovery
~~~~~~~~~~~~~~~~~

With the operation ``wait-for-recovery`` you can wait until an ongoing index recovery finishes. The ``wait-for-recovery`` operation does not support any parameters.
With the operation ``wait-for-recovery`` you can wait until an ongoing shard recovery finishes. The ``wait-for-recovery`` operation supports the following parameters:

* ``completion-recheck-attempts`` (optional, defaults to 3): The `index recovery API <https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-recovery.html>`_ might report that there are no active shard recoveries even though a new one may be scheduled shortly afterwards. Therefore, this operation checks several times whether there really are no active recoveries left. In between those attempts, it waits for the time period specified by ``completion-recheck-wait-period``.
* ``completion-recheck-wait-period`` (optional, defaults to 2 seconds): Time in seconds to wait between consecutive recheck attempts.

.. warning::

    By default, this operation runs unthrottled (like any other), but you should limit the number of calls by specifying the ``target-throughput`` property on the corresponding task::

        {
          "operation": {
            "operation-type": "wait-for-recovery",
            "completion-recheck-attempts": 2,
            "completion-recheck-wait-period": 5
          },
          "target-throughput": 0.1
        }

    In this example, Rally will check the progress of shard recovery every ten seconds (a ``target-throughput`` of 0.1 operations per second corresponds to one check every ten seconds). When the index recovery API reports that there are no active recoveries, Rally will still check twice in total (``completion-recheck-attempts``), waiting for five seconds in between those calls (``completion-recheck-wait-period``).

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.
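
For example, reporting could be forced as follows (a minimal sketch; it assumes that ``include-in-reporting`` is specified on the operation just like its other parameters)::

    {
      "operation": {
        "operation-type": "wait-for-recovery",
        "include-in-reporting": true
      }
    }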

7 changes: 5 additions & 2 deletions esrally/driver/driver.py
@@ -1375,7 +1375,7 @@ def schedule_for(current_track, task, client_index):
    runner_for_op = runner.runner_for(op.type)
    params_for_op = track.operation_parameters(current_track, op).partition(client_index, num_clients)

    if requires_time_period_schedule(task, params_for_op):
    if requires_time_period_schedule(task, runner_for_op, params_for_op):
        warmup_time_period = task.warmup_time_period if task.warmup_time_period else 0
        logger.info("Creating time-period based schedule with [%s] distribution for [%s] with a warmup period of [%s] "
                    "seconds and a time period of [%s] seconds.", task.schedule, task.name,
@@ -1397,12 +1397,15 @@ def schedule_for(current_track, task, client_index):
    return ScheduleHandle(task.name, sched, loop_control, runner_for_op, params_for_op)


def requires_time_period_schedule(task, params):
def requires_time_period_schedule(task, task_runner, params):
    if task.warmup_time_period is not None or task.time_period is not None:
        return True
    # user has explicitly requested iterations
    if task.warmup_iterations is not None or task.iterations is not None:
        return False
    # the runner determines completion
    if task_runner.completed is not None:
        return True
    # If the parameter source ends after a finite amount of iterations, we will run with a time-based schedule
    return not params.infinite

79 changes: 55 additions & 24 deletions esrally/driver/runner.py
@@ -105,7 +105,8 @@ class Runner:
    Base class for all operations against Elasticsearch.
    """

    def __init__(self):
    def __init__(self, *args, **kwargs):
        super(Runner, self).__init__(*args, **kwargs)
        self.logger = logging.getLogger(__name__)

    def __enter__(self):
@@ -127,29 +128,52 @@ def __exit__(self, exc_type, exc_val, exc_tb):
        return False


class Delegator:
    """
    Mixin to unify delegate handling
    """
    def __init__(self, delegate, *args, **kwargs):
        super(Delegator, self).__init__(*args, **kwargs)
        self.delegate = delegate


def unwrap(runner):
    """
    Unwraps all delegators until the actual runner.

    :param runner: An arbitrarily nested chain of delegators around a runner.
    :return: The innermost runner.
    """
    delegate = getattr(runner, "delegate", None)
    if delegate:
        return unwrap(delegate)
    else:
        return runner
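
# Illustrative example, not part of this commit: ``unwrap`` follows the
# ``delegate`` chain through any number of wrappers, e.g.:
#
#   inner = lambda es, params: None
#   wrapped = NoCompletion(MultiClientRunner(inner, "example-op", lambda es: es["default"]))
#   assert unwrap(wrapped) is inner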


def _single_cluster_runner(runnable, name, context_manager_enabled=False):
    # only pass the default ES client
    delegate = DelegatingRunner(runnable, name, lambda es: es["default"], context_manager_enabled)
    return _with_completion(delegate, runnable)
    delegate = MultiClientRunner(runnable, name, lambda es: es["default"], context_manager_enabled)
    return _with_completion(delegate)


def _multi_cluster_runner(runnable, name, context_manager_enabled=False):
    # pass all ES clients
    delegate = DelegatingRunner(runnable, name, lambda es: es, context_manager_enabled)
    return _with_completion(delegate, runnable)
    delegate = MultiClientRunner(runnable, name, lambda es: es, context_manager_enabled)
    return _with_completion(delegate)


def _with_completion(delegate, runnable):
    if hasattr(runnable, "completed") and hasattr(runnable, "percent_completed"):
        return WithCompletion(delegate, runnable)
def _with_completion(delegate):
    unwrapped_runner = unwrap(delegate)
    if hasattr(unwrapped_runner, "completed") and hasattr(unwrapped_runner, "percent_completed"):
        return WithCompletion(delegate, unwrapped_runner)
    else:
        return NoCompletion(delegate)


class NoCompletion(Runner):
class NoCompletion(Runner, Delegator):
    def __init__(self, delegate):
        super().__init__()
        self.delegate = delegate
        super().__init__(delegate=delegate)

    @property
    def completed(self):
@@ -173,10 +197,9 @@ def __exit__(self, exc_type, exc_val, exc_tb):
        return self.delegate.__exit__(exc_type, exc_val, exc_tb)


class WithCompletion(Runner):
class WithCompletion(Runner, Delegator):
    def __init__(self, delegate, progressable):
        super().__init__()
        self.delegate = delegate
        super().__init__(delegate=delegate)
        self.progressable = progressable

    @property
@@ -201,16 +224,15 @@ def __exit__(self, exc_type, exc_val, exc_tb):
        return self.delegate.__exit__(exc_type, exc_val, exc_tb)


class DelegatingRunner(Runner):
class MultiClientRunner(Runner, Delegator):
    def __init__(self, runnable, name, client_extractor, context_manager_enabled=False):
        super().__init__()
        self.runnable = runnable
        super().__init__(delegate=runnable)
        self.name = name
        self.client_extractor = client_extractor
        self.context_manager_enabled = context_manager_enabled

    def __call__(self, *args):
        return self.runnable(self.client_extractor(args[0]), *args[1:])
        return self.delegate(self.client_extractor(args[0]), *args[1:])

    def __repr__(self, *args, **kwargs):
        if self.context_manager_enabled:
@@ -220,12 +242,12 @@ def __repr__(self, *args, **kwargs):

    def __enter__(self):
        if self.context_manager_enabled:
            self.runnable.__enter__()
            self.delegate.__enter__()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.context_manager_enabled:
            return self.runnable.__exit__(exc_type, exc_val, exc_tb)
            return self.delegate.__exit__(exc_type, exc_val, exc_tb)
        else:
            return False

@@ -1184,7 +1206,17 @@ def percent_completed(self):
        return self._percent_completed

    def __call__(self, es, params):
        response = es.indices.recovery(active_only=True)
        remaining_attempts = params.get("completion-recheck-attempts", 3)
        wait_period = params.get("completion-recheck-wait-period", 2)
        response = None
        while not response and remaining_attempts > 0:
            response = es.indices.recovery(active_only=True)
            remaining_attempts -= 1
            # This might also happen if all recoveries have just finished and we happen to call the API
            # before the next recovery is scheduled.
            if not response:
                time.sleep(wait_period)

        if not response:
            self._completed = True
            self._percent_completed = 1.0
@@ -1218,7 +1250,7 @@ def __repr__(self, *args, **kwargs):

# TODO: Allow to use this from (selected) regular runners and add user documentation.
# TODO: It would maybe be interesting to add meta-data on how many retries there were.
class Retry(Runner):
class Retry(Runner, Delegator):
    """
    This runner can be used as a wrapper around regular runners to retry operations.
@@ -1234,8 +1266,7 @@ class Retry(Runner):
"""

def __init__(self, delegate):
super().__init__()
self.delegate = delegate
super().__init__(delegate=delegate)

def __enter__(self):
self.delegate.__enter__()
40 changes: 39 additions & 1 deletion tests/driver/driver_test.py
@@ -449,10 +449,30 @@ def calculate_global_throughput(self, samples):


class SchedulerTests(ScheduleTestCase):
    class RunnerWithProgress:
        def __init__(self, complete_after=3):
            self.completed = False
            self.percent_completed = 0.0
            self.calls = 0
            self.complete_after = complete_after

        def __call__(self, *args, **kwargs):
            self.calls += 1
            if not self.completed:
                self.percent_completed = self.calls / self.complete_after
                self.completed = self.calls == self.complete_after
            else:
                self.percent_completed = 1.0

    def setUp(self):
        self.test_track = track.Track(name="unittest")
        self.runner_with_progress = SchedulerTests.RunnerWithProgress()
        params.register_param_source_for_name("driver-test-param-source", DriverTestParamSource)
        runner.register_default_runners()
        self.test_track = track.Track(name="unittest")
        runner.register_runner("driver-test-runner-with-completion", self.runner_with_progress)

    def tearDown(self):
        runner.remove_runner("driver-test-runner-with-completion")

    def test_search_task_one_client(self):
        task = track.Task("search", track.Operation("search", track.OperationType.Search.name, param_source="driver-test-param-source"),
@@ -586,6 +606,24 @@ def test_finite_schedule_with_progress_indication(self):
            (4.0, metrics.SampleType.Normal, 5 / 5, {"body": ["a"], "size": 5}),
        ], list(schedule), infinite_schedule=False)

    def test_schedule_with_progress_determined_by_runner(self):
        task = track.Task("time-based", track.Operation("time-based", "driver-test-runner-with-completion",
                                                        params={"body": ["a"]},
                                                        param_source="driver-test-param-source"),
                          clients=1,
                          params={"target-throughput": 1, "clients": 1})

        schedule_handle = driver.schedule_for(self.test_track, task, 0)
        schedule = schedule_handle()

        self.assert_schedule([
            (0.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
            (1.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
            (2.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
            (3.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
            (4.0, metrics.SampleType.Normal, None, {"body": ["a"]}),
        ], schedule, infinite_schedule=True)

    def test_schedule_for_time_based(self):
        task = track.Task("time-based", track.Operation("time-based", track.OperationType.Bulk.name, params={"body": ["a"], "size": 11},
                                                        param_source="driver-test-param-source"), warmup_time_period=0.1, time_period=0.1,
18 changes: 13 additions & 5 deletions tests/driver/runner_test.py
@@ -2080,12 +2080,16 @@ def test_indices_recovery_already_finished(self, es):
        self.assertFalse(r.completed)
        self.assertEqual(r.percent_completed, 0.0)

        r(es, {})
        r(es, {
            "completion-recheck-wait-period": 0
        })

        self.assertTrue(r.completed)
        self.assertEqual(r.percent_completed, 1.0)

        es.indices.recovery.assert_called_once_with(active_only=True)
        es.indices.recovery.assert_called_with(active_only=True)
        # retries three times
        self.assertEqual(3, es.indices.recovery.call_count)

    @mock.patch("elasticsearch.Elasticsearch")
    def test_waits_for_ongoing_indices_recovery(self, es):
@@ -2120,16 +2124,20 @@ def test_waits_for_ongoing_indices_recovery(self, es):
                    ]
                }
            },
            # completed
            {}
            # completed - will be called three times
            {},
            {},
            {},
        ]

        r = runner.IndicesRecovery()
        self.assertFalse(r.completed)
        self.assertEqual(r.percent_completed, 0.0)

        while not r.completed:
            recovered_bytes, unit = r(es, {})
            recovered_bytes, unit = r(es, {
                "completion-recheck-wait-period": 0
            })
            if r.completed:
                # no additional bytes recovered since the last call
                self.assertEqual(recovered_bytes, 0)
