forked from AmbitionEng/django-pgtrigger
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathregistry.py
143 lines (105 loc) · 4.43 KB
/
registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import collections
from pgtrigger import features
# Unique sentinel object.
# NOTE(review): nothing in this module, as shown, reads ``_unset`` - confirm
# whether it is still needed before relying on or removing it.
_unset = object()
# All registered triggers for each model
class _Registry(collections.UserDict):
    """
    Mapping of trigger URIs to ``(model, trigger)`` tuples.

    Keys are strings in the format ``"app_label.model_name:trigger_name"``.
    When ``features.migrations()`` is truthy, inserting or deleting an entry
    also adds the trigger to, or removes it from, ``model._meta.triggers``
    and ``model._meta.original_attrs["triggers"]``.
    """

    @property
    def pg_function_names(self):
        """
        The postgres function names of all registered triggers
        """
        return {trigger.get_pgid(model) for model, trigger in self.values()}

    @property
    def by_db_table(self):
        """
        Return the registered triggers keyed by ``(db_table, trigger name)``
        """
        return {(model._meta.db_table, trigger.name): trigger for model, trigger in self.values()}

    def __getitem__(self, key):
        """
        Return the ``(model, trigger)`` tuple registered under ``key``.

        Raises:
            ValueError: If ``key`` does not contain a ``":"`` separator.
            KeyError: If ``key`` is not present in this registry.
        """
        assert isinstance(key, str)
        if ":" not in key:
            raise ValueError(
                'Trigger URI must be in the format of "app_label.model_name:trigger_name"'
            )
        # Consult this instance's own storage instead of the module-level
        # ``_registry`` singleton, so lookups (and ``values()``, which goes
        # through ``__getitem__``) are correct for every instance.
        if key not in self.data:
            raise KeyError(f'URI "{key}" not found in pgtrigger registry')

        return super().__getitem__(key)

    def __setitem__(self, key, value):
        """
        Register ``value`` (a ``(model, trigger)`` tuple) under ``key``.

        ``key`` must equal ``f"{model._meta.label}:{trigger.name}"``.

        Raises:
            KeyError: If a different trigger with the same name is already
                registered for the model's table, or if the trigger's
                Postgres function name is already in use.
        """
        assert isinstance(key, str)
        model, trigger = value
        assert f"{model._meta.label}:{trigger.name}" == key

        found_trigger = self.by_db_table.get((model._meta.db_table, trigger.name))

        # Re-registering an equal trigger for the same table is allowed and
        # skips both conflict checks.
        if not found_trigger or found_trigger != trigger:
            if found_trigger:
                raise KeyError(
                    f'Trigger name "{trigger.name}" already'
                    f' used for model "{model._meta.label}"'
                    f' table "{model._meta.db_table}".'
                )

            if trigger.get_pgid(model) in self.pg_function_names:
                raise KeyError(
                    f'Trigger "{trigger.name}" on model "{model._meta.label}"'
                    " has Postgres function name that's already in use."
                    " Use a different name for the trigger."
                )

        # Add the trigger to Meta.triggers.
        # Note, pgtrigger's App.ready() method auto-registers any
        # triggers in Meta already, meaning the trigger may already exist. If so, ignore it
        if features.migrations():  # pragma: no branch
            # Build new lists instead of appending in place so an existing
            # ``triggers`` sequence is never mutated.
            if trigger not in getattr(model._meta, "triggers", []):
                model._meta.triggers = list(getattr(model._meta, "triggers", [])) + [trigger]

            if trigger not in model._meta.original_attrs.get("triggers", []):
                model._meta.original_attrs["triggers"] = list(
                    model._meta.original_attrs.get("triggers", [])
                ) + [trigger]

        return super().__setitem__(key, value)

    def __delitem__(self, key):
        """
        Remove the entry registered under ``key``.

        Raises:
            ValueError: If ``key`` does not contain a ``":"`` separator.
            KeyError: If ``key`` is not present in this registry.
        """
        # Look the entry up first: this validates the key and gives us the
        # model/trigger needed for the Meta clean-up below.
        model, trigger = self[key]

        super().__delitem__(key)

        # If we support migration integration, remove from Meta triggers
        if features.migrations():  # pragma: no branch
            model._meta.triggers.remove(trigger)
            # If model._meta.triggers and the original_attrs triggers are the same,
            # we don't need to remove it from the original_attrs
            if trigger in model._meta.original_attrs["triggers"]:  # pragma: no branch
                model._meta.original_attrs["triggers"].remove(trigger)
# Module-level registry instance used by set(), delete() and registered().
_registry = _Registry()
def set(uri, *, model, trigger):
    """
    Store a trigger in the module-level registry.

    Note that this function's name shadows the builtin ``set`` inside this
    module.

    Args:
        uri (str): Registry key for the trigger.
        model: The model the trigger belongs to.
        trigger: The trigger object to store.
    """
    entry = (model, trigger)
    _registry[uri] = entry
def delete(uri):
    """
    Remove the entry stored under ``uri`` from the module-level registry.

    Args:
        uri (str): Registry key of the trigger to remove.
    """
    del _registry[uri]
def registered(*uris):
    """
    Look up entries in the module-level trigger registry.

    Args:
        *uris (str): URIs of the triggers to fetch, each in the format of
            ``{app_label}.{model_name}:{trigger_name}``. When no URI is
            given, every registered entry is returned.

    Returns:
        List[Tuple[``models.Model``, `pgtrigger.Trigger`]]: One
        ``(model, trigger)`` tuple per requested URI.
    """
    wanted = uris if uris else _registry.keys()
    return [_registry[key] for key in wanted]
def register(*triggers):
    """
    Build a class decorator that registers triggers on a model.

    The returned callable invokes ``trigger.register(model_class)`` for each
    given trigger, in order, and then returns the model class unchanged.

    Args:
        *triggers (`pgtrigger.Trigger`): Triggers to register.

    Examples:
        As a decorator on a model::

            @pgtrigger.register(
                pgtrigger.Protect(
                    name="append_only",
                    operation=(pgtrigger.Update | pgtrigger.Delete)
                )
            )
            class MyModel(models.Model):
                pass

        As a plain function call::

            pgtrigger.register(trigger_object)(MyModel)
    """

    def _decorate(model_class):
        for item in triggers:
            item.register(model_class)
        return model_class

    return _decorate