forked from hermes-router/hermes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathroute_series.py
executable file
·443 lines (359 loc) · 21 KB
/
route_series.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
import os
from pathlib import Path
import uuid
import json
import shutil
import daiquiri
# App-specific includes
import common.config as config
import common.rule_evaluation as rule_evaluation
import common.monitor as monitor
import common.helper as helper
import common.notification as notification
from common.constants import mercure_defs, mercure_names, mercure_actions, mercure_rule, mercure_config, mercure_options, mercure_folders, mercure_events
from routing.generate_taskfile import generate_taskfile_route, generate_taskfile_process
logger = daiquiri.getLogger("route_series")
def _free_lock(lock, lock_file):
    """Releases the given lock file, reporting (but not raising) any failure."""
    try:
        lock.free()
    except Exception:
        # Can't delete lock file, so something must be seriously wrong
        logger.error(f'Unable to remove lock file {lock_file}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to remove lock file {lock_file}')


def route_series(series_UID):
    """Processes the series with the given series UID from the incoming folder."""
    lock_file = Path(config.mercure['incoming_folder'] + '/' + str(series_UID) + mercure_names.LOCK)
    if lock_file.exists():
        # Series is locked, so another instance might be working on it
        return

    # Lock the series so that no other instance picks it up while we work on it
    try:
        lock = helper.FileLock(lock_file)
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f'Unable to create lock file {lock_file}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create lock file {lock_file}')
        return

    logger.info(f'Processing series {series_UID}')
    fileList = []
    seriesPrefix = series_UID + "#"

    # Collect all files belonging to the series (names follow "seriesUID#..." convention)
    for entry in os.scandir(config.mercure['incoming_folder']):
        if entry.name.endswith(mercure_names.TAGS) and entry.name.startswith(seriesPrefix) and not entry.is_dir():
            stemName = entry.name[:-5]
            fileList.append(stemName)

    logger.info("DICOM files found: " + str(len(fileList)))

    # BUGFIX: Guard against an empty file list, which previously crashed with an
    # IndexError when accessing fileList[0] below. Also release the lock, which
    # the old code leaked on every early error return.
    if not fileList:
        logger.error(f'No tag files found for series {series_UID}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'No tag files found for series {series_UID}')
        _free_lock(lock, lock_file)
        return

    # Use the tags file from the first slice for evaluating the routing rules
    tagsMasterFile = Path(config.mercure['incoming_folder'] + '/' + fileList[0] + mercure_names.TAGS)
    if not tagsMasterFile.exists():
        logger.error(f'Missing file! {tagsMasterFile.name}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Missing file {tagsMasterFile.name}')
        # BUGFIX: free the lock before returning, otherwise the series stays locked forever
        _free_lock(lock, lock_file)
        return

    try:
        with open(tagsMasterFile, "r") as json_file:
            tagsList = json.load(json_file)
    except Exception:
        logger.exception(f"Invalid tag information of series {series_UID}")
        # BUGFIX: report the series UID instead of the stale scandir "entry" from the loop above
        monitor.send_series_event(monitor.s_events.ERROR, series_UID, 0, "", "Invalid tag information")
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f"Invalid tag for series {series_UID}")
        _free_lock(lock, lock_file)
        return

    monitor.send_register_series(tagsList)
    monitor.send_series_event(monitor.s_events.REGISTERED, series_UID, len(fileList), "", "")

    # Now test the routing rules and evaluate which rules have been triggered. If one of the
    # triggered rules enforces discarding, discard_series contains the name of that rule.
    triggered_rules, discard_series = get_triggered_rules(tagsList)

    if (len(triggered_rules) == 0) or discard_series:
        # If no routing rule has triggered or discarding has been enforced, discard the series
        push_series_discard(fileList, series_UID, discard_series)
    else:
        # Strategy: If only one triggered rule, move files. If multiple, copy files
        push_series_studylevel(triggered_rules, fileList, series_UID, tagsList)
        push_series_serieslevel(triggered_rules, fileList, series_UID, tagsList)
        if len(triggered_rules) > 1:
            remove_series(fileList)

    _free_lock(lock, lock_file)
    return
def get_triggered_rules(tagList):
    """Evaluates the routing rules and returns a list with trigger rules.

    Returns a tuple (triggered_rules, discard_rule): the dict of rule names that
    matched the given tags, and the name of the first matching DISCARD rule
    (empty string if none matched). Evaluation stops at the first DISCARD rule.
    """
    matched_rules = {}
    discard_rule = ""
    for rule_name, rule_def in config.mercure["rules"].items():
        try:
            # Skip rules that have been disabled in the configuration
            if rule_def.get("disabled", "False") == "True":
                continue
            if rule_evaluation.parse_rule(rule_def.get("rule", "False"), tagList):
                matched_rules[rule_name] = rule_name
                if rule_def.get("action", "") == mercure_actions.DISCARD:
                    # A discard rule overrides everything else -- stop evaluating
                    discard_rule = rule_name
                    break
        except Exception as e:
            # A malformed rule must not prevent the remaining rules from being evaluated
            logger.error(e)
            logger.error(f"Invalid rule found: {rule_name}")
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f"Invalid rule: {rule_name}")
            continue
    logger.info("Triggered rules:")
    logger.info(matched_rules)
    return matched_rules, discard_rule
def push_series_discard(fileList, series_UID, discard_series):
    """Discards the series by moving all files into the "discard" folder, which is periodically cleared."""
    # Define the source and target folder. Use UUID as name for the target folder in the
    # discard directory to avoid collisions
    discard_path = config.mercure['discard_folder'] + '/' + str(uuid.uuid1())
    discard_folder = discard_path + '/'
    source_folder = config.mercure['incoming_folder'] + '/'

    # Create subfolder in the discard directory and validate that it has been created
    try:
        os.mkdir(discard_path)
    except Exception:
        logger.exception(f'Unable to create outgoing folder {discard_path}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create discard folder {discard_path}')
        return
    if not Path(discard_path).exists():
        logger.error(f'Creating discard folder not possible {discard_path}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Creating discard folder not possible {discard_path}')
        return

    # Create lock file in destination folder (to prevent the cleaner module to work on the folder). Note that
    # the DICOM series in the incoming folder has already been locked in the parent function.
    try:
        # BUGFIX: discard_path is a plain string, so "discard_path / ..." raised a
        # TypeError. Convert to Path first so the / operator joins path components.
        lock_file = Path(discard_path) / mercure_names.LOCK
        lock = helper.FileLock(lock_file)
    except Exception:
        # Can't create lock file, so something must be seriously wrong
        logger.error(f'Unable to create lock file {lock_file}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create lock file in discard folder {lock_file}')
        return

    info_text = ""
    if discard_series:
        info_text = "Discard by rule " + discard_series
    monitor.send_series_event(monitor.s_events.DISCARD, series_UID, len(fileList), "", info_text)

    # Move DICOM and tags files pairwise; a failure for one pair does not stop the rest
    for entry in fileList:
        try:
            shutil.move(source_folder + entry + mercure_names.DCM, discard_folder + entry + mercure_names.DCM)
            shutil.move(source_folder + entry + mercure_names.TAGS, discard_folder + entry + mercure_names.TAGS)
        except Exception:
            logger.exception(f'Problem while discarding file {entry}')
            logger.exception(f'Source folder {source_folder}')
            logger.exception(f'Target folder {discard_folder}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Problem during discarding file {entry}')

    monitor.send_series_event(monitor.s_events.MOVE, series_UID, len(fileList), discard_path, "")
    try:
        lock.free()
    except Exception:
        # Can't delete lock file, so something must be seriously wrong
        logger.error(f'Unable to remove lock file {lock_file}')
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to remove lock file {lock_file}')
        return
def push_series_studylevel(triggered_rules, file_list, series_UID, tags_list):
    """Prepares study-level routing for the current series.

    For every triggered rule that operates on study level, the series files are
    pushed into an individual study-level folder named after the series and rule.
    """
    for rule_name in triggered_rules:
        rule_trigger = config.mercure[mercure_config.RULES][rule_name].get(mercure_rule.ACTION_TRIGGER, mercure_options.SERIES)
        if rule_trigger != mercure_options.STUDY:
            continue
        # One collection folder per (series, rule) combination
        folder_name = series_UID + mercure_defs.SEPARATOR + rule_name
        if not os.path.exists(folder_name):
            try:
                os.mkdir(folder_name)
            except:
                logger.error(f'Unable to create folder {folder_name}')
                monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create folder {folder_name}')
                continue
        # Copy when multiple rules triggered (others still need the files), move otherwise
        push_files(file_list, folder_name, (len(triggered_rules) > 1))
def push_series_serieslevel(triggered_rules, file_list, series_UID, tags_list):
    """Prepares all series-level routings for the current series.

    Dispatches to the routing, processing, and notification handlers in turn.
    """
    handler_args = (triggered_rules, file_list, series_UID, tags_list)
    push_serieslevel_routing(*handler_args)
    push_serieslevel_processing(*handler_args)
    push_serieslevel_notification(*handler_args)
def trigger_serieslevel_notification_reception(current_rule, tags_list):
    """Fires the reception webhook configured for the given rule."""
    rule_settings = config.mercure[mercure_config.RULES][current_rule]
    webhook_url = rule_settings.get(mercure_rule.NOTIFICATION_WEBHOOK, "")
    payload = rule_settings.get(mercure_rule.NOTIFICATION_PAYLOAD, "")
    notification.send_webhook(webhook_url, payload, mercure_events.RECEPTION)
def push_serieslevel_routing(triggered_rules, file_list, series_UID, tags_list):
    """Collects the dispatch targets of the triggered series-level routing rules and pushes the series out."""
    # Collect the dispatch-only targets to avoid that a series is sent twice to the
    # same target due to multiple targets triggered (note: this only makes sense for
    # routing-only tasks)
    selected_targets = {}
    rules_config = config.mercure[mercure_config.RULES]
    for rule_name in triggered_rules:
        if rules_config[rule_name].get(mercure_rule.ACTION_TRIGGER, mercure_options.SERIES) != mercure_options.SERIES:
            continue
        if rules_config[rule_name].get(mercure_rule.ACTION, "") != mercure_actions.ROUTE:
            continue
        target = config.mercure["rules"][rule_name].get("target", "")
        if target:
            selected_targets[target] = rule_name
        trigger_serieslevel_notification_reception(rule_name, tags_list)
    push_serieslevel_outgoing(triggered_rules, file_list, series_UID, tags_list, selected_targets)
def push_serieslevel_processing(triggered_rules, file_list, series_UID, tags_list):
    """Moves or copies the series into a processing folder for every triggered series-level processing rule."""
    for current_rule in triggered_rules:
        if config.mercure[mercure_config.RULES][current_rule].get(mercure_rule.ACTION_TRIGGER, mercure_options.SERIES) == mercure_options.SERIES:
            action = config.mercure[mercure_config.RULES][current_rule].get(mercure_rule.ACTION, "")
            if action in (mercure_actions.PROCESS, mercure_actions.BOTH):
                # Determine if the files should be copied or moved. If only one rule triggered, files can
                # safely be moved, otherwise files will be moved and removed in the end
                copy_files = len(triggered_rules) > 1

                folder_name = config.mercure[mercure_folders.PROCESSING] + '/' + str(uuid.uuid1())
                target_folder = folder_name + "/"
                try:
                    os.mkdir(folder_name)
                except Exception:
                    logger.exception(f'Unable to create outgoing folder {folder_name}')
                    monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create processing folder {folder_name}')
                    return
                if not Path(folder_name).exists():
                    logger.error(f'Creating folder not possible {folder_name}')
                    monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Creating folder not possible {folder_name}')
                    return

                # Lock the target folder so the processor does not start before all files arrived
                try:
                    # BUGFIX: folder_name is a plain string, so "folder_name / ..." raised a
                    # TypeError. Convert to Path first so the / operator joins path components.
                    lock_file = Path(folder_name) / mercure_names.LOCK
                    lock = helper.FileLock(lock_file)
                except Exception:
                    # Can't create lock file, so something must be seriously wrong
                    logger.error(f'Unable to create lock file {lock_file}')
                    monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create lock file {lock_file}')
                    return

                # Generate task file with dispatch information
                task_filename = target_folder + mercure_names.TASKFILE
                task_json = generate_taskfile_process(series_UID, mercure_options.SERIES, current_rule, tags_list)
                try:
                    with open(task_filename, 'w') as task_file:
                        json.dump(task_json, task_file)
                except Exception:
                    logger.error(f"Unable to create task file {task_filename}")
                    monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f"Unable to create task file {task_filename}")
                    continue

                if not push_files(file_list, target_folder, copy_files):
                    logger.error(f'Unable to push files into processing folder {target_folder}')
                    monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to push files into processing folder {target_folder}')
                    return

                lock.free()
                trigger_serieslevel_notification_reception(current_rule, tags_list)
def push_serieslevel_notification(triggered_rules, file_list, series_UID, tags_list):
    """Triggers the reception notification for every series-level notification-only rule."""
    for current_rule in triggered_rules:
        if config.mercure[mercure_config.RULES][current_rule].get(mercure_rule.ACTION_TRIGGER, mercure_options.SERIES) == mercure_options.SERIES:
            if config.mercure[mercure_config.RULES][current_rule].get(mercure_rule.ACTION, "") == mercure_actions.NOTIFICATION:
                trigger_serieslevel_notification_reception(current_rule, tags_list)
                # If the current rule is "notification-only" and this is the only rule that
                # has been triggered, then remove the files (if more than one rule has been
                # triggered, the parent function will take care of it)
                # BUGFIX: was "len(triggered_rules==1)", i.e. len() of a bool, which
                # raised a TypeError so the files were never cleaned up here
                if len(triggered_rules) == 1:
                    remove_series(file_list)
def remove_series(file_list):
    """Deletes the given files from the incoming folder.

    Removes the tags file and the DICOM file for every entry; failures are
    reported but do not abort the cleanup of the remaining entries.
    """
    incoming = config.mercure[mercure_folders.INCOMING] + '/'
    for stem in file_list:
        try:
            os.remove(incoming + stem + mercure_names.TAGS)
            os.remove(incoming + stem + mercure_names.DCM)
        except Exception:
            logger.exception(f'Error while removing file {stem}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Error while removing file {stem}')
def push_files(file_list, target_path, copy_files):
    """Copies or moves the given files to the target path.

    If copy_files is True, files are copied, otherwise moved. Returns True on
    success and False as soon as one file pair could not be transferred.
    """
    operation = shutil.copy if copy_files else shutil.move
    source_folder = config.mercure[mercure_folders.INCOMING] + '/'
    target_folder = target_path + '/'
    # TODO: Secure operation with lock file
    for stem in file_list:
        try:
            operation(source_folder + stem + mercure_names.DCM, target_folder + stem + mercure_names.DCM)
            operation(source_folder + stem + mercure_names.TAGS, target_folder + stem + mercure_names.TAGS)
        except Exception:
            logger.exception(f'Problem while pushing file to outgoing {stem}')
            logger.exception(f'Source folder {source_folder}')
            logger.exception(f'Target folder {target_folder}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Problem while pushing file to outgoing {stem}')
            return False
    return True
def push_serieslevel_outgoing(triggered_rules, file_list, series_UID, tags_list, selected_targets):
    """Move the DICOM files of the series to a separate subfolder for each target in the outgoing folder."""
    source_folder = config.mercure['incoming_folder'] + '/'

    # Determine if the files should be copied or moved. If only one rule triggered, files can
    # safely be moved, otherwise files will be moved and removed in the end
    move_operation = len(triggered_rules) == 1

    for target in selected_targets:
        if target not in config.mercure["targets"]:
            logger.error(f"Invalid target selected {target}")
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f"Invalid target selected {target}")
            continue

        # Use a UUID for the target folder name to avoid collisions in the outgoing folder
        folder_name = config.mercure['outgoing_folder'] + '/' + str(uuid.uuid1())
        target_folder = folder_name + "/"
        try:
            os.mkdir(folder_name)
        except Exception:
            logger.exception(f'Unable to create outgoing folder {folder_name}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create outgoing folder {folder_name}')
            return
        if not Path(folder_name).exists():
            logger.error(f'Creating folder not possible {folder_name}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Creating folder not possible {folder_name}')
            return

        # Lock the folder so the dispatcher does not start sending before all files arrived
        try:
            # BUGFIX: folder_name is a plain string, so "folder_name / ..." raised a
            # TypeError. Convert to Path first so the / operator joins path components.
            lock_file = Path(folder_name) / mercure_names.LOCK
            lock = helper.FileLock(lock_file)
        except Exception:
            # Can't create lock file, so something must be seriously wrong
            logger.error(f'Unable to create lock file {lock_file}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to create lock file {lock_file}')
            return

        # Generate task file with dispatch information
        task_filename = target_folder + mercure_names.TASKFILE
        task_json = generate_taskfile_route(series_UID, mercure_options.SERIES, selected_targets[target], tags_list, target)
        try:
            with open(task_filename, 'w') as task_file:
                json.dump(task_json, task_file)
        except Exception:
            logger.error(f"Unable to create task file {task_filename}")
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f"Unable to create task file {task_filename}")
            continue

        monitor.send_series_event(monitor.s_events.ROUTE, series_UID, len(file_list), target, selected_targets[target])

        operation = shutil.move if move_operation else shutil.copy
        for entry in file_list:
            try:
                operation(source_folder + entry + mercure_names.DCM, target_folder + entry + mercure_names.DCM)
                operation(source_folder + entry + mercure_names.TAGS, target_folder + entry + mercure_names.TAGS)
            except Exception:
                logger.exception(f'Problem while pushing file to outgoing {entry}')
                logger.exception(f'Source folder {source_folder}')
                logger.exception(f'Target folder {target_folder}')
                monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Problem while pushing file to outgoing {entry}')

        monitor.send_series_event(monitor.s_events.MOVE, series_UID, len(file_list), folder_name, "")
        try:
            lock.free()
        except Exception:
            # Can't delete lock file, so something must be seriously wrong
            logger.error(f'Unable to remove lock file {lock_file}')
            monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Unable to remove lock file {lock_file}')
            return
    return
def route_error_files():
    """
    Looks for error files, moves these files and the corresponding DICOM files to the error folder,
    and sends an alert to the bookkeeper instance.
    """
    error_files_found = 0
    for entry in os.scandir(config.mercure['incoming_folder']):
        if entry.name.endswith(".error") and not entry.is_dir():
            # Check if a lock file exists. If not, create one.
            # BUGFIX: the incoming-folder setting is a plain string, so applying the
            # / operator to it raised a TypeError. Build the path by concatenation.
            lock_file = Path(config.mercure['incoming_folder'] + '/' + entry.name + mercure_names.LOCK)
            if lock_file.exists():
                continue
            try:
                lock = helper.FileLock(lock_file)
            except Exception:
                # Another instance may have grabbed the file in the meantime
                continue
            logger.error(f'Found incoming error file {entry.name}')
            error_files_found += 1
            shutil.move(config.mercure['incoming_folder'] + '/' + entry.name,
                        config.mercure['error_folder'] + '/' + entry.name)
            # Move the corresponding DICOM file (name without the ".error" suffix), if present
            dicom_filename = entry.name[:-6]
            dicom_file = Path(config.mercure['incoming_folder'] + '/' + dicom_filename)
            if dicom_file.exists():
                shutil.move(config.mercure['incoming_folder'] + '/' + dicom_filename,
                            config.mercure['error_folder'] + '/' + dicom_filename)
            lock.free()
    if error_files_found > 0:
        monitor.send_event(monitor.h_events.PROCESSING, monitor.severity.ERROR, f'Error parsing {error_files_found} incoming files')
    return