forked from dimagi/commcare-hq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsettingshelper.py
199 lines (159 loc) · 6.92 KB
/
settingshelper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
from __future__ import print_function
from collections import namedtuple
import os
import sys
import tempfile
import uuid
from django.db.backends.base.creation import TEST_DATABASE_PREFIX
def is_testing():
return len(sys.argv) > 1 and sys.argv[1] == "test"
class SharedDriveConfiguration(object):
def __init__(self, shared_drive_path, restore_dir, transfer_dir, temp_dir, blob_dir):
self.shared_drive_path = shared_drive_path
self.restore_dir_name = restore_dir
self.transfer_dir_name = transfer_dir
self.temp_dir_name = temp_dir
self.blob_dir_name = blob_dir
self._restore_dir = self._init_dir(restore_dir)
self.transfer_dir = self._init_dir(transfer_dir)
self.temp_dir = self._init_dir(temp_dir)
self.blob_dir = self._init_dir(blob_dir)
self.tzmigration_planning_dir = self._init_dir('tzmigration-planning')
def _init_dir(self, name):
if not self.shared_drive_path or not os.path.isdir(self.shared_drive_path) or not name:
return None
path = os.path.join(self.shared_drive_path, name)
if not os.path.exists(path):
os.mkdir(path)
elif not os.path.isdir(path):
raise Exception('Shared folder is not a directory: {}'.format(name))
return path
def get_unset_reason(self, name):
if not self.shared_drive_path:
return "invalid shared drive path: %r" % (self.shared_drive_path,)
if not os.path.isdir(self.shared_drive_path):
return "shared drive path is not a directory: %r" % (self.shared_drive_path,)
directory = getattr(self, name + "_name")
if not directory:
return name + " is empty or not configured in settings"
return None
@property
def restore_dir(self):
return self._restore_dir or tempfile.gettempdir()
@property
def transfer_enabled(self):
from django_transfer import is_enabled
return is_enabled() and self.transfer_dir
def get_temp_file(self, suffix="", prefix="tmp"):
name = '{}{}{}'.format(prefix, uuid.uuid4().hex, suffix)
return os.path.join(self.temp_dir, name)
def get_server_url(http_method, server_root, username, password):
if username and password:
return '%(http_method)s://%(user)s:%(pass)s@%(server)s' % {
'http_method': http_method,
'user': username,
'pass': password,
'server': server_root,
}
else:
return '%(http_method)s://%(server)s' % {
'http_method': http_method,
'server': server_root,
}
def get_dynamic_db_settings(server_root, username, password, dbname,
use_https=False):
"""
Get dynamic database settings.
Other apps can use this if they want to change settings
"""
http_method = 'https' if use_https else 'http'
server_url = get_server_url(http_method, server_root, username, password)
database = '%(server)s/%(database)s' % {
'server': server_url,
'database': dbname,
}
return {
'COUCH_SERVER': server_url,
'COUCH_DATABASE': database,
}
def get_db_name(dbname, is_test):
"""Get databse name (possibly with test prefix)
:param is_test: Add test prefix if true.
"""
return (TEST_DATABASE_PREFIX + dbname) if is_test else dbname
def assign_test_db_names(dbs):
"""Fix database names for REUSE_DB
Django automatically uses test database names when testing, but
only if the test database setup routine is called. This allows us
to safely skip the test database setup with REUSE_DB.
"""
for db in dbs.values():
test_db_name = get_db_name(db['NAME'], True)
db['NAME'] = db.setdefault('TEST', {}).setdefault('NAME', test_db_name)
class CouchSettingsHelper(namedtuple('CouchSettingsHelper',
['couch_database_url', 'couchdb_apps', 'extra_db_names'])):
def make_couchdb_tuples(self):
"""
Helper function to generate couchdb tuples
for mapping app name to couch database URL.
"""
return [self._make_couchdb_tuple(row) for row in self.couchdb_apps]
def _format_db_uri(self, db_uri):
return db_uri
def _make_couchdb_tuple(self, row):
if isinstance(row, basestring):
app_label, postfix = row, None
else:
app_label, postfix = row
if postfix:
return app_label, self._format_db_uri('%s__%s' % (self.couch_database_url, postfix))
else:
return app_label, self._format_db_uri(self.couch_database_url)
def get_extra_couchdbs(self):
"""
Create a mapping from database prefix to database url
"""
extra_dbs = {}
postfixes = []
for row in self.couchdb_apps:
if isinstance(row, tuple):
_, postfix = row
if postfix:
postfixes.append(postfix)
postfixes.extend(self.extra_db_names)
for postfix in postfixes:
extra_dbs[postfix] = self._format_db_uri('%s__%s' % (self.couch_database_url, postfix))
return extra_dbs
def celery_failure_handler(task, exc, task_id, args, kwargs, einfo):
from redis.exceptions import ConnectionError
from django_redis.exceptions import ConnectionInterrupted
if isinstance(exc, (ConnectionInterrupted, ConnectionError)):
task.retry(args=args, kwargs=kwargs, exc=exc, max_retries=3, countdown=60 * 5)
def get_allowed_websocket_channels(request, channels):
from django.core.exceptions import PermissionDenied
if request.user and request.user.is_authenticated() and request.user.is_superuser:
return channels
else:
# todo: support for non-superusers
# it might be easier to wait on supporting this until we switch to channels
raise PermissionDenied(
'Not allowed to subscribe or to publish to websockets without '
'superuser permissions or domain membership!'
)
def fix_logger_obfuscation(fix_logger_obfuscation_, logging_config):
if fix_logger_obfuscation_:
# this is here because the logging config cannot import
# corehq.util.log.HqAdminEmailHandler, for example, if there
# is a syntax error in any module imported by corehq/__init__.py
# Setting FIX_LOGGER_ERROR_OBFUSCATION = True in
# localsettings.py will reveal the real error.
# Note that changing this means you will not be able to use/test anything
# related to email logging.
for handler in logging_config["handlers"].values():
if handler["class"].startswith("corehq."):
if fix_logger_obfuscation_ != 'quiet':
print("{} logger is being changed to {}".format(
handler['class'],
'logging.StreamHandler'
), file=sys.stderr)
handler["class"] = "logging.StreamHandler"