-
Notifications
You must be signed in to change notification settings - Fork 76
/
Copy pathbackup.py
237 lines (205 loc) · 8.44 KB
/
backup.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
__all__ = ["Backup"]
from numbers import Number
from typing import Optional
from arango.api import ApiGroup
from arango.exceptions import (
BackupCreateError,
BackupDeleteError,
BackupDownloadError,
BackupGetError,
BackupRestoreError,
BackupUploadError,
)
from arango.formatter import (
format_backup,
format_backup_restore,
format_backup_transfer,
format_backups,
)
from arango.request import Request
from arango.response import Response
from arango.result import Result
from arango.typings import Json
class Backup(ApiGroup):  # pragma: no cover
    """Wrapper around the server's backup endpoints (``/_admin/backup/*``).

    Every method builds a POST :class:`Request`, passes it to
    ``self._execute`` together with a response handler, and that handler
    either formats the ``result`` field of the response body or raises the
    matching ``Backup*Error`` with the response and request attached.
    """

    def get(self, backup_id: Optional[str] = None) -> Result[Json]:
        """Return backup details.

        :param backup_id: If set, details on only the specified backup is
            returned. Otherwise, details on all backups are returned.
        :type backup_id: str | None
        :return: Backup details.
        :rtype: dict
        :raise arango.exceptions.BackupGetError: If retrieval fails.
        """
        request = Request(
            method="post",
            endpoint="/_admin/backup/list",
            # No body at all means "list every backup".
            data=None if backup_id is None else {"id": backup_id},
        )

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_backups(resp.body["result"])
            raise BackupGetError(resp, request)

        return self._execute(request, response_handler)

    def create(
        self,
        label: Optional[str] = None,
        allow_inconsistent: Optional[bool] = None,
        force: Optional[bool] = None,
        timeout: Optional[Number] = None,
    ) -> Result[Json]:
        """Create a backup when the global write lock can be obtained.

        Only the options that were explicitly provided are sent, so the
        server applies its own defaults for everything else.

        :param label: Backup label. If not given, a UUID is used.
        :type label: str | None
        :param allow_inconsistent: Allow inconsistent backup when the global
            transaction lock cannot be acquired before timeout. Default value
            is False.
        :type allow_inconsistent: bool | None
        :param force: Forcefully abort all running transactions to ensure a
            consistent backup when the global transaction lock cannot be
            acquired before timeout. Default (and highly recommended) value
            is False.
        :type force: bool | None
        :param timeout: Timeout in seconds for creating the backup. Default
            value is 120 seconds.
        :type timeout: numbers.Number | None
        :return: Result of the create operation.
        :rtype: dict
        :raise arango.exceptions.BackupCreateError: If create fails.
        """
        data: Json = {}
        # Omit the label when it was not supplied instead of sending an
        # explicit null, consistent with how the other options are handled.
        if label is not None:
            data["label"] = label
        if allow_inconsistent is not None:
            data["allowInconsistent"] = allow_inconsistent
        if force is not None:
            data["force"] = force
        if timeout is not None:
            data["timeout"] = timeout

        request = Request(method="post", endpoint="/_admin/backup/create", data=data)

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_backup(resp.body["result"])
            raise BackupCreateError(resp, request)

        return self._execute(request, response_handler)

    def delete(self, backup_id: str) -> Result[bool]:
        """Delete a backup.

        :param backup_id: Backup ID.
        :type backup_id: str
        :return: True if the backup was deleted successfully.
        :rtype: bool
        :raise arango.exceptions.BackupDeleteError: If delete fails.
        """
        request = Request(
            method="post", endpoint="/_admin/backup/delete", data={"id": backup_id}
        )

        def response_handler(resp: Response) -> bool:
            if resp.is_success:
                return True
            raise BackupDeleteError(resp, request)

        return self._execute(request, response_handler)

    def download(
        self,
        backup_id: Optional[str] = None,
        repository: Optional[str] = None,
        abort: Optional[bool] = None,
        config: Optional[Json] = None,
        download_id: Optional[str] = None,
    ) -> Result[Json]:
        """Manage backup downloads.

        :param backup_id: Backup ID used for scheduling a download. Mutually
            exclusive with parameter **download_id**.
        :type backup_id: str | None
        :param repository: Remote repository URL (e.g. "local://tmp/backups").
            Required for scheduling a download and mutually exclusive with
            parameter **download_id**.
        :type repository: str | None
        :param abort: If set to True, running download is aborted. Used with
            parameter **download_id**.
        :type abort: bool | None
        :param config: Remote repository configuration. Required for scheduling
            a download and mutually exclusive with parameter **download_id**.
        :type config: dict | None
        :param download_id: Download ID. Mutually exclusive with parameters
            **backup_id**, **repository**, and **config**.
        :type download_id: str | None
        :return: Download details.
        :rtype: dict
        :raise arango.exceptions.BackupDownloadError: If operation fails.
        """
        # Mutual exclusivity is not checked here; whatever was provided is
        # forwarded as-is.
        data: Json = {}
        if download_id is not None:
            data["downloadId"] = download_id
        if backup_id is not None:
            data["id"] = backup_id
        if repository is not None:
            data["remoteRepository"] = repository
        if abort is not None:
            data["abort"] = abort
        if config is not None:
            data["config"] = config

        request = Request(method="post", endpoint="/_admin/backup/download", data=data)

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_backup_transfer(resp.body["result"])
            raise BackupDownloadError(resp, request)

        return self._execute(request, response_handler)

    def upload(
        self,
        backup_id: Optional[str] = None,
        repository: Optional[str] = None,
        abort: Optional[bool] = None,
        config: Optional[Json] = None,
        upload_id: Optional[str] = None,
    ) -> Result[Json]:
        """Manage backup uploads.

        :param backup_id: Backup ID used for scheduling an upload. Mutually
            exclusive with parameter **upload_id**.
        :type backup_id: str | None
        :param repository: Remote repository URL (e.g. "local://tmp/backups").
            Required for scheduling an upload and mutually exclusive with
            parameter **upload_id**.
        :type repository: str | None
        :param abort: If set to True, running upload is aborted. Used with
            parameter **upload_id**.
        :type abort: bool | None
        :param config: Remote repository configuration. Required for scheduling
            an upload and mutually exclusive with parameter **upload_id**.
        :type config: dict | None
        :param upload_id: Upload ID. Mutually exclusive with parameters
            **backup_id**, **repository**, and **config**.
        :type upload_id: str | None
        :return: Upload details.
        :rtype: dict
        :raise arango.exceptions.BackupUploadError: If upload operation fails.
        """
        # Mutual exclusivity is not checked here; whatever was provided is
        # forwarded as-is.
        data: Json = {}
        if upload_id is not None:
            data["uploadId"] = upload_id
        if backup_id is not None:
            data["id"] = backup_id
        if repository is not None:
            data["remoteRepository"] = repository
        if abort is not None:
            data["abort"] = abort
        if config is not None:
            data["config"] = config

        request = Request(method="post", endpoint="/_admin/backup/upload", data=data)

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_backup_transfer(resp.body["result"])
            raise BackupUploadError(resp, request)

        return self._execute(request, response_handler)

    def restore(self, backup_id: str) -> Result[Json]:
        """Restore from a local backup.

        :param backup_id: Backup ID.
        :type backup_id: str
        :return: Result of the restore operation.
        :rtype: dict
        :raise arango.exceptions.BackupRestoreError: If restore fails.
        """
        request = Request(
            method="post", endpoint="/_admin/backup/restore", data={"id": backup_id}
        )

        def response_handler(resp: Response) -> Json:
            if resp.is_success:
                return format_backup_restore(resp.body["result"])
            raise BackupRestoreError(resp, request)

        return self._execute(request, response_handler)