forked from couchbase/couchbase-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
/
durability.py
125 lines (95 loc) · 4.01 KB
/
durability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
from functools import wraps
from typing import *
from couchbase_core.supportability import internal
from .options import Cardinal, OptionBlock, OptionBlockTimeOut
from couchbase_core.durability import Durability
from datetime import timedelta
# TypedDict is part of the standard ``typing`` module from Python 3.8
# onwards; older interpreters get it from the ``typing_extensions`` backport.
try:
    from typing import TypedDict
except ImportError:
    # Only a failed import should trigger the fallback. A bare ``except``
    # would also swallow KeyboardInterrupt, SystemExit and unrelated errors.
    from typing_extensions import TypedDict
# Both client-durability counts are aliases of the same Cardinal type
# imported from .options; the two names refer to one object.
ReplicateTo = Cardinal
PersistTo = Cardinal
# Type variable restricted to OptionBlock and its subclasses.
T = TypeVar('T', bound=OptionBlock)
class DurabilityTypeBase(dict):
    """
    Plain ``dict`` subclass that is initialised from a mapping of settings.
    """

    def __init__(self, content):
        """
        Copy ``content`` into this dictionary.

        :param content: mapping (or iterable of key/value pairs) holding
            the settings to store
        """
        # Pass the mapping positionally instead of unpacking it as keyword
        # arguments: ``**content`` raises TypeError for any non-string key,
        # and positional passing is what DurabilityType already does.
        super(DurabilityTypeBase, self).__init__(content)
class DurabilityType(dict):
    """
    Dictionary-backed base class for durability settings.
    """

    @internal
    def __init__(self, content):
        # type: (Dict[str, Any]) -> None
        """
        Durability configuration options

        The supplied dictionary is handed to ``dict.__init__`` so its
        entries become the entries of this object.

        :param content: dictionary passed up from subclasses
        """
        super(DurabilityType, self).__init__(content)
class ClientDurability(DurabilityType):
    """
    Durability settings expressed as replicate-to / persist-to counts.
    """

    # Layout of the dictionary handed to DurabilityType; total=True makes
    # both keys required.
    Storage = TypedDict('Storage', {'replicate_to': ReplicateTo, 'persist_to': PersistTo}, total=True)

    def __init__(self,  # type: ClientDurability
                 replicate_to=ReplicateTo.NONE,  # type: ReplicateTo
                 persist_to=PersistTo.NONE  # type: PersistTo
                 ):
        # type: (...) -> None
        """
        Client Durability

        :param persist_to: If set, wait for the mutation to be persisted
            to the storage of at least these many nodes.
            Defaults to ``PersistTo.NONE``.
        :param replicate_to: If set, wait for the mutation to be replicated
            to the cache of at least these many nodes
            (excluding the master). Defaults to ``ReplicateTo.NONE``.
        """
        settings = ClientDurability.Storage(replicate_to=replicate_to, persist_to=persist_to)
        super(ClientDurability, self).__init__(settings)
class ServerDurability(DurabilityType):
    """
    Durability settings carrying a single durability level.
    """

    # Layout of the dictionary handed to DurabilityType; 'level' is required.
    Storage = TypedDict('Storage', {'level': Durability}, total=True)

    def __init__(self, level):
        # type: (Durability) -> None
        """
        Server-based Durability (Synchronous Replication)

        :param Durability level: durability level, stored under the
            ``'level'`` key
        """
        settings = ServerDurability.Storage(level=level)
        super(ServerDurability, self).__init__(settings)
class ClientDurableOptionBlock(OptionBlockTimeOut):
    """
    Option block combining a timeout with client-type durability settings.
    """

    def __init__(self, timeout=None, durability=None):
        # type: (Optional[timedelta], Optional[ClientDurability]) -> None
        """
        Options for operations with client-type durability

        Both values default to ``None`` and are forwarded unchanged, as
        keyword arguments, to the parent option block.

        :param durability: Client durability settings
        :param timeout: Timeout for operation
        """
        super(ClientDurableOptionBlock, self).__init__(
            durability=durability,
            timeout=timeout,
        )
class ServerDurableOptionBlock(OptionBlockTimeOut):
    """
    Option block combining a timeout with server-type durability settings.
    """

    def __init__(self, timeout=None, durability=None):
        # type: (Optional[timedelta], Optional[ServerDurability]) -> None
        """
        Options for operations with server-type durability

        Both values default to ``None`` and are forwarded unchanged, as
        keyword arguments, to the parent option block.

        :param durability: Server durability settings
        :param timeout: Timeout for operation
        """
        super(ServerDurableOptionBlock, self).__init__(
            durability=durability,
            timeout=timeout,
        )
class DurabilityOptionBlock(OptionBlockTimeOut):
    """
    Option block accepting either kind of durability plus an expiry.
    """

    def __init__(self, timeout=None, durability=None, expiry=None, **kwargs):
        # type: (Optional[timedelta], Optional[DurabilityType], Optional[timedelta], **Any) -> None
        """
        Options for operations with any type of durability

        The named options and any extra keyword arguments are forwarded
        to the parent option block.

        :param durability: Durability settings
        :param expiry: When any mutation should expire
        :param timeout: Timeout for operation
        :param kwargs: additional options passed through untouched
        """
        super(DurabilityOptionBlock, self).__init__(
            durability=durability,
            expiry=expiry,
            timeout=timeout,
            **kwargs
        )

    @property
    def expiry(self):
        """
        Value looked up under the ``'expiry'`` key via ``self.get``,
        with ``None`` as the fallback.
        """
        return self.get('expiry', None)