forked from projectbuendia/buendia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
profile_apply
executable file
·382 lines (329 loc) · 16.2 KB
/
profile_apply
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python
import csv
import MySQLdb
import os
import re
import sys
import uuid
# UUID used by apply() to look up the form row that holds the patient chart
# layout (db.get('form_id', uuid=CHART_UUID)).
CHART_UUID = 'ea43f213-66fb-4af6-8a49-70fd6b9ce5d4'
# Locale passed to set_concept_name()/put_concept() when concept names from
# the profile are written to the concept_name table.
LOCALE = 'en_GB_client'
class Database:
    """Small convenience wrapper around a MySQLdb connection.

    Attributes:
        db: the underlying MySQLdb connection object.
        commit: bound method that commits the current transaction.
    """

    def __init__(self, database, user, password):
        """Connects to `database` and switches the session charsets to utf8."""
        self.db = MySQLdb.connect(db=database, user=user, passwd=password)
        cursor = self.db.cursor()
        try:
            cursor.execute('set character_set_client = utf8;')
            cursor.execute('set character_set_server = utf8;')
            cursor.execute('set character_set_connection = utf8;')
            cursor.execute('set character_set_results = utf8;')
            cursor.execute('set character_set_database = utf8;')
        finally:
            cursor.close()
        self.commit = self.db.commit

    def iter(self, query, *params):
        """Executes `query` with `params` and returns an iterator over rows.

        The cursor is deliberately left open, because the returned iterator
        reads from it.
        """
        cursor = self.db.cursor()
        cursor.execute(query, params)
        return iter(cursor)

    def execute(self, query, *params):
        """Executes `query` with `params`, discarding any result rows."""
        cursor = self.db.cursor()
        try:
            cursor.execute(query, params)
        finally:
            cursor.close()

    def get(self, field, table=None, **constraints):
        """Returns `field` from the first row matching all the constraints.

        Args:
            field: name of the column to select.
            table: name of the table; when omitted it is derived from a
                field named '<table>_id'.
            **constraints: column=value pairs that must all match.  A value
                of None is matched with 'is null', because 'column = NULL'
                is never true in SQL and would make the lookup always fail.
                With no constraints the WHERE clause is omitted.

        Returns:
            The selected value from the first matching row, or None if no
            row matches.

        Raises:
            ValueError: if no table is given and none can be derived.
        """
        if not table:
            if not field.endswith('_id'):
                raise ValueError(
                    'cannot infer a table name from field %r' % (field,))
            table = field[:-3]

        clauses = []
        params = []
        for key, value in constraints.items():
            if value is None:
                clauses.append(key + ' is null')
            else:
                clauses.append(key + ' = %s')
                params.append(value)

        query = 'select %s from %s' % (field, table)
        if clauses:
            query += ' where ' + ' and '.join(clauses)

        cursor = self.db.cursor()
        try:
            cursor.execute(query, tuple(params))
            row = cursor.fetchone()
        finally:
            cursor.close()
        return row[0] if row else None
class OpenmrsDatabase:
    """Insert/update helpers that fill in the bookkeeping columns
    (creator, uuid, date_created, changed_by, date_changed) on each write.

    Attributes:
        db: a Database instance used to run the queries.
        user_id: users.user_id of the account recorded as creator/changer.
        columns: cache mapping table name -> list of its column names.
    """

    def __init__(self, db, username):
        """Looks up the user_id for `username` in the users table."""
        self.db = db
        self.user_id = db.get('user_id', 'users', username=username)
        self.columns = {}

    def get_columns(self, table):
        """Returns the column names of `table`, cached after the first call."""
        if table not in self.columns:
            self.columns[table] = [
                row[0] for row in self.db.iter('describe ' + table)]
        return self.columns[table]

    def insert(self, table, **kwargs):
        """Inserts a row into `table` and returns its '<table>_id', if any.

        'creator' defaults to self.user_id, 'uuid' defaults to a fresh random
        UUID, and 'date_created' is always set to now().

        Returns:
            The new row's '<table>_id' (looked up by uuid) when the table has
            such a column, otherwise None.
        """
        kwargs.setdefault('creator', self.user_id)
        # Pass the UUID as text so the database driver never has to guess how
        # to serialise a uuid.UUID object.
        kwargs.setdefault('uuid', str(uuid.uuid4()))
        # list(...) keeps this working where dict views are not lists.
        names = list(kwargs)
        values = [kwargs[name] for name in names]
        columns = names + ['date_created']
        placeholders = ['%s'] * len(values) + ['now()']
        self.db.execute(
            'insert into %s (%s) values (%s)' %
            (table, ', '.join(columns), ', '.join(placeholders)), *values)
        if table + '_id' in self.get_columns(table):
            return self.db.get(table + '_id', uuid=kwargs['uuid'])

    def update(self, table, id, **kwargs):
        """Updates the row of `table` whose '<table>_id' equals `id`.

        If the table has a 'changed_by' column, it and 'date_changed' are set
        as well.  Nothing is executed when there is nothing to set.

        Returns:
            `id`, unchanged, so calls can be chained.
        """
        names = list(kwargs)
        pairs = [name + ' = %s' for name in names]
        values = [kwargs[name] for name in names]
        if 'changed_by' in self.get_columns(table):
            pairs += ['changed_by = %s', 'date_changed = now()']
            values += [self.user_id]
        if pairs:
            self.db.execute(
                'update %s set %s where %s = %%s' %
                (table, ', '.join(pairs), table + '_id'), *(values + [id]))
        return id
def get_db(database):
    """Opens `database` using the OpenMRS MySQL credentials.

    The user name and password are read from the OPENMRS_MYSQL_USER and
    OPENMRS_MYSQL_PASSWORD variables after sourcing
    /usr/share/buendia/utils.sh; when either comes back empty, the fallback
    'openmrs_user' / 'openmrs' is used.

    Returns:
        A connected Database instance.
    """
    pipe = os.popen("bash -c '. /usr/share/buendia/utils.sh; "
                    "echo $OPENMRS_MYSQL_USER; echo $OPENMRS_MYSQL_PASSWORD'")
    try:
        user = pipe.readline().strip() or 'openmrs_user'
        password = pipe.readline().strip() or 'openmrs'
    finally:
        # Close the pipe so the child shell is reaped promptly.
        pipe.close()
    return Database(database, user, password)
def read_csv(filename, tab=None):
    """Returns a dictionary mapping tab names to lists of row dictionaries.

    Each row belongs to the tab named in its 'tab' column; a row with an
    empty 'tab' cell belongs to the same tab as the row before it.

    Args:
        filename: path of the CSV file; its header must include 'tab'.
        tab: tab name to use for rows that precede the first named tab.
    """
    tabs = {}
    # The context manager closes the file even if parsing fails.
    with open(filename) as infile:
        for row in csv.DictReader(infile):
            tab = row['tab'] or tab
            tabs.setdefault(tab, []).append(row)
    return tabs
def apply(tabs):
    """Applies a profile to the 'openmrs' database and commits the changes.

    Args:
        tabs: dictionary mapping tab names to lists of row dictionaries, as
            returned by read_csv().  The 'form' tab (if present) defines the
            forms; the 'chart' tab (if present) defines the chart grid for
            the form whose uuid is CHART_UUID.
    """
    db = get_db('openmrs')
    odb = OpenmrsDatabase(db, 'buendia_admin')

    concept_field_type = db.get('field_type_id', name='Concept')
    element_field_type = db.get('field_type_id', name='Database element')
    section_field_type = db.get('field_type_id', name='Section')

    coded_datatype = db.get('concept_datatype_id', name='Coded')
    numeric_datatype = db.get('concept_datatype_id', name='Numeric')
    text_datatype = db.get('concept_datatype_id', name='Text')
    answer_datatype = db.get('concept_datatype_id', name='N/A')

    datatypes_by_type = {
        'yes_no': coded_datatype,  # unknown (1067), no (1066), or yes (1065)
        'select_one': coded_datatype,
        'select_multiple': coded_datatype,  # no (1066) or yes (1065)
        'number': numeric_datatype,
        'text': text_datatype,
    }

    # "Finding", "Symptom", and "Symptom/Finding" all seem like reasonable
    # classes for observations; "Finding" is by far the most commonly used.
    obs_class = db.get('concept_class_id', name='Finding')

    def get_or_insert(table, **values):
        """Returns the id of the row matching `values`, inserting if needed."""
        return (db.get(table + '_id', **values) or odb.insert(table, **values))

    def update_or_insert(table, row_id, **values):
        """Updates the row with the given id, or inserts it with that id."""
        if db.get(table + '_id', **{table + '_id': row_id}):
            odb.update(table, row_id, **values)
        else:
            odb.insert(table, **dict(values, **{table + '_id': row_id}))

    def get_field_for_element(table, attribute, name=None):
        """Returns the id of the 'Database element' field for table.attribute,
        (re)setting its name."""
        field_id = get_or_insert('field', field_type=element_field_type,
                                 table_name=table, attribute_name=attribute)
        return odb.update('field', field_id,
                          name=name or table + '.' + attribute)

    def get_field_for_section(name, description=None, concept_id=None):
        """Returns the id of the 'Section' field with the given name."""
        return get_or_insert('field', field_type=section_field_type,
                             name=name, description=description or name,
                             concept_id=concept_id)

    def get_field_for_concept(concept_id, name=None, description=None):
        """Returns the id of the 'Concept' field for the given concept; the
        name defaults to a name found in concept_name."""
        name = name or db.get('name', 'concept_name', concept_id=concept_id)
        return get_or_insert('field', field_type=concept_field_type,
                             name=name, description=description or name,
                             concept_id=concept_id)

    def add_field_to_form(form_id, field_id, parent_form_field_id=None,
                          field_number=None, sort_weight=1, required=0):
        """Inserts a form_field row and returns its id."""
        return odb.insert('form_field', form_id=form_id, field_id=field_id,
                          parent_form_field=parent_form_field_id,
                          field_number=field_number, sort_weight=sort_weight,
                          required=required)

    def set_concept_name(concept_id, name, locale):
        """Sets the preferred name of a concept in `locale`; does nothing
        (and returns None) when `name` is empty."""
        if name:
            concept_name_id = get_or_insert(
                'concept_name', concept_id=concept_id, locale=locale)
            return odb.update('concept_name', concept_name_id,
                              name=name, locale_preferred=1)

    def set_concept_numeric(concept_id, normal, noncritical, absolute,
                            units, precise=1):
        """Replaces the concept_numeric row for a concept.

        `normal`, `noncritical` and `absolute` are (low, high) pairs.  They
        are unpacked here rather than in the signature, because tuple
        parameters are not valid syntax on Python 3.
        """
        normal_low, normal_high = normal
        noncritical_low, noncritical_high = noncritical
        absolute_low, absolute_high = absolute
        db.execute('delete from concept_numeric'
                   ' where concept_id = %s', concept_id)
        db.execute('insert into concept_numeric'
                   ' (concept_id, low_normal, hi_normal, low_critical,'
                   ' hi_critical, low_absolute, hi_absolute, units, precise)'
                   ' values (%s, %s, %s, %s, %s, %s, %s, %s, %s)',
                   concept_id, normal_low, normal_high, noncritical_low,
                   noncritical_high, absolute_low, absolute_high,
                   units, precise)

    def put_coded_concept(concept_id, name, locale):
        """Creates or updates a concept with the 'Coded' datatype."""
        put_concept(concept_id, name, locale, coded_datatype, obs_class)

    def put_answer_concept(concept_id, name, locale):
        """Creates or updates a concept with the 'N/A' datatype."""
        put_concept(concept_id, name, locale, answer_datatype, obs_class)

    def set_concept_answers(concept_id, answer_concept_ids):
        """Replaces the answers of a concept, keeping the given order."""
        db.execute(
            'delete from concept_answer where concept_id = %s', concept_id)
        for i, answer_concept_id in enumerate(answer_concept_ids):
            odb.insert('concept_answer', concept_id=concept_id,
                       answer_concept=answer_concept_id, sort_weight=i)

    def put_concept(concept_id, name, locale, datatype_id, class_id):
        """Creates or updates (and un-retires) a concept and sets its name."""
        update_or_insert('concept', concept_id,
                         datatype_id=datatype_id, class_id=class_id, retired=0)
        set_concept_name(concept_id, name, locale)

    def apply_charts(rows):
        # NOTE(review): this function is not called from apply(), and
        # put_chart is not defined in the visible source, so calling it would
        # raise NameError.  Kept as-is pending a decision to finish or
        # remove it.
        chart_layout = get_or_insert(
            'encounter_type', name='buendia-chart_layout',
            description='Buendia: patient chart layout definition')
        for chart_rows in split_by_title(rows):
            put_chart(chart_rows)

    def apply_chart(rows, form_id):
        """Applies the chart definition given rows from the chart tab."""
        section = None
        grid_rows = []
        for row in rows:
            # An empty 'section' cell continues the previous section.
            section = row['section'] or section
            concept_id = validate_int(row['concept'])
            label = row['label']
            if section == 'grid' and concept_id:
                grid_rows.append((concept_id, label))
        apply_grid(grid_rows, form_id)

    def clear_form(form_id):
        """Deletes all form_field rows of the given form."""
        # Clear out the references to form_field_id so deletion can proceed.
        db.execute('update form_field set parent_form_field = null'
                   ' where form_id = %s', form_id)
        db.execute('delete from form_field where form_id = %s', form_id)

    def validate_type(text):
        """Returns (type, datatype_id) for a 'type' cell: (None, None) when
        the cell is empty and ('n/a', None) when the type is unrecognized."""
        name = (text or '').strip().lower()
        if not name:
            return None, None
        if name in datatypes_by_type:
            return name, datatypes_by_type[name]
        return 'n/a', None

    def validate_int(text):
        """Returns `text` as an int, or None if it is missing or not an
        integer."""
        try:
            return int(text)
        except (TypeError, ValueError):
            # TypeError covers None, which is what row.get() yields for a
            # column that is absent from the CSV.
            return None

    def validate_range(text):
        """Parses 'low..high' into a (low, high) pair of floats.

        Either bound may be left blank, giving None for that bound.  Text
        that does not parse at all yields (None, None).
        """
        def parse_bound(bound):
            bound = bound.strip()
            # Test the text, not the number, so that a bound of 0 is kept.
            return float(bound) if bound else None

        try:
            low, high = re.split(r'\.\.+', text or '', 1)
            return parse_bound(low), parse_bound(high)
        except ValueError:
            return None, None

    def apply_grid(grid_rows, form_id):
        """
        Applies the desired selection and sequence of grid rows by making
        changes to the given chart form.  The OpenMRS data model consists of:
          - 'form' table: each row is a form
          - 'form_field' table: puts fields in forms and determines their order
          - 'field' table: each row is a field (usually linked to a concept)
        """
        clear_form(form_id)

        # Add a single section under which we put all the fields (to satisfy
        # the client's assumption that all fields are grouped under sections).
        field_id = get_field_for_section('All', concept_id=1163)
        section_id = add_field_to_form(form_id, field_id, field_number=1)

        # Add the fields to the section, setting the appropriate concept names.
        # Concepts must already exist (we can't create them here, because we
        # don't know what data type they should have).
        for i, (concept_id, label) in enumerate(grid_rows):
            if db.get('concept_id', concept_id=concept_id):
                set_concept_name(concept_id, label, LOCALE)
                field_id = get_field_for_concept(concept_id, label)
                add_field_to_form(form_id, field_id, section_id, i + 1)

    def split_by_title(rows):
        """Yields consecutive groups of rows, each group starting at a row
        with a non-empty 'title'.  Rows before the first title are dropped."""
        row_numbers = [i for i, row in enumerate(rows) if row.get('title')]
        row_numbers.append(len(rows))
        for start, stop in zip(row_numbers[:-1], row_numbers[1:]):
            yield rows[start:stop]

    def apply_forms(rows):
        """Unpublishes every form, then creates/updates and publishes the
        forms described by `rows`."""
        db.execute('update form set published = 0')
        for form_rows in split_by_title(rows):
            put_form(form_rows)

    def put_form(rows):
        """Creates or updates the form described by `rows` and publishes it.
        The form's uuid is derived from the title in the first row."""
        adult_return = db.get('encounter_type_id', name='ADULTRETURN')
        title = rows[0]['title']
        form_uuid = ('buendia-form-' +
                     re.sub(r'[\W_]+', '_', title.lower()).strip('_'))
        form_id = db.get('form_id', uuid=form_uuid)
        if not form_id:
            form_id = odb.insert('form', name=title, version='1',
                                 encounter_type=adult_return, uuid=form_uuid)
        apply_form(rows, form_id)
        db.execute('update form set published = 1 where form_id = %s', form_id)

    def apply_form(rows, form_id):
        """
        Applies the desired selection and sequence of grid rows by making
        changes to the given form.  The OpenMRS data model consists of:
          - 'form' table: each row is a form
          - 'form_field' table: puts fields in forms and determines their order
          - 'field' table: each row is a field (usually linked to a concept)
          - 'concept' table: each row is a concept with a datatype and class
          - 'concept_answer' table: links answer concepts to parent concepts
          - 'concept_name' table: provides localized names for each concept
        """
        clear_form(form_id)

        # Add default fields
        fn = 1
        field_id = get_field_for_section('ENCOUNTER', 'Encounter')
        section_id = add_field_to_form(form_id, field_id, None, fn)
        field_id = get_field_for_element('encounter', 'encounter_datetime')
        add_field_to_form(form_id, field_id, section_id, sort_weight=1)
        field_id = get_field_for_element('encounter', 'location_id')
        add_field_to_form(form_id, field_id, section_id, sort_weight=2)
        field_id = get_field_for_element('encounter', 'provider_id')
        add_field_to_form(form_id, field_id, section_id, sort_weight=3)

        section_id = None
        last_type = None
        select_concept_id, select_answer_ids = None, []

        for i, row in enumerate(rows):
            if row['section']:
                # A row with a section name only starts a new section.
                fn += 1
                field_id = get_field_for_section(row['section'])
                section_id = add_field_to_form(form_id, field_id, None, fn)
                last_type = None
                continue

            field_type, datatype = validate_type(row['type'])
            concept_id = validate_int(row['concept'])
            label = row['label']
            option_concept_id = validate_int(row['option concept'])
            option_label = row['option label']

            if field_type:
                if field_type == 'select_multiple':
                    # Each select_multiple question gets its own section,
                    # with one field per option (added below).
                    fn += 1
                    label = row['label']
                    field_id = get_field_for_section(label + '[binary]', label)
                    section_id = add_field_to_form(form_id, field_id, None, fn)
                elif last_type == 'select_multiple':
                    # Close off the preceding select_multiple section.
                    fn += 1
                    field_id = get_field_for_section('[invisible]')
                    section_id = add_field_to_form(form_id, field_id, None, fn)

                if (concept_id and
                        field_type in ['number', 'text', 'yes_no',
                                       'select_one']):
                    put_concept(concept_id, label, LOCALE, datatype, obs_class)
                    if field_type == 'yes_no':
                        set_concept_answers(concept_id, [1067, 1066, 1065])
                    if datatype == numeric_datatype:
                        set_concept_numeric(
                            concept_id,
                            validate_range(row.get('normal range')),
                            validate_range(row.get('noncritical range')),
                            validate_range(row.get('absolute range')),
                            row.get('units'))
                    field_id = get_field_for_concept(concept_id, label)
                    add_field_to_form(form_id, field_id, section_id, None, i,
                                      validate_int(row.get('required')) or 0)
                    # Options on this and following rows attach to this
                    # concept.
                    select_concept_id, select_answer_ids = concept_id, []

            # A row without a type continues the previous question.
            last_type = field_type or last_type

            if option_concept_id:
                if last_type == 'select_one':
                    put_answer_concept(option_concept_id, option_label, LOCALE)
                    select_answer_ids += [option_concept_id]
                    set_concept_answers(select_concept_id, select_answer_ids)
                if last_type == 'select_multiple':
                    put_coded_concept(option_concept_id, option_label, LOCALE)
                    set_concept_answers(option_concept_id, [1066, 1065])
                    field_id = get_field_for_concept(
                        option_concept_id, option_label)
                    add_field_to_form(form_id, field_id, section_id, None, i,
                                      validate_int(row.get('required')) or 0)

    if 'form' in tabs:
        apply_forms(tabs['form'])
    if 'chart' in tabs:
        apply_chart(tabs['chart'], db.get('form_id', uuid=CHART_UUID))
    db.commit()
if __name__ == '__main__':
    # Fail with a usage message instead of an IndexError traceback when the
    # profile CSV path is missing.
    if len(sys.argv) < 2:
        sys.exit('Usage: %s <profile.csv>' % sys.argv[0])
    apply(read_csv(sys.argv[1]))