Skip to content

Commit

Permalink
Add ability to restore from a snapshot
Browse files Browse the repository at this point in the history
With this commit we introduce new operations in Rally that allow Rally
to restore data from a snapshot.

Closes elastic#341
Relates elastic#793
  • Loading branch information
danielmitterdorfer authored and dliappis committed Oct 16, 2019
1 parent 3ecc1be commit a05207f
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 1 deletion.
48 changes: 48 additions & 0 deletions docs/track.rst
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,54 @@ With the operation ``sleep`` you can sleep for a certain duration to ensure no r

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

delete-snapshot-repository
~~~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``delete-snapshot-repository`` you can delete an existing snapshot repository. The ``delete-snapshot-repository`` operation supports the following parameter:

* ``repository`` (mandatory): The name of the snapshot repository to delete.

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

create-snapshot-repository
~~~~~~~~~~~~~~~~~~~~~~~~~~

With the operation ``create-snapshot-repository`` you can create a new snapshot repository. The ``create-snapshot-repository`` operation supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to create.
* ``body`` (mandatory): The body of the create snapshot repository request.
* ``request-params`` (optional): A structure containing HTTP request parameters.

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

restore-snapshot
~~~~~~~~~~~~~~~~

With the operation ``restore-snapshot`` you can restore a snapshot from an already created snapshot repository. The ``restore-snapshot`` operation supports the following parameters:

* ``repository`` (mandatory): The name of the snapshot repository to use. This snapshot repository must exist prior to calling ``restore-snapshot``.
* ``snapshot`` (mandatory): The name of the snapshot to restore.
* ``wait-for-completion`` (optional, defaults to ``False``): Whether this call should return immediately or block until the snapshot is restored.
* ``request-params`` (optional): A structure containing HTTP request parameters.

.. note::
In order to ensure that the track execution only continues after a snapshot has been restored, set ``wait-for-completion`` to ``true`` **and** increase the request timeout. In the example below we set it to 7200 seconds (or 2 hours)::

"request-params": {
"request_timeout": 7200
}

However, this might not work if a proxy is in between the client and Elasticsearch and the proxy has a shorter request timeout configured than the client. In this case, keep the default value for ``wait-for-completion`` and instead add a ``wait-for-recovery`` runner in the next step. This has the additional advantage that you'll get a progress report while the snapshot is being restored.

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

wait-for-recovery
~~~~~~~~~~~~~~~~~

With the operation ``wait-for-recovery`` you can wait until an ongoing index recovery finishes. The ``wait-for-recovery`` operation does not support any parameters.

This is an administrative operation. Metrics are not reported by default. Reporting can be forced by setting ``include-in-reporting`` to ``true``.

schedule
~~~~~~~~

Expand Down
104 changes: 103 additions & 1 deletion esrally/driver/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def register_default_runners():
register_runner(track.OperationType.RawRequest.name, RawRequest())
# This is an administrative operation but there is no need for a retry here as we don't issue a request
register_runner(track.OperationType.Sleep.name, Sleep())

# these requests should not be retried as they are not idempotent
register_runner(track.OperationType.RestoreSnapshot.name, RestoreSnapshot())
# We treat the following as administrative commands and thus already start to wrap them in a retry.
register_runner(track.OperationType.ClusterHealth.name, Retry(ClusterHealth()))
register_runner(track.OperationType.PutPipeline.name, Retry(PutPipeline()))
Expand All @@ -56,6 +57,9 @@ def register_default_runners():
register_runner(track.OperationType.DeleteMlJob.name, Retry(DeleteMlJob()))
register_runner(track.OperationType.OpenMlJob.name, Retry(OpenMlJob()))
register_runner(track.OperationType.CloseMlJob.name, Retry(CloseMlJob()))
register_runner(track.OperationType.DeleteSnapshotRepository.name, Retry(DeleteSnapshotRepository()))
register_runner(track.OperationType.CreateSnapshotRepository.name, Retry(CreateSnapshotRepository()))
register_runner(track.OperationType.WaitForRecovery.name, Retry(IndicesRecovery()))


def runner_for(operation_type):
Expand Down Expand Up @@ -1122,6 +1126,104 @@ def __repr__(self, *args, **kwargs):
return "sleep"


class DeleteSnapshotRepository(Runner):
"""
Deletes a snapshot repository
"""
def __call__(self, es, params):
es.snapshot.delete_repository(repository=mandatory(params, "repository", repr(self)))

def __repr__(self, *args, **kwargs):
return "delete-snapshot-repository"


class CreateSnapshotRepository(Runner):
"""
Creates a new snapshot repository
"""
def __call__(self, es, params):
request_params = params.get("request-params", {})
es.snapshot.create_repository(
repository=mandatory(params, "repository", repr(self)),
body=mandatory(params, "body", repr(self)),
params=request_params)

def __repr__(self, *args, **kwargs):
return "create-snapshot-repository"


class RestoreSnapshot(Runner):
"""
Restores a snapshot from an already registered repository
"""
def __call__(self, es, params):
request_params = params.get("request-params", {})
if "retries" not in request_params:
# It is possible that there is a proxy in between the cluster and our client which has a shorter timeout
# configured. In that case, the proxy would return 504, the client would retry the operation and hit an
# error about an index that is already existing. This error message is confusing and thus we explicitly
# disallow the client to ever retry.
#
# see also the docs for ``retries`` in the ``urllib3.connectionpool.urlopen``.
request_params["retries"] = 0

es.snapshot.restore(repository=mandatory(params, "repository", repr(self)),
snapshot=mandatory(params, "snapshot", repr(self)),
wait_for_completion=params.get("wait-for-completion", False),
params=request_params)

def __repr__(self, *args, **kwargs):
return "restore-snapshot"


class IndicesRecovery(Runner):
def __init__(self):
super().__init__()
self._completed = False
self._percent_completed = 0.0
self._last_recovered = None

@property
def completed(self):
return self._completed

@property
def percent_completed(self):
return self._percent_completed

def __call__(self, es, params):
response = es.indices.recovery(active_only=True)
if not response:
self._completed = True
self._percent_completed = 1.0
self._last_recovered = None
return 0, "bytes"
else:
recovered = 0
total_size = 0
for _, idx_data in response.items():
for _, shard_data in idx_data.items():
for shard in shard_data:
idx_size = shard["index"]["size"]
recovered += idx_size["recovered_in_bytes"]
total_size += idx_size["total_in_bytes"]
# translog is not in size but rather in absolute numbers. Ignore it for progress reporting.
# translog = shard_data["translog"]
# we only consider it completed if we get an empty response
self._completed = False
self._percent_completed = recovered / total_size
# this is cumulative so we need to consider the data from last time
if self._last_recovered:
newly_recovered = max(recovered - self._last_recovered, 0)
else:
newly_recovered = recovered
self._last_recovered = recovered
return newly_recovered, "bytes"

def __repr__(self, *args, **kwargs):
return "wait-for-recovery"


# TODO: Allow to use this from (selected) regular runners and add user documentation.
# TODO: It would maybe be interesting to add meta-data on how many retries there were.
class Retry(Runner):
Expand Down
12 changes: 12 additions & 0 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ class OperationType(Enum):
OpenMlJob = 1016
CloseMlJob = 1017
Sleep = 1018
DeleteSnapshotRepository = 1019
CreateSnapshotRepository = 1020
RestoreSnapshot = 1021
WaitForRecovery = 1022

@property
def admin_op(self):
Expand Down Expand Up @@ -490,6 +494,14 @@ def from_hyphenated_string(cls, v):
return OperationType.CloseMlJob
elif v == "sleep":
return OperationType.Sleep
elif v == "delete-snapshot-repository":
return OperationType.DeleteSnapshotRepository
elif v == "create-snapshot-repository":
return OperationType.CreateSnapshotRepository
elif v == "restore-snapshot":
return OperationType.RestoreSnapshot
elif v == "wait-for-recovery":
return OperationType.WaitForRecovery
else:
raise KeyError("No enum value for [%s]" % v)

Expand Down
134 changes: 134 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1976,6 +1976,140 @@ def test_sleep(self, sleep, es):
sleep.assert_called_once_with(4.3)


class DeleteSnapshotRepositoryTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_delete_snapshot_repository(self, es):
params = {
"repository": "backups"
}

r = runner.DeleteSnapshotRepository()
r(es, params)

es.snapshot.delete_repository.assert_called_once_with(repository="backups")


class CreateSnapshotRepositoryTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_create_snapshot_repository(self, es):
params = {
"repository": "backups",
"body": {
"type": "fs",
"settings": {
"location": "/var/backups"
}
}
}

r = runner.CreateSnapshotRepository()
r(es, params)

es.snapshot.create_repository.assert_called_once_with(repository="backups",
body={
"type": "fs",
"settings": {
"location": "/var/backups"
}
},
params={})


class RestoreSnapshotTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_restore_snapshot(self, es):
params = {
"repository": "backups",
"snapshot": "snapshot-001",
"wait-for-completion": True,
"request-params": {
"request_timeout": 7200
}
}

r = runner.RestoreSnapshot()
r(es, params)

es.snapshot.restore.assert_called_once_with(repository="backups",
snapshot="snapshot-001",
wait_for_completion=True,
params={
"request_timeout": 7200,
"retries": 0
})


class IndicesRecoveryTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
def test_indices_recovery_already_finished(self, es):
# empty response
es.indices.recovery.return_value = {}

r = runner.IndicesRecovery()
self.assertFalse(r.completed)
self.assertEqual(r.percent_completed, 0.0)

r(es, {})

self.assertTrue(r.completed)
self.assertEqual(r.percent_completed, 1.0)

es.indices.recovery.assert_called_once_with(active_only=True)

@mock.patch("elasticsearch.Elasticsearch")
def test_waits_for_ongoing_indices_recovery(self, es):
# empty response
es.indices.recovery.side_effect = [
# active recovery
{
"index1": {
"shards": [
{
"id": 0,
"index": {
"size": {
"total": "75.4mb",
"total_in_bytes": 79063092,
"recovered": "65.7mb",
"recovered_in_bytes": 68891939,
}
}
},
{
"id": 1,
"index": {
"size": {
"total": "175.4mb",
"total_in_bytes": 179063092,
"recovered": "165.7mb",
"recovered_in_bytes": 168891939,
}
}
}
]
}
},
# completed
{}
]

r = runner.IndicesRecovery()
self.assertFalse(r.completed)
self.assertEqual(r.percent_completed, 0.0)

while not r.completed:
recovered_bytes, unit = r(es, {})
if r.completed:
# no additional bytes recovered since the last call
self.assertEqual(recovered_bytes, 0)
self.assertEqual(r.percent_completed, 1.0)
else:
# sum of both shards
self.assertEqual(recovered_bytes, 237783878)
self.assertAlmostEqual(r.percent_completed, 0.9211, places=3)
self.assertEqual(unit, "bytes")


class ShrinkIndexTests(TestCase):
@mock.patch("elasticsearch.Elasticsearch")
# To avoid real sleeps in unit tests
Expand Down

0 comments on commit a05207f

Please sign in to comment.