forked from microsoft/fhir-loader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathImportUtils.cs
141 lines (133 loc) · 7.35 KB
/
ImportUtils.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
namespace FHIRBulkImport
{
/// <summary>
/// Helpers that post a FHIR bundle blob to the FHIR server and then file the blob
/// (processed, dead-lettered, or re-queued for retry) according to the server's response.
/// </summary>
public static class ImportUtils
{
    /// <summary>
    /// Reads a bundle from <paramref name="myBlob"/>, optionally transforms it, POSTs it to the
    /// FHIR server and moves the source blob according to the outcome:
    /// <list type="bullet">
    /// <item><description>whole request throttled (HTTP 429): throws so the trigger's retry logic re-delivers the file;</description></item>
    /// <item><description>request failed or any entry failed: blob moved to <c>bundleserr</c> with the response and an action-needed report;</description></item>
    /// <item><description>otherwise: blob moved to <c>bundlesprocessed</c> with the response;</description></item>
    /// <item><description>any entries individually throttled: a new <c>retry*.json</c> bundle containing only those entries is written to <c>bundles</c>.</description></item>
    /// </list>
    /// </summary>
    /// <param name="myBlob">Stream holding the bundle JSON. A null stream is ignored. The stream is left open for the caller.</param>
    /// <param name="name">Blob name of the bundle inside the <c>bundles</c> container.</param>
    /// <param name="log">Logger for progress and error messages.</param>
    /// <exception cref="Exception">Thrown when the FHIR server throttles the whole request, to trigger the caller's retry handling.</exception>
    public static async Task ImportBundle(Stream myBlob, string name, ILogger log)
    {
        if (myBlob == null)
        {
            return;
        }

        log.LogInformation($"ImportFHIRBUndles: Processing file Name:{name} \n Size: {myBlob.Length}");
        var cbclient = StorageUtils.GetCloudBlobClient(System.Environment.GetEnvironmentVariable("FBI-STORAGEACCT"));

        string text;
        // Same encoding/BOM-detection/buffer settings as new StreamReader(stream), but with
        // leaveOpen: true because the stream is owned by the caller, not by this method.
        using (var reader = new StreamReader(myBlob, Encoding.UTF8, true, 1024, true))
        {
            text = await reader.ReadToEndAsync();
        }

        var trtext = TransformBundlesEnabled() ? FHIRUtils.TransformBundle(text, log) : text;
        var fhirbundle = await FHIRUtils.CallFHIRServer("", trtext, HttpMethod.Post, log);

        // Bundle POST was throttled as a whole: bail out before inspecting the body, it is not needed.
        if (!fhirbundle.Success && fhirbundle.Status == System.Net.HttpStatusCode.TooManyRequests)
        {
            // Currently cannot use retry hints with EventGrid Trigger function bindings, so throw an
            // exception to enter EventGrid retry logic for FHIR Server throttling; internal errors and
            // unrecoverable conditions are dead-lettered by this method itself (below).
            log.LogInformation($"ImportFHIRBUndles File Name:{name} is throttled...");
            throw new Exception($"ImportFHIRBUndles: Transient Error File: {name}...Entering eventgrid retry process until success or ultimate failure to dead letter if configured.");
        }

        var result = LoadErrorsDetected(trtext, fhirbundle, name, log);
        bool hasErrors = !fhirbundle.Success || ((JArray)result["errors"]).Count > 0;
        bool hasThrottled = fhirbundle.Success && ((JArray)result["throttled"]).Count > 0;

        // The source blob can only be moved once, so pick exactly one destination for it.
        if (hasErrors)
        {
            // Errors from the FHIR server or proxy: dead-letter the file together with diagnostics.
            await StorageUtils.MoveTo(cbclient, "bundles", "bundleserr", name, $"{name}.err", log);
            await StorageUtils.WriteStringToBlob(cbclient, "bundleserr", $"{name}.err.response", fhirbundle.Content, log);
            await StorageUtils.WriteStringToBlob(cbclient, "bundleserr", $"{name}.err.actionneeded", result.ToString(), log);
            log.LogInformation($"ImportFHIRBUndles File Name:{name} had errors. Moved to deadletter bundleserr directory");
        }
        else
        {
            await StorageUtils.MoveTo(cbclient, "bundles", "bundlesprocessed", name, $"{name}.processed", log);
            await StorageUtils.WriteStringToBlob(cbclient, "bundlesprocessed", $"{name}.processed.result", fhirbundle.Content, log);
            if (!hasThrottled)
            {
                log.LogInformation($"ImportFHIRBUndles Processed file Name:{name}");
            }
        }

        // Entries throttled inside an otherwise accepted bundle are re-queued as a new bundle,
        // regardless of where the original file ended up.
        if (hasThrottled)
        {
            var nb = ImportNDJSON.initBundle();
            nb["entry"] = result["throttled"];
            string fn = $"retry{Guid.NewGuid().ToString().Replace("-", "")}.json";
            await StorageUtils.WriteStringToBlob(cbclient, "bundles", fn, nb.ToString(), log);
            string disposition = hasErrors ? "Moved to bundleserr" : "Moved to processed";
            log.LogInformation($"ImportFHIRBUndles File Name:{name} had throttled resources in response bundle. {disposition}..Created retry bunde {fn}");
        }
    }

    /// <summary>
    /// Compares the entries of the bundle that was sent with the entries of the server's response
    /// bundle (matched by position) and collects the entries that were not accepted.
    /// </summary>
    /// <param name="source">JSON text of the bundle that was posted.</param>
    /// <param name="response">Server response; only its <c>Content</c> is read here.</param>
    /// <param name="name">File name, used for the report id and in log messages.</param>
    /// <param name="log">Logger for warnings and errors.</param>
    /// <returns>
    /// An object with <c>id</c> (the file name), <c>errors</c> (array of
    /// <c>{ resource, response }</c> pairs for non-2xx entries) and <c>throttled</c> (array of the
    /// source entries that received status 429). Both arrays are always present; if the inputs
    /// cannot be parsed the failure is logged and whatever was collected so far is returned.
    /// </returns>
    public static JObject LoadErrorsDetected(string source, FHIRResponse response, string name, ILogger log)
    {
        log.LogInformation($"ImportFHIRBundles:Checking for load errors file {name}");
        JObject retVal = new JObject();
        retVal["id"] = name;
        retVal["errors"] = new JArray();
        retVal["throttled"] = new JArray();
        // Read the arrays back from retVal so additions are guaranteed to land in the returned object.
        JArray errors = (JArray)retVal["errors"];
        JArray throttled = (JArray)retVal["throttled"];
        try
        {
            JArray oentries = JObject.Parse(source)["entry"] as JArray;
            JArray entries = JObject.Parse(response.Content)["entry"] as JArray;
            if (oentries == null || entries == null)
            {
                log.LogWarning($"ImportFHIRBundles: Cannot detect resource entries in source/response for {name}");
                return retVal;
            }

            if (oentries.Count != entries.Count)
            {
                log.LogWarning($"ImportFHIRBundles: Original resource count and response counts do not agree for file {name}");
            }

            for (int i = 0; i < entries.Count; i++)
            {
                JObject entryResponse = (entries[i] as JObject)?["response"] as JObject;
                JToken statusToken = entryResponse?["status"];
                if (statusToken == null)
                {
                    continue;
                }

                string status = statusToken is JValue
                    ? Convert.ToString(((JValue)statusToken).Value, System.Globalization.CultureInfo.InvariantCulture)
                    : null;
                int rc;
                if (!TryParseStatusCode(status, out rc))
                {
                    log.LogWarning($"ImportFHIRBundles: Unrecognized response status '{status}' at entry {i} for file {name}");
                    continue;
                }

                if (rc >= 200 && rc <= 299)
                {
                    continue;
                }

                // The response may hold more entries than the source; never index past the source.
                JToken sourceEntry = i < oentries.Count ? oentries[i] : null;
                if (rc == 429 && sourceEntry != null)
                {
                    throttled.Add(sourceEntry);
                }
                else
                {
                    // Also reached for a 429 with no matching source entry: there is nothing to
                    // retry, so surface it as an error rather than dropping it.
                    JObject errcontainer = new JObject();
                    errcontainer["resource"] = sourceEntry ?? JValue.CreateNull();
                    errcontainer["response"] = entryResponse;
                    errors.Add(errcontainer);
                }
            }

            if (throttled.Count > 0)
            {
                log.LogError($"ImportFHIRBundles: {throttled.Count} resources were throttled by server for {name}");
            }
            if (errors.Count > 0)
            {
                log.LogError($"ImportFHIRBundles: {errors.Count} errors detected in response entries for {name}");
            }
            return retVal;
        }
        catch (Exception e)
        {
            // Best effort by design: an unreadable response must not stop the caller from filing the blob.
            log.LogError(e, $"ImportFHIRBundles: Unable to parse server response to check for errors file {name}:{e.Message}");
            return retVal;
        }
    }

    /// <summary>
    /// Returns false only when the FBI-TRANSFORMBUNDLES environment variable is set to
    /// "no" or "false" (case-insensitive); unset, empty or any other value means true.
    /// </summary>
    private static bool TransformBundlesEnabled()
    {
        string setting = System.Environment.GetEnvironmentVariable("FBI-TRANSFORMBUNDLES");
        if (string.IsNullOrEmpty(setting))
        {
            return true;
        }
        return !(setting.Equals("no", StringComparison.OrdinalIgnoreCase)
                 || setting.Equals("false", StringComparison.OrdinalIgnoreCase));
    }

    /// <summary>
    /// Extracts the numeric HTTP code from an entry status that is either a bare number ("201")
    /// or a number followed by a description ("404 Not Found").
    /// </summary>
    private static bool TryParseStatusCode(string status, out int code)
    {
        code = 0;
        if (string.IsNullOrWhiteSpace(status))
        {
            return false;
        }

        var invariant = System.Globalization.CultureInfo.InvariantCulture;
        // Plain integer first, so every value the previous int.TryParse accepted still parses.
        if (int.TryParse(status, System.Globalization.NumberStyles.Integer, invariant, out code))
        {
            return true;
        }

        // Otherwise take the leading run of ASCII digits and ignore the trailing description.
        string trimmed = status.Trim();
        int digits = 0;
        while (digits < trimmed.Length && trimmed[digits] >= '0' && trimmed[digits] <= '9')
        {
            digits++;
        }
        if (digits == 0)
        {
            code = 0;
            return false;
        }
        return int.TryParse(trimmed.Substring(0, digits), System.Globalization.NumberStyles.None, invariant, out code);
    }
}
}